Enabling Security

This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Security matters because cyber-attacks are common and data breaches threaten businesses of all sizes. Before version 0.9, Kafka had no built-in security. Although access could be locked down at the network level, that approach was not viable for a large multi-tenant cluster shared across a large company.

There are several ways to secure a Kafka cluster, depending on your requirements. Let's have a look at how to secure a Kafka cluster using Transport Layer Security (TLS) authentication.

For client/broker and inter-broker communication you need to do the following:

  • Use TLS or Kerberos authentication

  • Encrypt network traffic via TLS

  • Perform authorization via access control lists (ACLs)

Now let's take a look at how TLS authentication can be applied to Kafka brokers, producers and consumers.



Generating TLS keys and certificates

Before you start, you need to generate a key and certificate for each broker and client in the cluster. The common name (CN) of the broker certificate must match the fully qualified domain name (FQDN) of the server because the client compares the CN with the DNS domain name to ensure that it is connecting to the desired broker, instead of a malicious one.
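As a rough illustration of the CN check described above, the following sketch extracts the CN from a certificate's distinguished name and compares it with an expected host name. The function names cn_from_dn and cn_matches_host and the host broker1.example.com are hypothetical and introduced only for illustration; the keystore file name and alias are the ones used by the script in this section.

```shell
#!/bin/bash
# Extract the CN value from a distinguished name such as
# "CN=broker1.example.com, OU=Eng, O=Example, C=US".
cn_from_dn() {
  printf '%s\n' "$1" | sed -n 's/.*CN=\([^,]*\).*/\1/p'
}

# Return 0 when the CN in the distinguished name equals the expected host.
cn_matches_host() {
  local dn="$1" expected_host="$2"
  [ "$(cn_from_dn "$dn")" = "$expected_host" ]
}

# Example usage (assumes the broker keystore already exists):
#   owner=$(keytool -list -v -keystore kafka.server.keystore.jks \
#             -alias localhost -storepass "$PASSWORD" | sed -n 's/^Owner: //p')
#   cn_matches_host "$owner" "$(hostname -f)" && echo "CN matches the FQDN"
```

A mismatch here usually means the certificate was generated with a name other than the broker's FQDN, in which case clients that verify the host name would reject the connection.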

Now that each broker has a public-private key pair and an unsigned certificate to identify itself, it is important for each certificate to be signed by a certificate authority (CA) to prevent forged certificates. As long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to authentic brokers.

In contrast to the keystore, which stores each application’s identity, the truststore stores all the certificates that the application should trust. Importing a certificate into a truststore also means trusting all certificates that are signed by that certificate. This attribute is called the chain of trust, and it is particularly useful when deploying TLS on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that contains the CA certificate. That way all machines can authenticate all other machines. A slightly more complex alternative is to use two CAs, one to sign brokers’ keys and another to sign clients’ keys.

Now let's see how you can generate your own CA, which is simply a public-private key pair and certificate. Then you can add the CA certificate to each client and broker’s truststore.

The following bash script generates the keystore and truststore for brokers (kafka.server.keystore.jks and kafka.server.truststore.jks) and clients (kafka.client.keystore.jks and kafka.client.truststore.jks):

createCertificates
#!/bin/bash
PASSWORD=test1234
VALIDITY=365

# Generate the broker key pair and an unsigned certificate.
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey

# Create the CA: a key pair and a self-signed certificate.
# The CA key passphrase entered at the prompt must match $PASSWORD,
# because the signing steps below pass it with -passin.
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY

# Add the CA certificate to the broker and client truststores.
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

# Export a signing request for the broker certificate and sign it with the CA.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD

# Import the CA certificate and the signed broker certificate into the broker keystore.
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# Repeat the same steps for the client keystore.
keytool -keystore kafka.client.keystore.jks -alias localhost -validity $VALIDITY -genkey
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
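Once the certificates have been generated, the chain of trust can be spot-checked before the stores are distributed. The following is a minimal sketch, assuming openssl is installed and that the ca-cert and cert-signed files produced by the script are in the current directory; the function name verify_against_ca is hypothetical.

```shell
#!/bin/bash
# Check that a signed certificate chains back to the given CA certificate.
# Arguments: <ca-cert> <signed-cert>. Returns non-zero if verification fails.
verify_against_ca() {
  local ca_cert="$1" signed_cert="$2"
  openssl verify -CAfile "$ca_cert" "$signed_cert"
}

# Example usage:
#   verify_against_ca ca-cert cert-signed
```

Because the script reuses the cert-signed file name, the file left on disk at the end is the client's signed certificate; run the check after each signing step if you want to verify both.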

Configuring TLS authentication for the Kafka broker

  • Configure the required security protocols and ports in the <KAFKA_HOME>/config/server.properties file.

    listeners=SSL://:9093

Now let's take a look at how to apply protocol-specific configuration settings.

  • Configure the following in the <KAFKA_HOME>/config/server.properties file:

    TLS configuration in Broker

    ssl.client.auth=required
    ssl.keystore.location={file-path}/kafka.server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location={file-path}/kafka.server.truststore.jks
    ssl.truststore.password=test1234

    The above configuration requires TLS client authentication and supplies the key password together with the keystore and truststore locations and passwords. Because the passwords are stored in the broker configuration, it is important to restrict access to that file through filesystem permissions.
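One way to restrict access is to make the configuration file readable and writable only by its owner. The sketch below assumes the broker process runs as the user that owns the file; the function name restrict_config_access and the KAFKA_HOME variable are hypothetical.

```shell
#!/bin/bash
# Limit a configuration file to owner read/write (mode 600) so that other
# local users cannot read the keystore and truststore passwords it contains.
restrict_config_access() {
  local config_file="$1"
  chmod 600 "$config_file"
}

# Example usage:
#   restrict_config_access "$KAFKA_HOME/config/server.properties"
```

The same treatment is worth applying to the keystore files and to any client properties file that holds passwords.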

Configuring TLS authentication for Kafka clients/producers

Enabling TLS authentication for Kafka producers and consumers can be done by configuring a set of parameters. It does not require any code changes.

Kafka versions 0.9.0.0 and above support TLS. The older APIs do not support TLS.

TLS

The parameters you need to specify to support TLS are the same for both producers and consumers. Because you are using mutual authentication, you must specify the security protocol as well as the truststore and keystore information.
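To make the shared parameter set concrete, the following sketch writes a client properties file that contains the security protocol together with the truststore and keystore settings. The function name write_client_ssl_config and its arguments are hypothetical; the property names and store file names are the ones used elsewhere on this page.

```shell
#!/bin/bash
# Write the TLS settings shared by producers and consumers to a properties file.
# Arguments: <directory holding the client stores> <store password> <output file>
write_client_ssl_config() {
  local store_dir="$1" password="$2" out_file="$3"
  cat > "$out_file" <<EOF
security.protocol=SSL
ssl.truststore.location=$store_dir/kafka.client.truststore.jks
ssl.truststore.password=$password
ssl.keystore.location=$store_dir/kafka.client.keystore.jks
ssl.keystore.password=$password
ssl.key.password=$password
EOF
  # The file contains passwords, so keep it private to its owner.
  chmod 600 "$out_file"
}

# Example usage:
#   write_client_ssl_config /path/to/stores test1234 client_ssl.properties
```

A file produced this way can be handed to a console client, for example through the --consumer.config option used with the console consumer.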

Console Clients

The client configuration can differ slightly depending on whether you want the client to use TLS or SASL/Kerberos.

  • Use the following configuration to create a producer that sends messages to the broker.

    Proxy with Kafka Security

    <proxy xmlns="http://ws.apache.org/ns/synapse" name="testKafka" startOnLoad="true" statistics="disable" trace="disable" transports="http,https">
        <target>
            <inSequence>
                <kafkaTransport.init>
                    <bootstrapServers>localhost:9093</bootstrapServers>
                    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                    <securityProtocol>SSL</securityProtocol>
                    <sslTruststoreLocation>{file-path}/kafka.client.truststore.jks</sslTruststoreLocation>
                    <sslTruststorePassword>test1234</sslTruststorePassword>
                    <sslKeystoreLocation>{file-path}/kafka.client.keystore.jks</sslKeystoreLocation>
                    <sslKeystorePassword>test1234</sslKeystorePassword>
                    <sslKeyPassword>test1234</sslKeyPassword>
                </kafkaTransport.init>
                <kafkaTransport.publishMessages>
                    <topic>test</topic>
                </kafkaTransport.publishMessages>
            </inSequence>
        </target>
        <description/>
    </proxy>

The console producer is a convenient way to send a small amount of data to the broker.

Configuring TLS authentication for the Kafka consumer

The console consumer is a convenient way to consume messages. You can use either the console consumer or the Kafka inbound endpoint to consume messages.

  • Execute the following command on the terminal to start the consumer with security:

    Command to start the consumer

    bin/kafka-console-consumer --bootstrap-server localhost:9093 --topic test --new-consumer --from-beginning --consumer.config {file-path}/consumer_ssl.properties

    Ensure that you include the following configuration to enable security.

    consumer_ssl.properties

    bootstrap.servers=localhost:9093
    security.protocol=SSL
    ssl.truststore.location={file-path}/kafka.client.truststore.jks
    ssl.truststore.password=test1234
    ssl.keystore.location={file-path}/kafka.client.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234

You have now applied TLS authentication to the Kafka brokers, producers, and consumers.

Analyzing the output

You will see the following output on the consumer console:

{"test":"wso2"}
{"test":"wso2"}
{"test":"wso2"}