In this example, you load Avro-format key and value data as JSON from a Kafka topic named topic_avrokv into a VMware Tanzu Greenplum table named avrokv_from_kafka. You perform the load as the Greenplum role gpadmin. The table avrokv_from_kafka resides in the public schema in a Tanzu Greenplum named testdb.

A producer of the Kafka topic_avrokv topic emits customer expense messages in JSON format that include the customer identifier (integer), the year (integer), and one or more expense amounts (decimal). For example, a message with key 1 for a customer with identifier 123 who spent $456.78 and $67.89 in the year 1997 follows:

1	{ "cust_id": 123, "year": 1997, "expenses":[456.78, 67.89] }

You will use the Confluent Schema Registry and run a Kafka Avro console producer to emit keys and Avro JSON-format customer expense messages, and use the VMware Tanzu Greenplum streaming server gpkafka load command to load the data into the avrokv_from_kafka table.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Confluent Kafka and Tanzu Greenplum clusters
  • Have configured connectivity as described in the loading Prerequisites.
  • Identify and note the ZooKeeper hostname and port.
  • Identify and note the address of the Confluent Schema Registry server(s).
  • Identify and note the hostname and port of the Kafka broker(s).
  • Identify and note the hostname and port of the Tanzu Greenplum coordinator node.

This procedure assumes that you have installed the Confluent Kafka distribution.

Procedure

  1. Login to a host in your Kafka cluster. For example:

    $ ssh kafkauser@kafkahost
    kafkahost$ 
    
  2. Create a Kafka topic named topic_json_gpkafka. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_avrokv
    
  3. Start a Kafka Avro console producer. You will manually input message data to this producer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-producer \
        --broker-list localhost:9092 \
        --topic topic_avrokv \
        --property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' \
        --property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc:" : "A basic schema for storing messages" }'
    

    The producer waits for messages.

  4. Input the following messages to the Avro console producer.

    Note

    You must enter a tab between the key and value. Replace TAB with a tab.

    1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
    2 TAB {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
    3 TAB {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
    
  5. Verify that the Kafka Avro console producer published the messages to the topic by running a Kafka Avro console consumer. Specify the print.key property to have the consumer display the Kafka key. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 --topic topic_avrokv \
        --from-beginning --property print.key=true
    
  6. Open a new terminal window, log in to the Tanzu Greenplum coordinator host as the gpadmin administrative user, and set up the Greenplum environment. For example:

    $ ssh gpadmin@gpcoord
    gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
    
  7. Construct the load configuration file. Open a file named avrokvload_cfg.yaml in the editor of your choice. For example:

    gpcoord$ vi avrokvload_cfg.yaml
    
  8. Fill in the load configuration parameter values based on your environment. This example assumes:

    • Your Tanzu Greenplum coordinator hostname is gpcoord.
    • The Tanzu Greenplum server is running on the default port.
    • Your Kafka broker host and port is localhost:9092.
    • Your Confluent Schema Registry address is http://localhost:8081. The avrokvload_cfg.yaml file might include the following contents:
    DATABASE: testdb
    USER: gpadmin
    HOST: gpcoord
    PORT: 5432
    VERSION: 2
    KAFKA:
       INPUT:
         SOURCE:
            BROKERS: localhost:9092
            TOPIC: topic_avrokv
         VALUE:
            COLUMNS:
              - NAME: c1
                TYPE: json
            FORMAT: avro
            AVRO_OPTION:
              SCHEMA_REGISTRY_ADDR: http://localhost:8081
         KEY:
            COLUMNS:
              - NAME: id
                TYPE: json
            FORMAT: avro
            AVRO_OPTION:
              SCHEMA_REGISTRY_ADDR: http://localhost:8081
         ERROR_LIMIT: 0
       OUTPUT:
         TABLE: avrokv_from_kafka
         MAPPING:
            - NAME: id
              EXPRESSION: id
            - NAME: customer_id
              EXPRESSION: (c1->>'cust_id')::int
            - NAME: year
              EXPRESSION: (c1->>'year')::int
            - NAME: expenses
              EXPRESSION: array(select json_array_elements(c1->'expenses')::text::float)
       COMMIT:
         MINIMAL_INTERVAL: 2000
    

    The mapping in this configuration assigns each message value field to a separate column and ignores the message key.

  9. Create the target Tanzu Greenplum table named avrokv_from_kafka. For example:

    gpcoord$ psql -d testdb
    
    testdb=# CREATE TABLE avrokv_from_kafka( id json, customer_id int, year int, expenses decimal(9,2)[] );
    
  10. Exit the psql subsystem:

    testdb=# \q
    
  11. Run the gpkafka load command to batch load the JSON data published to the topic_json_gpkafka topic into the Greenplum table. For example:

    gpcoord$ gpkafka load --quit-at-eof ./avrokvload_cfg.yaml
    

    The command exits after it reads all data published to the topic.

  12. Examine the command output, looking for messages that identify the number of rows inserted/rejected. For example:

    ... -[INFO]:- ... Inserted 3 rows
    ... -[INFO]:- ... Rejected 0 rows
    
  13. View the contents of the Tanzu Greenplum target table avrokv_from_kafka:

    gpcoord$ psql -d testdb
    
    testdb=# SELECT * FROM avrokv_from_kafka ORDER BY customer_id;
     id | customer_id | year |       expenses       
    ----+-------------+------+----------------------
     1  |     1313131 | 2012 | {1313.13,2424.24}
     2  |     3535353 | 2011 | {761.35,92.18,14.41}
     3  |     7979797 | 2011 | {4489.00}
    (3 rows)
    
check-circle-line exclamation-circle-line close-line
Scroll to top icon