In this example, you load data from a Kafka topic named topic_for_gpkafka into a Greenplum Database table named data_from_kafka. You perform the load as the Greenplum role gpadmin. The table data_from_kafka resides in the public schema in a Greenplum database named testdb.
A producer of the Kafka topic_for_gpkafka topic emits customer expense messages in CSV format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:
"123","09","456.78"
You will run a Kafka console producer to emit customer expense messages, and then use the Greenplum Streaming Server gpkafka load command to transform and load the data into the data_from_kafka table. Finally, you will verify the load operation.
Before you start this procedure, ensure that you have administrative access to running Kafka and Greenplum Database clusters, and that you know the hostname and port of your ZooKeeper instance and your Kafka broker(s), as well as the hostname of your Greenplum Database master host.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Log in to a host in your Kafka cluster. For example:
$ ssh kafkauser@kafkahost
kafkahost$
Create a Kafka topic named topic_for_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
--topic topic_for_gpkafka
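If you are running Kafka 2.2 or newer, kafka-topics.sh can also connect directly to a broker; the --zookeeper option was removed entirely in Kafka 3.0. On those versions, create the topic with the --bootstrap-server option instead:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 --replication-factor 1 \
    --partitions 1 --topic topic_for_gpkafka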
Open a file named sample_data.csv in the editor of your choice. For example:
kafkahost$ vi sample_data.csv
Copy and paste the following text to add CSV-format data to the file, and then save and exit:
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
Stream the contents of the sample_data.csv file to a Kafka console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic topic_for_gpkafka < sample_data.csv
Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic topic_for_gpkafka \
--from-beginning
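With a single-partition topic, the consumer displays the messages in the order that the producer sent them:
"1313131","12","1313.13"
"3535353","11","761.35"
...
Enter Control-c to exit the console consumer when you have verified that all nine messages were published.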
Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
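You can optionally confirm that sourcing greenplum_path.sh added the Greenplum Streaming Server utilities to your PATH; the path displayed will reflect your own installation:
gpmaster$ which gpkafka
/usr/local/greenplum-db/bin/gpkafka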
Construct the load configuration file. Open a file named firstload_cfg.yaml in the editor of your choice. For example:
gpmaster$ vi firstload_cfg.yaml
Fill in the load configuration parameter values based on your environment. This example assumes that your Greenplum Database master hostname is gpmaster and that the Greenplum Database server is running on the default port, that your Kafka broker host and port is localhost:9092, and that you want to write the Kafka data to the table named data_from_kafka located in the public schema of the database named testdb. The example also computes the tax due on each expense and writes this value to a column of the table. The firstload_cfg.yaml file would include the following contents:
DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_for_gpkafka
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: __IGNORED__
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: csv
      ERROR_LIMIT: 125
   OUTPUT:
      TABLE: data_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: cust_id
         - NAME: expenses
           EXPRESSION: expenses
         - NAME: tax_due
           EXPRESSION: expenses * .0725
   COMMIT:
      MINIMAL_INTERVAL: 2000
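In this configuration, gpkafka maps the first and third CSV fields to the cust_id and expenses input columns, discards the month field via the reserved __IGNORED__ column name, and computes the tax due at a 7.25% rate in the MAPPING block. ERROR_LIMIT stops the load if the number of rejected rows reaches 125, and MINIMAL_INTERVAL sets the minimum time between commits to 2000 milliseconds. Conceptually, each batch behaves like the following SQL sketch (illustrative only; kafka_batch is a hypothetical staging relation, not a real table):
INSERT INTO data_from_kafka (customer_id, expenses, tax_due)
SELECT cust_id, expenses, expenses * .0725
FROM kafka_batch;  -- hypothetical: stands in for the staged Kafka messages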
Create the target Greenplum Database table named data_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE data_from_kafka( customer_id int8, expenses decimal(9,2),
tax_due decimal(7,2) );
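Before you exit, you can confirm the table definition with the psql \d meta-command:
testdb=# \d data_from_kafka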
Exit the psql subsystem:
testdb=# \q
Run the gpkafka load command to batch load the CSV data published to the topic_for_gpkafka topic into the Greenplum table. For example:
gpmaster$ gpkafka load --quit-at-eof ./firstload_cfg.yaml
The command exits after it reads all data published to the topic.
Examine the command output, looking for messages that identify the number of rows inserted and rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
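You can also inspect the progress of a load job with the gpkafka check subcommand, which reports the Kafka partition offsets already committed for the configuration file; the exact output format depends on your Greenplum Streaming Server version:
gpmaster$ gpkafka check ./firstload_cfg.yaml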
Run the gpkafka load command again, this time in streaming mode. For example:
gpmaster$ gpkafka load ./firstload_cfg.yaml
The command waits for a producer to publish new messages to the topic.
Navigate back to your Kafka host terminal window. Stream the contents of the sample_data.csv file to the Kafka console producer once more:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic topic_for_gpkafka < sample_data.csv
Notice the activity in your Greenplum Database master terminal window. gpkafka load consumes the new round of messages and waits.
Interrupt and exit the waiting gpkafka load command by entering Control-c in the Greenplum Database master host terminal window.
View the contents of the Greenplum Database target table data_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM data_from_kafka WHERE customer_id='1313131'
ORDER BY expenses;
 customer_id | expenses | tax_due
-------------+----------+---------
     1313131 |   368.27 |   26.70
     1313131 |   368.27 |   26.70
     1313131 |   492.83 |   35.73
     1313131 |   492.83 |   35.73
     1313131 |  1313.13 |   95.20
     1313131 |  1313.13 |   95.20
(6 rows)
The table contains two entries for each expense because the producer published the sample_data.csv file twice.
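As a further check, you can aggregate the loaded expenses per customer. With the sample data published twice, the totals should resemble:
testdb=# SELECT customer_id, sum(expenses) AS total_expenses
FROM data_from_kafka GROUP BY customer_id ORDER BY customer_id;
 customer_id | total_expenses
-------------+----------------
     1313131 |        4348.46
     3535353 |       13688.82
     7979797 |        9361.80
(3 rows)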