Procedure

  1. Stop all ZooKeeper instances in the cluster.
    1. Run zookeeper-server-stop.sh from the <KAFKA_HOME>/bin directory to stop the ZooKeeper server. If that does not work, perform sub-steps 2 and 3.
    2. Get the pid of the process listening on port 2181 by running the command "netstat -tnlup | grep 2181".
    3. Kill the process: kill -9 <pid>
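
Sub-steps 2 and 3 can be combined into a small helper. The following is a minimal sketch, not part of the original procedure: pid_on_port is a hypothetical function name, and it assumes the Linux net-tools netstat layout in which the last column reads "PID/Program name". Note that netstat shows the pid only for your own processes unless it is run as root.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `netstat -tnlup` style output on stdin and print
# the pid of the first process listening on the given TCP port.
pid_on_port() {
  local port="$1"
  awk -v port="$port" '
    $6 == "LISTEN" && $4 ~ (":" port "$") {
      split($7, parts, "/")   # column 7 looks like "12345/java"
      print parts[1]
      exit
    }'
}

# Usage sketch, commented out so that nothing is killed by accident:
# pid="$(netstat -tnlup 2>/dev/null | pid_on_port 2181)"
# [ -n "$pid" ] && kill -9 "$pid"
```

Filtering on the LISTEN state and on the exact port suffix avoids picking up client connections or ports such as 12181 that merely contain the digits 2181.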
  2. Delete the topic by executing the command below from any one cluster node (optional):
    <KAFKA_HOME>/bin/kafka-topics.sh --delete --topic <Topic Name> --zookeeper <KAFKA_CLUSTER_HOST1_IPADDRESS>:2181,<KAFKA_CLUSTER_HOST2_IPADDRESS>:2181,<KAFKA_CLUSTER_HOST3_IPADDRESS>:2181
  3. Create the Kafka keystore and truststore on all nodes in the cluster:
    1. Create the CA certificate and CA key by executing the command below on any Linux machine.

      openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj '/CN=<fqdn>' -extensions san -config <(echo '[req]'; echo 'distinguished_name=req'; echo '[san]'; echo 'subjectAltName = DNS:localhost, IP:127.0.0.1, DNS:<hostname>, IP:<ip-address>')

    2. Enter the passphrase (changeit) when prompted. Make a note of this passphrase.
    3. Copy the ca-key and ca-cert files to any location on all 3 cluster nodes.
    4. Create the server keystore file by executing the command below on all 3 cluster nodes:
      Change directory to <KAFKA_HOME>/config
      <JREHome>/bin/keytool -keystore server.keystore.jks -alias <alias> -validity 365 -genkey -keyalg RSA -ext SAN=DNS:<hostname>,DNS:<fqdn>,DNS:localhost,IP:<IP-ADDRESS>,IP:127.0.0.1
    5. Import the ca-cert file into the server truststore on all 3 cluster nodes by executing the command below:

      <JREHome>/bin/keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

    6. Import the ca-cert file into the client truststore on all 3 cluster nodes by executing the command below:

      <JREHome>/bin/keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

    7. Export the unsigned certificate from the server keystore by executing the command below on all 3 cluster nodes:

      <JREHome>/bin/keytool -keystore server.keystore.jks -alias <alias> -certreq -file cert-file -ext SAN=DNS:<hostname>,DNS:localhost,IP:<ip-address>,IP:127.0.0.1

    8. Sign the unsigned certificate (cert-file) with the CA certificate (ca-cert) and CA key (ca-key) on all 3 cluster nodes by executing the command below:

      openssl x509 -req -extfile <(printf "subjectAltName = DNS:localhost, IP:127.0.0.1, DNS:<fqdn>, IP:<ip-address>") -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:<password>

    9. Import the ca-cert file into the server keystore on all 3 cluster nodes by running the command below:

      <JREHome>/bin/keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert

    10. Import the signed certificate file cert-signed into the server keystore on all 3 cluster nodes by running the command below:

      <JREHome>/bin/keytool -keystore server.keystore.jks -alias <alias> -import -file cert-signed

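The keystore, certificate request, and signing commands in step 3 each spell out a subject alternative name (SAN) list by hand, which makes it easy for the lists to drift apart. A minimal sketch of one way to keep them identical is shown below. build_san is a hypothetical helper, and the host name, FQDN, and IP address are example values, not values from this procedure.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build one SAN list to reuse in every step 3 command.
build_san() {
  local hostname="$1" fqdn="$2" ip="$3"
  printf 'DNS:%s,DNS:%s,DNS:localhost,IP:%s,IP:127.0.0.1' "$hostname" "$fqdn" "$ip"
}

# Example values only; substitute the values for each cluster node.
SAN="$(build_san kafka1 kafka1.example.com 10.0.0.11)"

# The same string fits the keytool -ext option and the openssl extfile line.
echo "keytool -ext value:   SAN=${SAN}"
echo "openssl extfile line: subjectAltName = ${SAN}"
```

Once sub-step 10 is complete, "openssl verify -CAfile ca-cert cert-signed" and "<JREHome>/bin/keytool -list -v -keystore server.keystore.jks" are two ways to confirm that the certificate chains to the CA and that the keystore holds both entries.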
  4. Edit the server.properties file under the <KAFKA_HOME>/config directory and add/edit the following configurations:
    security.inter.broker.protocol=SASL_SSL
    listeners=SASL_SSL://<KAFKA_CLUSTER_HOST_IPADDRESS>:9092
    advertised.listeners=SASL_SSL://<KAFKA_CLUSTER_HOST_IPADDRESS>:9092
    ssl.keystore.location=<KAFKA_HOME>/config/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=<KAFKA_HOME>/config/server.truststore.jks
    ssl.truststore.password=changeit
  5. Edit the consumer.properties file under the <KAFKA_HOME>/config directory and add/edit the following configurations:
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.truststore.location=<KAFKA_HOME>/config/client.truststore.jks
    ssl.truststore.password=changeit
  6. Edit the producer.properties file under the <KAFKA_HOME>/config directory and add/edit the following configurations:
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
    ssl.truststore.location=<KAFKA_HOME>/config/client.truststore.jks
    ssl.truststore.password=<password>
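
Before restarting the brokers, it can help to confirm that each edited properties file actually contains the keys listed in steps 4 to 6. The sketch below is one possible check, not part of the original procedure: missing_keys is a hypothetical helper, and /opt/kafka is an assumed fallback for the installation directory.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every required key that is absent from a
# properties file. Commented-out lines are ignored.
missing_keys() {
  local file="$1"; shift
  local key
  for key in "$@"; do
    grep -v '^[[:space:]]*#' "$file" | cut -d= -f1 | tr -d ' \t' \
      | grep -Fxq "$key" || echo "$key"
  done
}

# Usage sketch for the broker file from step 4; /opt/kafka is an assumption.
props="${KAFKA_HOME:-/opt/kafka}/config/server.properties"
if [ -f "$props" ]; then
  missing_keys "$props" security.inter.broker.protocol listeners advertised.listeners \
    ssl.keystore.location ssl.keystore.password ssl.key.password \
    ssl.truststore.location ssl.truststore.password
fi
```

An empty result means every listed key is present. The check covers key names only, so the values (paths, passwords, listener addresses) still need a manual review.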
  7. Start the Kafka broker on all 3 servers.
  8. Create the topic (if the topic was deleted in step 2).
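
The --zookeeper option used in step 2 takes a single comma-separated connection string, and the same string can be reused when the topic is recreated in step 8. The following is a minimal sketch under stated assumptions: zk_connect is a hypothetical helper, and the addresses, topic name, partition count, and replication factor are example values. The command is printed for review rather than executed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: join hosts into a "host:port,host:port" string.
zk_connect() {
  local port="$1"; shift
  local out="" host
  for host in "$@"; do
    out="${out:+$out,}${host}:${port}"
  done
  printf '%s\n' "$out"
}

# Example addresses only; substitute the three cluster node addresses.
ZK="$(zk_connect 2181 10.0.0.11 10.0.0.12 10.0.0.13)"

# Topic name, partition count, and replication factor are assumptions.
echo "kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 3 --zookeeper ${ZK}"
```

On Kafka releases where kafka-topics.sh no longer accepts --zookeeper, the --bootstrap-server option, together with a client properties file passed through --command-config, serves the same purpose.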