In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema of a Greenplum database named testdb.

A producer of the Kafka topic_json_gpkafka topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:

{ "cust_id": 123, "month": 9, "amount_paid":456.78 }

You will run a Kafka console producer to emit JSON-format customer expense messages, and use the Greenplum Streaming Server gpkafka load command to transform and load the data into the json_from_kafka table.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Kafka and Greenplum Database clusters.
  • Have configured connectivity as described in the loading Prerequisites.
  • Identify and note the ZooKeeper hostname and port.
  • Identify and note the hostname and port of the Kafka broker(s).
  • Identify and note the hostname and port of the Greenplum Database master node.

This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.

Procedure

  1. Log in to a host in your Kafka cluster. For example:

    $ ssh kafkauser@kafkahost
    kafkahost$ 
    
  2. Create a Kafka topic named topic_json_gpkafka. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_json_gpkafka
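
     Note: The --zookeeper option applies to older Kafka releases. Kafka 2.2 and newer also accept --bootstrap-server, and Kafka 3.x removed --zookeeper entirely, so on a newer distribution the equivalent command would be:

     kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
         --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 \
         --topic topic_json_gpkafka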
    
  3. Open a file named sample_data.json in the editor of your choice. For example:

    kafkahost$ vi sample_data.json
    
  4. Copy and paste the following JSON-format data into the file, and then save and exit:

    { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
    { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
    { "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
    { "cust_id": 7979797, "month": 11, "expenses": 18.72 }
    { "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
    { "cust_id": 7979797, "month": 12, "expenses": 173.18 }
    { "cust_id": 1313131, "month": 10, "expenses": 492.83 }
    { "cust_id": 3535353, "month": 12, "expenses": 81.12 }
    { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
    
  5. Stream the contents of the sample_data.json file to a Kafka console producer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_json_gpkafka < sample_data.json
    
  6. Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
        --from-beginning
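
     The consumer prints each message and then waits for new ones; press Ctrl-C to stop it. Output similar to the following indicates that all nine messages were published:

     { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
     { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
     ...
     { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
     ^CProcessed a total of 9 messages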
    
  7. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up the Greenplum environment. For example:

    $ ssh gpadmin@gpmaster
    gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  8. Construct the load configuration file. Open a file named jsonload_cfg.yaml in the editor of your choice. For example:

    gpmaster$ vi jsonload_cfg.yaml
    
  9. Fill in the load configuration parameter values based on your environment. This example assumes:

    • Your Greenplum Database master hostname is gpmaster.
    • The Greenplum Database server is running on the default port.
    • Your Kafka broker host and port is localhost:9092.
    • You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
    • You want to write the customer identifier and expenses data to Greenplum.

    The jsonload_cfg.yaml file would include the following contents:
    DATABASE: testdb
    USER: gpadmin
    HOST: gpmaster
    PORT: 5432
    KAFKA:
       INPUT:
         SOURCE:
            BROKERS: localhost:9092
            TOPIC: topic_json_gpkafka
         COLUMNS:
            - NAME: jdata
              TYPE: json
         FORMAT: json
         ERROR_LIMIT: 10
       OUTPUT:
         TABLE: json_from_kafka
         MAPPING:
            - NAME: customer_id
              EXPRESSION: (jdata->>'cust_id')::int
            - NAME: month
              EXPRESSION: (jdata->>'month')::int
            - NAME: amount_paid
              EXPRESSION: (jdata->>'expenses')::decimal
       COMMIT:
         MINIMAL_INTERVAL: 2000
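
     Each Kafka message is loaded into the single json-typed input column jdata, and the MAPPING expressions use the PostgreSQL ->> operator to extract each field as text and cast it to the target column type. As a sanity check (this ad hoc query is illustrative only, not part of the load), you can apply the same expressions to a sample message in psql:

     testdb=# SELECT (jdata->>'cust_id')::int      AS customer_id,
                     (jdata->>'month')::int        AS month,
                     (jdata->>'expenses')::decimal AS amount_paid
              FROM (SELECT '{ "cust_id": 123, "month": 9, "expenses": 456.78 }'::json AS jdata) m;

     The query returns 123, 9, and 456.78, the values the load writes for that message.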
    
  10. Create the target Greenplum Database table named json_from_kafka. For example:

    gpmaster$ psql -d testdb
    
    testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
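
     The column names match the MAPPING entries in jsonload_cfg.yaml. Note that the mapping casts cust_id to int (int4) while the column is int8; Greenplum implicitly widens the value on insert, so the difference is harmless.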
    
  11. Exit the psql subsystem:

    testdb=# \q
    
  12. Run the gpkafka load command to batch load the JSON data published to the topic_json_gpkafka topic into the Greenplum table. For example:

    gpmaster$ gpkafka load --quit-at-eof ./jsonload_cfg.yaml
    

    The command exits after it reads all data published to the topic.
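
     gpkafka load keeps track of how far it has read in the topic, so re-running the same command loads only messages published after the previous run.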

  13. Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:

    ... -[INFO]:- ... Inserted 9 rows
    ... -[INFO]:- ... Rejected 0 rows
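
     A non-zero Rejected count means some messages could not be parsed or cast to the target column types; the ERROR_LIMIT: 10 setting in jsonload_cfg.yaml bounds how many such rows the load tolerates before it stops.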
    
  14. View the contents of the Greenplum Database target table json_from_kafka:

    gpmaster$ psql -d testdb
    
    testdb=# SELECT * FROM json_from_kafka WHERE customer_id = 1313131
               ORDER BY amount_paid;
     customer_id | month | amount_paid 
    -------------+-------+-------------
         1313131 |    11 |      368.27
         1313131 |    10 |      492.83
         1313131 |    12 |     1313.13
    (3 rows)
    