In this example, you load Avro-format key and value data as JSON from a Kafka topic named topic_avrokv into a VMware Tanzu Greenplum table named avrokv_from_kafka. You perform the load as the Greenplum role gpadmin. The table avrokv_from_kafka resides in the public schema in a Tanzu Greenplum database named testdb.
A producer of the Kafka topic_avrokv topic emits customer expense messages in JSON format that include the customer identifier (integer), the year (integer), and one or more expense amounts (decimal). For example, a message with key 1 for a customer with identifier 123 who spent $456.78 and $67.89 in the year 1997 follows:
1 { "cust_id": 123, "year": 1997, "expenses":[456.78, 67.89] }
You will use the Confluent Schema Registry and run a Kafka Avro console producer to emit keys and Avro JSON-format customer expense messages, and use the VMware Tanzu Greenplum streaming server gpkafka load command to load the data into the avrokv_from_kafka table.
Before you start this procedure, ensure that you have access to a running Kafka cluster and a running Tanzu Greenplum cluster, and that the two clusters can communicate. This procedure assumes that you have installed the Confluent Kafka distribution.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named topic_avrokv. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic topic_avrokv
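Optionally, confirm that the topic was created by describing it over the same ZooKeeper connection; this quick check is not part of the original procedure:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --describe \
    --zookeeper localhost:2181 --topic topic_avrokv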
Start a Kafka Avro console producer. You will manually input message data to this producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic topic_avrokv \
--property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' \
--property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc" : "A basic schema for storing messages" }'
The producer waits for messages.
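For readability, the value schema passed on the command line above is reproduced here pretty-printed; it is functionally identical to the single-line form:
{
  "type": "record",
  "name": "example_schema",
  "namespace": "com.example",
  "fields": [
    { "name": "cust_id", "type": "int", "doc": "Id of the customer account" },
    { "name": "year", "type": "int", "doc": "year of expense" },
    { "name": "expenses", "type": { "type": "array", "items": "float" },
      "doc": "Expenses for the year" }
  ],
  "doc": "A basic schema for storing messages"
}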
Input the following messages to the Avro console producer.
Note: You must enter a tab between the key and value. Replace TAB in the messages below with a tab character.
1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
2 TAB {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
3 TAB {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
Verify that the Kafka Avro console producer published the messages to the topic by running a Kafka Avro console consumer. Specify the print.key property to have the consumer display the Kafka key. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 --topic topic_avrokv \
--from-beginning --property print.key=true
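If the messages were published successfully, the consumer output should resemble the following, with each key and value separated by a tab (the exact numeric formatting may differ):
1	{"cust_id":1313131,"year":2012,"expenses":[1313.13,2424.24]}
2	{"cust_id":3535353,"year":2011,"expenses":[761.35,92.18,14.41]}
3	{"cust_id":7979797,"year":2011,"expenses":[4489.0]}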
Open a new terminal window, log in to the Tanzu Greenplum coordinator host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpcoord
gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
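Optionally, verify that you can connect to the testdb database before continuing; this quick connectivity check is not part of the original procedure:
gpcoord$ psql -d testdb -c 'SELECT current_database();'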
Construct the load configuration file. Open a file named avrokvload_cfg.yaml in the editor of your choice. For example:
gpcoord$ vi avrokvload_cfg.yaml
Fill in the load configuration parameter values based on your environment. This example assumes:
- The Tanzu Greenplum coordinator hostname is gpcoord.
- The Kafka broker host and port is localhost:9092.
- The Kafka schema registry address is http://localhost:8081.
The avrokvload_cfg.yaml file might include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpcoord
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_avrokv
      VALUE:
         COLUMNS:
            - NAME: c1
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
            - NAME: id
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 0
   OUTPUT:
      TABLE: avrokv_from_kafka
      MAPPING:
         - NAME: id
           EXPRESSION: id
         - NAME: customer_id
           EXPRESSION: (c1->>'cust_id')::int
         - NAME: year
           EXPRESSION: (c1->>'year')::int
         - NAME: expenses
           EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
   COMMIT:
      MINIMAL_INTERVAL: 2000
The mapping in this configuration assigns the Avro message key to the id column and each message value field to a separate column of the target table.
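To see how the expenses mapping expression unrolls a JSON array into a SQL array, you can run it standalone in psql against a literal value; this illustration uses a sample JSON value, not data from the load:
testdb=# SELECT array(SELECT json_array_elements('{"expenses":[456.78, 67.89]}'::json->'expenses')::text::float);
     array
----------------
 {456.78,67.89}
(1 row)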
Create the target Tanzu Greenplum table named avrokv_from_kafka. For example:
gpcoord$ psql -d testdb
testdb=# CREATE TABLE avrokv_from_kafka( id json, customer_id int, year int, expenses decimal(9,2)[] );
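Optionally, confirm the table definition before exiting; for example:
testdb=# \d avrokv_from_kafka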
Exit the psql subsystem:
testdb=# \q
Run the gpkafka load command to batch load the Avro data published to the topic_avrokv topic into the Greenplum table. For example:
gpcoord$ gpkafka load --quit-at-eof ./avrokvload_cfg.yaml
The command exits after it reads all data published to the topic.
Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 3 rows
... -[INFO]:- ... Rejected 0 rows
View the contents of the Tanzu Greenplum target table avrokv_from_kafka:
gpcoord$ psql -d testdb
testdb=# SELECT * FROM avrokv_from_kafka ORDER BY customer_id;
id | customer_id | year | expenses
----+-------------+------+----------------------
1 | 1313131 | 2012 | {1313.13,2424.24}
2 | 3535353 | 2011 | {761.35,92.18,14.41}
3 | 7979797 | 2011 | {4489.00}
(3 rows)
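Because expenses is loaded as a native array column, you can operate on it directly in SQL. For example, this illustrative query (not part of the original procedure) totals each customer's expenses with unnest; the output should resemble:
testdb=# SELECT customer_id, year,
             (SELECT sum(e) FROM unnest(expenses) AS e) AS total_expenses
         FROM avrokv_from_kafka ORDER BY customer_id;
 customer_id | year | total_expenses
-------------+------+----------------
     1313131 | 2012 |        3737.37
     3535353 | 2011 |         867.94
     7979797 | 2011 |        4489.00
(3 rows)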