In this example, you merge data from a Kafka topic named customer_orders into a Greenplum Database table named customer_orders_tbl. You perform the operation as the Greenplum role gpadmin. The table customer_orders_tbl resides in the public schema in a Greenplum database named testdb.

A producer of the Kafka customer_orders topic emits customer order messages in CSV format that include the customer identifier (integer) and an order amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 follows:

"123","456.78"

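To make the message layout concrete, the following Python sketch parses one such CSV message into an integer identifier and a decimal amount. The helper name parse_order_message is hypothetical and is not part of GPSS or Kafka; it only models the two-field format described above.

```python
import csv
from decimal import Decimal
from io import StringIO
from typing import Tuple


def parse_order_message(message: str) -> Tuple[int, Decimal]:
    """Parse one CSV message of the form "id","amount" (hypothetical helper)."""
    # csv.reader removes the surrounding double quotes from each field.
    row = next(csv.reader(StringIO(message)))
    if len(row) != 2:
        raise ValueError(f"expected 2 fields, got {len(row)}")
    customer_id, amount = row
    # Decimal keeps the amount exact instead of rounding it as a float would.
    return int(customer_id), Decimal(amount)


print(parse_order_message('"123","456.78"'))  # (123, Decimal('456.78'))
```

Using Decimal rather than float mirrors the decimal(9,2) column type that the load configuration declares later in this example.
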
You will run a Kafka console producer to emit customer order messages, start a Greenplum Streaming Server instance, and use the GPSS gpsscli subcommands to merge and load the data into the customer_orders_tbl Greenplum table. This table contains pre-existing rows: the merge updates the rows whose customer identifier matches an incoming message and inserts the remaining messages as new rows.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to run Kafka and Greenplum Database clusters.
  • Have configured connectivity as described in both the Greenplum Streaming Server Prerequisites section and the Kafka Prerequisites.
  • Have identified and noted the ZooKeeper hostname and port.
  • Have identified and noted the hostname and port of the Kafka broker(s).
  • Have identified and noted the hostname and port of the Greenplum Database coordinator node.
  • Have registered the GPSS extension.

This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.

Procedure

  1. Log in to a host in your Kafka cluster. For example:

    $ ssh kafkauser@kafkahost
    kafkahost$ 
    
  2. Create a Kafka topic named customer_orders. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic customer_orders
    
  3. Open a file named sample_customer_data.csv in the editor of your choice. For example:

    kafkahost$ vi sample_customer_data.csv
    
  4. Copy/paste the following text to add CSV-format data into the file, and then save and exit:

    "1313131","1000.00"
    "4444444","99.13"
    "1515151","500.05"
    "6666666","1.12"
    "1717171","3000.03"
    
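If you prefer to generate the file rather than paste it, a minimal Python sketch such as the following writes the same five lines. This is an optional alternative and not part of the GPSS procedure itself; it assumes you run it in the directory where you want sample_customer_data.csv to be created.

```python
import csv

# Sample rows from step 4: (customer identifier, order amount), kept as
# strings so that the trailing zeros in the amounts are preserved.
rows = [
    ("1313131", "1000.00"),
    ("4444444", "99.13"),
    ("1515151", "500.05"),
    ("6666666", "1.12"),
    ("1717171", "3000.03"),
]

# QUOTE_ALL wraps every field in double quotes, matching the message format.
with open("sample_customer_data.csv", "w", newline="") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerows(rows)
```
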
  5. Stream the contents of the sample_customer_data.csv file to a Kafka console producer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic customer_orders < sample_customer_data.csv
    
  6. Run the Kafka console consumer to verify that the Kafka console producer published the messages to the topic. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 --topic customer_orders \
        --from-beginning
    
  7. Open a new terminal window, log in to the Greenplum Database coordinator host as the gpadmin administrative user, and set up the Greenplum environment. For example:

    $ ssh gpadmin@gpcoord
    gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
    
  8. Construct the Greenplum Streaming Server configuration file. For example, open a file named gpsscfg_ex.json in the editor of your choice:

    gpcoord$ vi gpsscfg_ex.json
    
  9. Designate a GPSS listen port number of 5019 and a gpfdist port number of 8319 in the configuration file. For example, copy/paste the following into the gpsscfg_ex.json file, and then save and exit the editor:

    {
        "ListenAddress": {
            "Host": "",
            "Port": 5019
        },
        "Gpfdist": {
            "Host": "",
            "Port": 8319
        }
    }
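Before starting the server, you may want to confirm that the file is well-formed JSON and carries the two ports you intended. The Python sketch below is one way to do that. The function name read_ports is hypothetical, and the check covers only JSON syntax and the two Port fields shown above; it does not reproduce whatever validation GPSS itself performs.

```python
import json

# Contents of gpsscfg_ex.json as shown in step 9.
CONFIG_TEXT = """
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
"""


def read_ports(config_text):
    """Return (listen_port, gpfdist_port) after basic sanity checks."""
    config = json.loads(config_text)  # raises ValueError on malformed JSON
    listen_port = config["ListenAddress"]["Port"]
    gpfdist_port = config["Gpfdist"]["Port"]
    for name, port in (("ListenAddress", listen_port), ("Gpfdist", gpfdist_port)):
        if not isinstance(port, int) or not 0 < port < 65536:
            raise ValueError(f"{name} port is not a valid TCP port: {port!r}")
    return listen_port, gpfdist_port


print(read_ports(CONFIG_TEXT))  # (5019, 8319)
```

To check the file on disk instead of the embedded string, read it with open("gpsscfg_ex.json").read() and pass the result to the same function.
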
    
  10. Start the Greenplum Streaming Server instance in the background, specifying the log directory ./gpsslogs. For example:

    gpcoord$ gpss --config gpsscfg_ex.json --log-dir ./gpsslogs & 
    
  11. Construct the gpkafka load configuration file. Open a file named custorders_cfg.yaml in the editor of your choice. For example:

    gpcoord$ vi custorders_cfg.yaml
    
  12. Fill in the load configuration parameter values based on your environment. This example assumes:

    • Your Greenplum Database coordinator hostname is gpcoord.
    • The Greenplum Database server is running on the default port.
    • Your Kafka broker host and port are localhost:9092.
    • You want to write the Kafka data to a Greenplum Database table named customer_orders_tbl located in the public schema of a database named testdb.
    • You want to write the customer identifier and order data to Greenplum. If the customer is already present in the table, replace the order amount with the amount read from Kafka. If the customer is not present in the table, add the customer identifier and order amount. You will set merge- and update-related properties in the file to reflect this.

    The custorders_cfg.yaml file would include the following contents:

    DATABASE: testdb
    USER: gpadmin
    HOST: gpcoord
    PORT: 5432
    KAFKA:
       INPUT:
          SOURCE:
            BROKERS: localhost:9092
            TOPIC: customer_orders
          COLUMNS:
            - NAME: id
              TYPE: int
            - NAME: order_amount
              TYPE: decimal(9,2)
          FORMAT: csv
          ERROR_LIMIT: 25
       OUTPUT:
          TABLE: customer_orders_tbl
          MODE: MERGE
          MATCH_COLUMNS:
            - id
          UPDATE_COLUMNS:
            - amount
          MAPPING:
            - NAME: id
              EXPRESSION: id
            - NAME: amount
              EXPRESSION: order_amount
       COMMIT:
          MINIMAL_INTERVAL: 2000
    
  13. Start the psql subsystem:

    gpcoord$ psql -d testdb
    testdb=#
    
  14. Create the target Greenplum Database table named customer_orders_tbl, and insert two rows of data in the table. For example:

    CREATE TABLE customer_orders_tbl( id int8, amount decimal(9,2) );
    INSERT INTO customer_orders_tbl VALUES (1717171, 17.17);
    INSERT INTO customer_orders_tbl VALUES (1515151, 15.15);
    
  15. Exit the psql subsystem:

    testdb=# \q
    
  16. Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named orders1:

    gpcoord$ gpsscli submit --name orders1 --gpss-port 5019 ./custorders_cfg.yaml
    20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: orders1
    
  17. List all GPSS jobs. For example:

    gpcoord$ gpsscli list --all --gpss-port 5019
    JobName          JobID                               GPHost       GPPort   DataBase    Schema      Table                  Topic            Status
    orders1          d577cf37890b5b6bf4e713a9586e86c9    localhost    5432     testdb      public      customer_orders_tbl    customer_orders  JOB_SUBMITTED 
    

    The list subcommand displays all jobs. Notice the entry for the orders1 job that you just submitted, and that the job is in the Submitted state.
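If you script this check, the tabular output can be reduced to a job-name-to-status mapping. The Python sketch below does so for the sample output above. It assumes whitespace-separated columns with no spaces inside any value, which holds for this sample but is not a documented guarantee of gpsscli; the function name job_statuses is hypothetical.

```python
# Sample output copied from the gpsscli list command in step 17.
LIST_OUTPUT = """\
JobName          JobID                               GPHost       GPPort   DataBase    Schema      Table                  Topic            Status
orders1          d577cf37890b5b6bf4e713a9586e86c9    localhost    5432     testdb      public      customer_orders_tbl    customer_orders  JOB_SUBMITTED
"""


def job_statuses(list_output):
    """Map each job name to its status (assumes space-free column values)."""
    lines = [line for line in list_output.splitlines() if line.strip()]
    header = lines[0].split()
    name_idx = header.index("JobName")
    status_idx = header.index("Status")
    statuses = {}
    for line in lines[1:]:
        fields = line.split()
        statuses[fields[name_idx]] = fields[status_idx]
    return statuses


print(job_statuses(LIST_OUTPUT))  # {'orders1': 'JOB_SUBMITTED'}
```
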

  18. Start the job named orders1. For example:

    gpcoord$ gpsscli start orders1 --gpss-port 5019
    20200804 12:57:57.35153,117918,info,Job orders1 is started
    
  19. Stop the job named orders1. For example:

    gpcoord$ gpsscli stop orders1 --gpss-port 5019
    20200804 13:05:09.24280,117506,info,stop job: orders1 success
    
  20. Examine the gpss command output and log file, looking for messages that identify the number of rows inserted/rejected. For example:

    ... -[INFO]:- ... Inserted 5 rows
    ... -[INFO]:- ... Rejected 0 rows
    
  21. View the contents of the Greenplum Database target table customer_orders_tbl:

    gpcoord$ psql -d testdb
    
    SELECT * FROM customer_orders_tbl ORDER BY id;
       id    | amount  
    ---------+---------
     1313131 | 1000.00
     1515151 |  500.05
     1717171 | 3000.03
     4444444 |   99.13
     6666666 |    1.12
    (5 rows)
    

    Notice that the amount values for the customers with ids 1515151 and 1717171 have been updated to the order_amount values read from the Kafka messages.
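The result above can be reproduced with a simplified model of the merge. The Python sketch below is not how GPSS implements MODE: MERGE; it only illustrates the semantics configured in custorders_cfg.yaml, where id is the match column and amount is the update column. The function name merge_rows is hypothetical.

```python
from decimal import Decimal


def merge_rows(table, incoming, match_col="id", update_cols=("amount",)):
    """Simplified merge: update rows that match on match_col, insert the rest."""
    merged = {row[match_col]: dict(row) for row in table}
    for row in incoming:
        key = row[match_col]
        if key in merged:
            # Matching row: overwrite only the configured update columns.
            for col in update_cols:
                merged[key][col] = row[col]
        else:
            # No match: add the incoming row as a new row.
            merged[key] = dict(row)
    return sorted(merged.values(), key=lambda r: r[match_col])


# Rows inserted into customer_orders_tbl in step 14.
existing = [
    {"id": 1717171, "amount": Decimal("17.17")},
    {"id": 1515151, "amount": Decimal("15.15")},
]
# Messages published to the customer_orders topic in step 5.
incoming = [
    {"id": 1313131, "amount": Decimal("1000.00")},
    {"id": 4444444, "amount": Decimal("99.13")},
    {"id": 1515151, "amount": Decimal("500.05")},
    {"id": 6666666, "amount": Decimal("1.12")},
    {"id": 1717171, "amount": Decimal("3000.03")},
]

for row in merge_rows(existing, incoming):
    print(row["id"], row["amount"])
```

The two pre-existing customers keep their identifiers but take the amounts from Kafka, and the three new customers are added, which gives the five rows shown in the query output.
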
