In this example, you merge data from a Kafka topic named customer_orders into a Tanzu Greenplum table named customer_orders_tbl. You perform the operation as the Greenplum role gpadmin. The table customer_orders_tbl resides in the public schema of a Tanzu Greenplum database named testdb.
A producer of the Kafka customer_orders topic emits customer order messages in CSV format that include the customer identifier (integer) and an order amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 follows:
"123","456.78"
You will run a Kafka console producer to emit customer order messages, start a VMware Tanzu Greenplum streaming server instance, and use the GPSS gpsscli subcommands to merge and load the data into the customer_orders_tbl Greenplum table. This table contains pre-existing rows; the merge updates the rows whose keys match incoming messages and inserts the remaining messages as new rows.
Before you start this procedure, ensure that you have administrative access to running Kafka and Tanzu Greenplum clusters, and that you can identify the hostname and port of your Kafka broker and ZooKeeper instances.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named customer_orders. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic customer_orders
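Note: The --zookeeper option applies to older Kafka releases. If you are running Kafka 2.2 or newer, kafka-topics.sh accepts the broker address directly via the --bootstrap-server option; for example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 \
    --topic customer_orders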
Open a file named sample_customer_data.csv in the editor of your choice. For example:
kafkahost$ vi sample_customer_data.csv
Copy/paste the following text to add CSV-format data into the file, and then save and exit:
"1313131","1000.00"
"4444444","99.13"
"1515151","500.05"
"6666666","1.12"
"1717171","3000.03"
Stream the contents of the sample_customer_data.csv file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic customer_orders < sample_customer_data.csv
Run the Kafka console consumer to verify that the Kafka console producer published the messages to the topic. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic customer_orders \
--from-beginning
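If the producer successfully published the messages, the consumer output includes the five CSV records that you streamed to the topic:
"1313131","1000.00"
"4444444","99.13"
"1515151","500.05"
"6666666","1.12"
"1717171","3000.03"
Enter Control-c to exit the console consumer.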
Open a new terminal window, log in to the Tanzu Greenplum coordinator host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpcoord
gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
Construct the Tanzu Greenplum streaming server configuration file. For example, open a file named gpsscfg_ex.json in the editor of your choice:
gpcoord$ vi gpsscfg_ex.json
Designate a GPSS listen port number of 5019 and a gpfdist port number of 8319 in the configuration file. For example, copy/paste the following into the gpsscfg_ex.json file, and then save and exit the editor:
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
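Optionally, verify that the file is well-formed JSON before you start the server. For example, using Python's built-in json.tool module (assuming Python 3 is available on the host):
gpcoord$ python3 -m json.tool gpsscfg_ex.json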
Start the Tanzu Greenplum streaming server instance in the background, specifying the log directory ./gpsslogs. For example:
gpcoord$ gpss --config gpsscfg_ex.json --log-dir ./gpsslogs &
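GPSS may take a few seconds to initialize. You can confirm that the instance is up by checking that the GPSS and gpfdist ports are listening; for example, with the ss utility (assuming it is installed on the coordinator host):
gpcoord$ ss -ltn | grep -E ':(5019|8319)'
If the command returns no output, examine the log files in ./gpsslogs for startup errors.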
Construct the gpkafka load configuration file. Open a file named custorders_cfg.yaml in the editor of your choice. For example:
gpcoord$ vi custorders_cfg.yaml
Fill in the load configuration parameter values based on your environment. This example assumes:

- The Tanzu Greenplum coordinator hostname is gpcoord.
- The Kafka broker host and port is localhost:9092.
- You load data into a table named customer_orders_tbl located in the public schema of a database named testdb.

The custorders_cfg.yaml file would include the following contents:

DATABASE: testdb
USER: gpadmin
HOST: gpcoord
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: customer_orders
      COLUMNS:
         - NAME: id
           TYPE: int
         - NAME: order_amount
           TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 25
   OUTPUT:
      TABLE: customer_orders_tbl
      MODE: MERGE
      MATCH_COLUMNS:
         - id
      UPDATE_COLUMNS:
         - amount
      MAPPING:
         - NAME: id
           EXPRESSION: id
         - NAME: amount
           EXPRESSION: order_amount
   COMMIT:
      MINIMAL_INTERVAL: 2000
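MODE: MERGE instructs GPSS to update the UPDATE_COLUMNS of each existing table row whose MATCH_COLUMNS values match an incoming Kafka record, and to insert the incoming records that match no existing row. Conceptually, a single batch behaves like the following SQL sketch, where kafka_batch is a hypothetical staging table holding the consumed messages (GPSS's actual implementation differs):
-- Update existing rows whose id matches an incoming record.
UPDATE customer_orders_tbl t
SET amount = s.order_amount
FROM kafka_batch s
WHERE t.id = s.id;

-- Insert incoming records that match no existing row.
INSERT INTO customer_orders_tbl (id, amount)
SELECT s.id, s.order_amount
FROM kafka_batch s
WHERE NOT EXISTS (
    SELECT 1 FROM customer_orders_tbl t WHERE t.id = s.id
);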
Start the psql subsystem:
gpcoord$ psql -d testdb
testdb=#
Create the target Tanzu Greenplum table named customer_orders_tbl, and insert two rows of data into the table. For example:
CREATE TABLE customer_orders_tbl( id int8, amount decimal(9,2) );
INSERT INTO customer_orders_tbl VALUES (1717171, 17.17);
INSERT INTO customer_orders_tbl VALUES (1515151, 15.15);
Exit the psql subsystem:
testdb=# \q
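Optionally, note the table's initial contents so that you can compare them with the post-merge results:
gpcoord$ psql -d testdb -c 'SELECT * FROM customer_orders_tbl ORDER BY id;'
   id    | amount
---------+--------
 1515151 |  15.15
 1717171 |  17.17
(2 rows)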
Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named orders1:
gpcoord$ gpsscli submit --name orders1 --gpss-port 5019 ./custorders_cfg.yaml
20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: orders1
List all GPSS jobs. For example:
gpcoord$ gpsscli list --all --gpss-port 5019
JobName JobID GPHost GPPort DataBase Schema Table Topic Status
orders1 d577cf37890b5b6bf4e713a9586e86c9 localhost 5432 testdb public customer_orders_tbl customer_orders JOB_SUBMITTED
The list subcommand displays all jobs. Notice the entry for the orders1 job that you just submitted, and that the job is in the Submitted state.
Start the job named orders1. For example:
gpcoord$ gpsscli start orders1 --gpss-port 5019
20200804 12:57:57.35153,117918,info,Job orders1 is started
After GPSS has read and loaded the published messages, stop the job named orders1. For example:
gpcoord$ gpsscli stop orders1 --gpss-port 5019
20200804 13:05:09.24280,117506,info,stop job: orders1 success
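You can re-run the list subcommand shown earlier to confirm the job's new status; the orders1 entry should no longer report JOB_SUBMITTED (a stopped job typically reports a stopped status):
gpcoord$ gpsscli list --all --gpss-port 5019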
Examine the gpss command output and log file, looking for messages that identify the number of rows inserted and rejected. For example:
... -[INFO]:- ... Inserted 5 rows
... -[INFO]:- ... Rejected 0 rows
View the contents of the Tanzu Greenplum target table customer_orders_tbl:
gpcoord$ psql -d testdb
SELECT * FROM customer_orders_tbl ORDER BY id;
id | amount
---------+---------
1313131 | 1000.00
1515151 | 500.05
1717171 | 3000.03
4444444 | 99.13
6666666 | 1.12
(5 rows)
Notice that the amount values for the customers with ids 1515151 and 1717171 have been updated to the order_amount read from the Kafka messages, and that the three Kafka messages that matched no existing row were inserted as new rows.