In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema of a Greenplum database named testdb.
A producer of the Kafka topic_json_gpkafka topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
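If you are generating these messages from a script rather than typing them into a console producer, they can be built programmatically. A minimal Python sketch, assuming the field names shown above (the build_expense_message helper is hypothetical, not part of Kafka or gpkafka):

```python
import json

def build_expense_message(cust_id: int, month: int, expenses: float) -> str:
    """Serialize one customer-expense record as a single-line JSON message."""
    return json.dumps({"cust_id": cust_id, "month": month, "expenses": expenses})

message = build_expense_message(123, 9, 456.78)
print(message)  # {"cust_id": 123, "month": 9, "expenses": 456.78}
```

Each message must be a single line, because the console producer treats every input line as one Kafka message.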
You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum Streaming Server gpkafka load command to transform and load the data into the json_from_kafka table.
Before you start this procedure, ensure that you have administrative access to running Kafka and Greenplum Database clusters. This procedure assumes that you have installed the Apache Kafka distribution; if you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named topic_json_gpkafka. For example (on Kafka 2.2 and later, replace the deprecated --zookeeper localhost:2181 option with --bootstrap-server localhost:9092):
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic topic_json_gpkafka
Open a file named sample_data.json in the editor of your choice. For example:
kafkahost$ vi sample_data.json
Copy/paste the following text to add JSON-format data into the file, and then save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
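Because gpkafka parses each Kafka message as one JSON document, every line of sample_data.json must be valid JSON on its own. A quick way to sanity-check the data before producing it is a short Python pass (a sketch; the required key names are taken from this example, and validate_ndjson is a hypothetical helper):

```python
import json

REQUIRED_KEYS = {"cust_id", "month", "expenses"}

def validate_ndjson(lines):
    """Parse newline-delimited JSON, checking each record for the expected keys."""
    records = []
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)  # raises ValueError on malformed JSON
        missing = REQUIRED_KEYS - record.keys()
        if missing:
            raise ValueError(f"line {lineno}: missing keys {sorted(missing)}")
        records.append(record)
    return records

sample = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
]
print(f"{len(validate_ndjson(sample))} valid messages")  # 2 valid messages
```

To check the file you just created, pass it an open file handle instead of the inline list: validate_ndjson(open("sample_data.json")).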
Stream the contents of the sample_data.json file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic topic_json_gpkafka < sample_data.json
Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic topic_json_gpkafka \
--from-beginning
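Assuming all nine messages were published successfully, the consumer output includes each line of sample_data.json. Press Ctrl-C to stop the consumer when it stops returning new messages:

```
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
```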
Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Construct the load configuration file. Open a file named jsonload_cfg.yaml in the editor of your choice. For example:
gpmaster$ vi jsonload_cfg.yaml
Fill in the load configuration parameter values based on your environment. This example assumes:
- Your Greenplum Database master host is named gpmaster.
- Your Kafka broker host and port is localhost:9092.
- You want to load the Kafka data into the table named json_from_kafka, located in the public schema of a database named testdb.
The jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json_gpkafka
      COLUMNS:
         - NAME: jdata
           TYPE: json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: json_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (jdata->>'cust_id')::int
         - NAME: month
           EXPRESSION: (jdata->>'month')::int
         - NAME: amount_paid
           EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
      MINIMAL_INTERVAL: 2000
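Each MAPPING entry applies a SQL expression to the jdata JSON column; for instance, (jdata->>'expenses')::decimal extracts the expenses field as text and casts it to a decimal. The transformation can be sketched in Python to preview what gpkafka will insert for one message (an illustration only; gpkafka evaluates these expressions inside Greenplum Database, not in Python, and map_message is a hypothetical helper):

```python
import json
from decimal import Decimal

def map_message(raw: str) -> dict:
    """Mimic the MAPPING section: cast each extracted JSON field to its column type."""
    jdata = json.loads(raw)
    return {
        "customer_id": int(jdata["cust_id"]),            # (jdata->>'cust_id')::int
        "month": int(jdata["month"]),                    # (jdata->>'month')::int
        "amount_paid": Decimal(str(jdata["expenses"])),  # (jdata->>'expenses')::decimal
    }

row = map_message('{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }')
print(row)  # {'customer_id': 1313131, 'month': 12, 'amount_paid': Decimal('1313.13')}
```

Note that the Kafka messages use the key expenses while the Greenplum column is named amount_paid; the MAPPING section is what bridges the two names.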
Create the target Greenplum Database table named json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
Exit the psql subsystem:
testdb=# \q
Run the gpkafka load command to batch load the JSON data published to the topic_json_gpkafka topic into the Greenplum table. For example:
gpmaster$ gpkafka load --quit-at-eof ./jsonload_cfg.yaml
The command exits after it reads all data published to the topic.
Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id=1313131
ORDER BY amount_paid;
customer_id | month | amount_paid
-------------+-------+-------------
1313131 | 11 | 368.27
1313131 | 10 | 492.83
1313131 | 12 | 1313.13
(3 rows)