In this example, you load JSON-format data from a Kafka topic named topic_json into a single-column Greenplum Database table named single_json_column. You perform the load as the Greenplum role gpadmin. The table single_json_column resides in the public schema in a Greenplum database named testdb.
A producer of the Kafka topic_json topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum Streaming Server gpkafka load command to load each Kafka message into a row in the single_json_column table.
Before you start this procedure, ensure that you:
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named topic_json. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic topic_json
Open a file named sample_data.json in the editor of your choice. For example:
kafkahost$ vi sample_data.json
Copy and paste the following JSON-format data into the file, and then save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
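Because each line of the file becomes one Kafka message, it can help to confirm that every line parses as a standalone JSON object before you stream it. The following is a minimal Python sketch of such a check. It is not part of the Kafka or Greenplum tooling, and the function name find_invalid_lines is hypothetical.

```python
import json

def find_invalid_lines(lines):
    """Return (line_number, reason) pairs for lines that are not JSON objects."""
    problems = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # blank lines are skipped in this sketch
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as exc:
            problems.append((number, str(exc)))
            continue
        if not isinstance(parsed, dict):
            problems.append((number, "not a JSON object"))
    return problems
```

For example, passing an open file handle for sample_data.json to find_invalid_lines should return an empty list when the file holds the data shown above.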
Stream the contents of the sample_data.json file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic topic_json < sample_data.json
Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic topic_json \
--from-beginning
Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Construct the load configuration file. Open a file named simple_jsonload_cfg.yaml in the editor of your choice. For example:
gpmaster$ vi simple_jsonload_cfg.yaml
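Fill in the load configuration parameter values based on your environment. This example assumes:
Your Greenplum Database master hostname is gpmaster.
Your Kafka broker host and port is localhost:9092.
You want to write the Kafka data to a Greenplum Database table named single_json_column located in the public schema of a database named testdb.
You want to write each Kafka message to a single json type column.
The simple_jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb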
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: single_json_column
   COMMIT:
      MINIMAL_INTERVAL: 1000
Create the target Greenplum Database table named single_json_column. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE single_json_column( value json );
Exit the psql subsystem:
testdb=# \q
Run the gpkafka load command to batch load the JSON data published to the topic_json topic into the Greenplum table. For example:
gpmaster$ gpkafka load --quit-at-eof ./simple_jsonload_cfg.yaml
The command exits after it reads all data published to the topic.
Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
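If you script the load, you may want to pull the inserted and rejected counts out of the captured command output. The Python sketch below assumes only that the output contains the phrases "Inserted N rows" and "Rejected N rows" as in the example above. The remainder of each log line is elided here and is not parsed. The function name summarize_load is hypothetical.

```python
import re

def summarize_load(output):
    """Extract row counts from captured gpkafka load output.

    Assumption: the output contains 'Inserted N rows' and 'Rejected N rows'.
    A count is None when the corresponding phrase is not found.
    """
    counts = {}
    for label in ("Inserted", "Rejected"):
        match = re.search(rf"{label} (\d+) rows", output)
        counts[label.lower()] = int(match.group(1)) if match else None
    return counts
```

A script could then treat a nonzero rejected count, or a missing count, as a signal to inspect the full output.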
View the contents of the Greenplum Database target table single_json_column:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM single_json_column;
value
----------------------------------------------------------
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
(9 rows)
Use json operators to view the expenses associated with a specific customer:
testdb=# SELECT (value->>'expenses')::decimal AS expenses FROM single_json_column
WHERE (value->>'cust_id')::int = 1313131;
expenses
----------
1313.13
492.83
368.27
(3 rows)
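To make the operator semantics concrete, the following Python sketch mirrors the query above on client-side data: the ->> operator returns a field as text, and the casts convert that text to numeric types. The sketch is illustrative only and does not connect to Greenplum. The rows list stands in for a few of the table's rows, and expenses_for is a hypothetical helper.

```python
import json
from decimal import Decimal

# Stand-in for a subset of the rows stored in single_json_column.
rows = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
    '{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }',
    '{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }',
]

def expenses_for(rows, cust_id):
    """Return the expenses of one customer, like the WHERE clause and cast above."""
    result = []
    for raw in rows:
        # parse_float=Decimal keeps amounts exact, similar in spirit to ::decimal
        doc = json.loads(raw, parse_float=Decimal)
        if int(doc["cust_id"]) == cust_id:
            result.append(doc["expenses"])
    return result

for amount in expenses_for(rows, 1313131):
    print(amount)
```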