There are many ways to push external data into Kafka. This example shows you how you can push a csv file to Kafka in the correct format.

The Script

The following script converts csv to JSON and then pushes the JSON to Kafka. It needs to be saved into a file called csvToKafka.sh and made runnable using chmod +x csvToKafka.sh.
#!/bin/bash
#
# Copyright © 2020 VMware, Inc. All rights reserved.
 
 
CSV_FILE=$1
KAFKA_ADDRESS=$2
KAFKA_TOPIC=$3
 
if [ ! -f "$CSV_FILE" ]; then
    echo "The specified csv file does not exist"
    exit 1
fi
 
if [ -z "$KAFKA_ADDRESS" ]; then
    echo "KAFKA_ADDRESS has not been specified.  It should be in format of <ip>:<port>"
    exit 1
fi
 
if [ -z "$KAFKA_TOPIC" ]; then
    echo "KAFKA_TOPIC not set.  Using the default as dataInput"
    KAFKA_TOPIC=dataInput
fi
 
if ! command -v jq &> /dev/null
then
    echo "jq could not be found. Please install jq and try again."
    exit 1
fi
 
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi
if ! command -v $JAVA &> /dev/null
then
    echo "java could not be found. Please ensure java is installed and JAVA_HOME is set properly."
    exit 1
fi
 
if [ -z "$KAFKA_HOME" ]; then
    echo "KAFKA_HOME is not set. Please set KAFKA_HOME to the location of the kafka binaries"
    exit 1
fi
 
jqCmd=( jq -s -R 'split("\n") | map(split(",")) | .[0] as $header | .[1:] | map( . as $o | reduce .[] as $item(  {};  ($o | index($item)) as $index |  ($header[$index]) as $field |  if $index==0 then .key = $item else .data[$field] = $item end  )  )' )
echo $("${jqCmd[@]}" "$CSV_FILE") | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_ADDRESS --topic $KAFKA_TOPIC

Running the Script

The following command will push the data into Kafka: ./csvToKafka.sh /path/to/file.csv DomainManagerNode-ip:kafka-port external-dataTopic

For example: ./csvToKafka.sh /home/user/test.csv 10.161.128.175:9092 dataInput

CSV Format

The first row of the csv file should define the names of the properties that will be added, with the first column being the key that will be used to match the data.

For example:

instance,tenant,customerID
KC-1,tenant1,customerA
KC-2,tenant2,customerB

Corresponds to this external data which can be read by the enrichment stream.

[
  {
    "key": "KC-1",
    "data": {
      "tenant": "tenant1",
      "customerID": "customerA"
    }
  },
  {
    "key": "KC-2",
    "data": {
      "tenant": "tenant2",
      "customerID": "customerB"
    }
  }
]

All events matched with KC-1 will have "tenant": "tenant1" and "customerID": "customerA" added to their tags field and all events matched with KC-2 will have "tenant": "tenant2" and "customerID": "customerB" added to their tags field.

Requirements:

The following tools need to be installed in order for this example to work

  • jq - jQuery is a command line tool to help with reading and writing json.
    • Can be typically installed using: "yum install jq"
  • kafka binaries - We use the kafka console-producer to push data into kafka. Can be downloaded from https://kafka.apache.org/downloads. The KAFKA_HOME environment variable needs to be set pointing to where you unzipped the binaries.
    • wget https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
    • tar -xvf kafka_2.12-2.7.0.tgz
    • export KAFKA_HOME=/path/to/kafka_2.12-2.7.0
  • java - Kafka depends on Java. JAVA_HOME needs to be specified on the path.
    • Install with a binary from Oracle, or use openjdk: "yum install openjdk8"