There are many ways to push external data into Kafka. This example shows how to convert a CSV file to the JSON format read by the Enrichment stream and push it to Kafka.
The Script
#!/bin/bash
#
# Copyright © 2020 VMware, Inc. All rights reserved.

CSV_FILE=$1
KAFKA_ADDRESS=$2
KAFKA_TOPIC=$3

if [ ! -f "$CSV_FILE" ]; then
    echo "The specified csv file does not exist"
    exit 1
fi

if [ -z "$KAFKA_ADDRESS" ]; then
    echo "KAFKA_ADDRESS has not been specified. It should be in format of <ip>:<port>"
    exit 1
fi

if [ -z "$KAFKA_TOPIC" ]; then
    echo "KAFKA_TOPIC not set. Using the default as dataInput"
    KAFKA_TOPIC=dataInput
fi

if ! command -v jq &> /dev/null
then
    echo "jq could not be found. Please install jq and try again."
    exit 1
fi

if [ -z "$JAVA_HOME" ]; then
    JAVA="java"
else
    JAVA="$JAVA_HOME/bin/java"
fi

if ! command -v $JAVA &> /dev/null
then
    echo "java could not be found. Please ensure java is installed and JAVA_HOME is set properly."
    exit 1
fi

if [ -z "$KAFKA_HOME" ]; then
    echo "KAFKA_HOME is not set. Please set KAFKA_HOME to the location of the kafka binaries"
    exit 1
fi

jqCmd=(
    jq -s -R
    'split("\n") |
     map(split(",")) |
     .[0] as $header |
     .[1:] |
     map(
       . as $o |
       reduce .[] as $item(
         {};
         ($o | index($item)) as $index |
         ($header[$index]) as $field |
         if $index==0 then
           .key = $item
         else
           .data[$field] = $item
         end
       )
     )'
)

echo $("${jqCmd[@]}" "$CSV_FILE") | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $KAFKA_ADDRESS --topic $KAFKA_TOPIC
Running the Script
The following command pushes the data into Kafka. The topic argument is optional; if it is omitted, the script uses dataInput.
./csvToKafka.sh /path/to/file.csv DomainManagerNode-ip:kafka-port external-dataTopic
For example:
./csvToKafka.sh /home/user/test.csv 10.161.128.175:9092 dataInput
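The script only checks that the Kafka address argument is non-empty, although its error message asks for the <ip>:<port> format. A minimal sketch of a stricter check follows. The function name is_valid_kafka_address is hypothetical and not part of csvToKafka.sh, and the sketch assumes a host name or IPv4 address with a single colon, so IPv6 addresses are rejected.

```shell
# Hypothetical helper, not part of csvToKafka.sh.
# Returns 0 when $1 looks like <host>:<port>, non-zero otherwise.
is_valid_kafka_address() {
    local address=$1
    local host port

    case $address in
        *[[:space:]]*) return 1 ;;   # no whitespace allowed
        *:*:*)         return 1 ;;   # more than one colon (IPv6 is not handled here)
        *:*)           ;;            # exactly one colon: keep checking
        *)             return 1 ;;   # no colon at all
    esac

    host=${address%%:*}
    port=${address##*:}

    [ -n "$host" ] || return 1       # host part must not be empty

    case $port in
        ''|*[!0-9]*) return 1 ;;     # port must be digits only
    esac
    [ "${#port}" -le 5 ] || return 1 # reject overly long digit strings

    # Valid TCP ports are 1 through 65535.
    [ "$port" -ge 1 ] && [ "$port" -le 65535 ]
}
```

In the script, a call such as is_valid_kafka_address "$KAFKA_ADDRESS" || exit 1 could follow the existing empty-argument check, so that a malformed address fails before the producer is started.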
CSV Format
The first row of the csv file should define the names of the properties that will be added, with the first column being the key that will be used to match the data.
For example:
instance,tenant,customerID
KC-1,tenant1,customerA
KC-2,tenant2,customerB
This corresponds to the following external data, which can be read by the Enrichment stream:
[
  {
    "key": "KC-1",
    "data": {
      "tenant": "tenant1",
      "customerID": "customerA"
    }
  },
  {
    "key": "KC-2",
    "data": {
      "tenant": "tenant2",
      "customerID": "customerB"
    }
  }
]
All events matched with KC-1 will have "tenant": "tenant1" and "customerID": "customerA" added to their tags field and all events matched with KC-2 will have "tenant": "tenant2" and "customerID": "customerB" added to their tags field.
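The jq filter in the script finds each field's header by looking up the value's position within its row, so a row that contains the same value twice appears to map only the first occurrence, and a trailing newline in the file may produce an empty object at the end of the array. The following is a sketch of an alternative filter that pairs values with headers by column position and skips blank lines. The function name csv_to_json is hypothetical, and the sketch assumes simple CSV input with no quoted fields or embedded commas.

```shell
# Hypothetical helper, not part of csvToKafka.sh.
# Prints the CSV file given as $1 as a single line of JSON.
csv_to_json() {
    jq -c -s -R '
        split("\n")
        | map(select(length > 0) | split(","))    # drop blank lines, split fields
        | .[0] as $header                          # first row holds the property names
        | .[1:]
        | map(
            . as $row
            | reduce range(1; $header | length) as $i (
                { key: $row[0], data: {} };        # column 0 is the match key
                .data[$header[$i]] = $row[$i]      # pair each value with its header by position
              )
          )
    ' "$1"
}
```

The -c option prints the whole array on one line, which matters because the console producer sends each input line as a separate message. For the example CSV above, the output should have the same structure as the JSON shown earlier, and it can be piped to kafka-console-producer.sh in the same way as in the script.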
Requirements
The following tools need to be installed for this example to work:
- jq - jq is a command-line tool for reading and writing JSON.
- Can typically be installed using: "yum install jq"
- Kafka binaries - The script uses the Kafka console producer to push data into Kafka. The binaries can be downloaded from https://kafka.apache.org/downloads. The KAFKA_HOME environment variable must point to the directory where you extracted them.
- wget https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
- tar -xvf kafka_2.12-2.7.0.tgz
- export KAFKA_HOME=/path/to/kafka_2.12-2.7.0
- java - Kafka depends on Java. The script uses $JAVA_HOME/bin/java if JAVA_HOME is set; otherwise java must be on the PATH.
- Install with a binary from Oracle, or use OpenJDK: "yum install java-1.8.0-openjdk"