Note: This example uses the Greenplum Streaming Server client utility, gpsscli, rather than the gpkafka utility, to load JSON-format data from Kafka into Greenplum Database.
In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema of a Greenplum database named testdb.
A producer of the Kafka topic_json_gpkafka topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
You will run a Kafka console producer to emit JSON-format customer expense messages, start a Greenplum Streaming Server instance, and use the GPSS gpsscli subcommands to load the data into the json_from_kafka table.
Before you start this procedure, ensure that you have administrative access to running Kafka and Greenplum Database clusters.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named topic_json_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic topic_json_gpkafka
Open a file named sample_data.json in the editor of your choice. For example:
kafkahost$ vi sample_data.json
Copy/paste the following text to add JSON-format data into the file, and then save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
Stream the contents of the sample_data.json file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic topic_json_gpkafka < sample_data.json
Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic topic_json_gpkafka \
--from-beginning
Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Construct the Greenplum Streaming Server configuration file. For example, open a file named gpsscfg_ex.json in the editor of your choice:
gpmaster$ vi gpsscfg_ex.json
Designate a GPSS listen port number of 5019 and a gpfdist port number of 8319 in the configuration file. For example, copy/paste the following into the gpsscfg_ex.json file, and then save and exit the editor:
{
"ListenAddress": {
"Host": "",
"Port": 5019
},
"Gpfdist": {
"Host": "",
"Port": 8319
}
}
Start the Greenplum Streaming Server instance in the background, specifying the log directory ./gpsslogs. For example:
gpmaster$ gpss --config gpsscfg_ex.json --log-dir ./gpsslogs &
Construct the load configuration file. Open a file named jsonload_cfg.yaml in the editor of your choice. For example:
gpmaster$ vi jsonload_cfg.yaml
Fill in the load configuration parameter values based on your environment. This example assumes:
- Your Greenplum Database master host is named gpmaster.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
The jsonload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: localhost:9092
      TOPIC: topic_json_gpkafka
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: json_from_kafka
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (jdata->>'cust_id')::int
      - NAME: month
        EXPRESSION: (jdata->>'month')::int
      - NAME: amount_paid
        EXPRESSION: (jdata->>'expenses')::decimal
  COMMIT:
    MINIMAL_INTERVAL: 2000
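The MAPPING entries use the PostgreSQL json ->> operator to extract each field as text and cast it to the target column type. If you want to sanity-check these expressions before submitting the job, you can evaluate them against one sample message in psql; a minimal sketch, assuming a session connected to testdb:

```sql
-- Evaluate the MAPPING cast expressions against a single sample message
SELECT (j->>'cust_id')::int       AS customer_id,
       (j->>'month')::int         AS month,
       (j->>'expenses')::decimal  AS amount_paid
FROM (SELECT '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }'::json AS j) AS s;
```

The query should return one row with the values 1313131, 12, and 1313.13, matching the first line of sample_data.json.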
Create the target Greenplum Database table named json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
Exit the psql subsystem:
testdb=# \q
Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named kafkajson2gp:
gpmaster$ gpsscli submit --name kafkajson2gp --gpss-port 5019 ./jsonload_cfg.yaml
20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: kafkajson2gp
List all GPSS jobs. For example:
gpmaster$ gpsscli list --all --gpss-port 5019
JobName JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp d577cf37890b5b6bf4e713a9586e86c9 localhost 5432 testdb public json_from_kafka topic_json_gpkafka JOB_SUBMITTED
The list subcommand displays all jobs. Notice the entry for the kafkajson2gp job that you just submitted; the job is in the Submitted state.
Start the job named kafkajson2gp. For example:
gpmaster$ gpsscli start kafkajson2gp --gpss-port 5019
20200804 12:57:57.35153,117918,info,Job kafkajson2gp is started
Stop the job named kafkajson2gp. For example:
gpmaster$ gpsscli stop kafkajson2gp --gpss-port 5019
20200804 13:05:09.24280,117506,info,stop job: kafkajson2gp success
Examine the gpss command output and log file, looking for messages that identify the number of rows inserted and rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
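You can cross-check the inserted row count from psql; a quick sketch, assuming the json_from_kafka table was empty before the load:

```sql
-- The count should match the "Inserted 9 rows" log message
SELECT count(*) FROM json_from_kafka;
```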
View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id='1313131'
ORDER BY amount_paid;
customer_id | month | amount_paid
-------------+-------+-------------
1313131 | 11 | 368.27
1313131 | 10 | 492.83
1313131 | 12 | 1313.13
(3 rows)