Note

The Greenplum Streaming Server Kafka data source is also known as the Greenplum-Kafka Integration.

Apache Kafka is a fault-tolerant, low-latency, distributed publish-subscribe message system. The Greenplum Streaming Server supports loading Kafka data from the Apache and Confluent Kafka distributions. Refer to the Apache Kafka Documentation for more information about Apache Kafka.

A Kafka message may include a key and a value, and may consist of a single line or multiple lines. Kafka stores streams of messages (or records) in categories called topics. A Kafka producer publishes records to partitions in one or more topics. A Kafka consumer subscribes to a topic and receives records in the order that they were sent within a given Kafka partition. Kafka does not guarantee the order of data originating from different Kafka partitions.

You can use the gpsscli or gpkafka load utilities to load Kafka data into Greenplum Database.

Note

gpkafka load is a wrapper around the Greenplum Streaming Server gpss and gpsscli commands. VMware recommends that you migrate to using these utilities directly.

Both the gpss server and the gpkafka load utility are Kafka consumers. They ingest streaming data from a single Kafka topic, using Greenplum Database readable external tables to transform and insert or update the data into a target Greenplum table. You identify the Kafka source, data format, and the Greenplum connection options and target table definition in a YAML-formatted load configuration file that you provide to the utility. In the case of user interrupt or exit, a subsequent data load operation that specifies the same Kafka topic and target Greenplum Database table names resumes from the last recorded offset.

Requirements

The Greenplum Streaming Server requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES or rdkafka_prop (v3) block to your gpkafka.yaml load configuration file:

PROPERTIES:
      api.version.request: false
      broker.version.fallback: 0.8.2.1

Load Procedure

You will perform the following tasks when you use the Greenplum Streaming Server to load Kafka data into a Greenplum Database table:

  1. Ensure that you meet the Prerequisites.
  2. Register the Greenplum Streaming Server extension.
  3. Identify the format of the Kafka data.
  4. (Optional) Register custom data formatters.
  5. Construct the load configuration file.
  6. Create the target Greenplum Database table.
  7. Assign Greenplum Database role permissions to the table, if required.
  8. Run the gpkafka load command to load the Kafka data into Greenplum Database.
  9. Check the progress of the load operation.
  10. Check for load errors. (Note that the naming format for gpkafka log files is gpkafka_*date*.log.)

Prerequisites

Before using the gpsscli or gpkafka utilities to load Kafka data to Greenplum Database, ensure that you:

  • Meet the Prerequisites documented for the Greenplum Streaming Server, and configure and start the server.
  • Have access to a running Kafka cluster with ZooKeeper, and can identify the hostname(s) and port number(s) of the Kafka broker(s) serving the data.
  • Can identify the Kafka topic of interest.
  • Can run the command on a host that has connectivity to:
    • Each Kafka broker host in the Kafka cluster.
    • The Greenplum Database coordinator and all segment hosts.

About Supported Kafka Message Data Formats

The Greenplum Streaming Server supports Kafka message key and value data in the following formats:

Format Description
avro Avro-format data. GPSS supports:
  • Loading Kafka message key or value data from a single-object encoded Avro file.
  • Using the Avro schema of a Kafka message key and/or value registered in a Confluent Schema Registry to load Avro-format key and/or value data.
  • Using the Avro schema specified in a separate .avsc file located on each Greenplum Database segment host file system to load Avro-format key or value data, but not both.

In all cases, GPSS reads Avro data from Kafka only as a single JSON-type column.

GPSS supports libz-, lzma- and snappy-compressed Avro data from Kafka.

binary Binary format data. GPSS reads binary data from Kafka only as a single bytea-type column.
csv Comma-delimited text format data.
custom Data of a custom format, parsed by a custom formatter function.
delimited Text data separated by a configurable delimiter. The `delimited` data format supports a multi-byte delimiter.
json, jsonl (version 2 only) JSON- or JSONB-format data. Specify the json format when the message data is in JSON or JSONB format. GPSS can read JSON data as a single object or can read a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
Note: GPSS supports JSONB-format data only when loading to Greenplum 6.
Note: Specify FORMAT: jsonl in version 2 format load configuration files. Specify json with is_jsonl: true in version 3 (Beta) format load configuration files.

To write Kafka message data into a Greenplum Database table, you must identify the data format in the load configuration file.

Avro

Specify the avro format when your Kafka message data is a single-object encoded Avro file or you are using the Confluent Schema Registry to load Avro message key and/or value data. (If the schema registry is SSL-secured, refer to Accessing an SSL-Secured Schema Registry for configuration details.) GPSS reads Avro data from Kafka and loads it into a single JSON-type column. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.

Binary

Use the binary format when your Kafka message data is a stream of bytes. GPSS reads binary data from Kafka and loads it into a single bytea-type column.

CSV

Use the csv format when your Kafka message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
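
On the producer side, one way to satisfy both constraints is to serialize each record with a CSV library and reject fields that contain line endings. The following is a minimal Python sketch, not part of GPSS; the function name to_csv_message is hypothetical, and quoting every field is a choice made here for illustration only.

```python
import csv
import io


def to_csv_message(fields):
    """Serialize one record as a single-line, RFC 4180-style CSV string.

    Hypothetical helper for illustration; not a GPSS or Kafka API.
    """
    values = [str(field) for field in fields]
    for value in values:
        # The message content may not contain CR or LF characters.
        if "\r" in value or "\n" in value:
            raise ValueError("field contains a line ending character")
    buffer = io.StringIO()
    # QUOTE_ALL quotes every field; an empty lineterminator keeps the
    # serialized record free of trailing CR/LF.
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="")
    writer.writerow(values)
    return buffer.getvalue()


message = to_csv_message(["1313131", "12", "backorder", "1313.13"])
print(message)
```

The resulting string can be published as the Kafka message value by whatever producer client you use.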

Data in csv format may appear in Kafka messages as follows:

"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"

Custom

The Greenplum Streaming Server provides a custom data formatter plug-in framework for Kafka messages using user-defined functions. The type of Kafka message data processed by a custom formatter is formatter-specific. For example, a custom formatter may process compressed or complex data.

Refer to Custom Formatter for Kafka for an example custom formatter that loads Kafka data into Greenplum Database.

Delimited Text

The Greenplum Streaming Server supports loading Kafka data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value and up to 32 bytes in length. You can also specify quote and escape characters, and an end-of-line prefix.

Note

The delimiter may not contain the quote or escape characters.

When you specify a quote character:

  • The left and right quotes are the same.
  • Each data element must be quoted. GPSS does not support mixed quoted and unquoted content.
  • You must also define an escape character.
  • GPSS keeps the original format of any character between the quotes, except the quote and escape characters. This especially applies to the delimiter and \n, which do not require additional escape if they are quoted.
  • The quote character is presented as the escape character plus the quote character (for example, \").
  • The escape character is presented as the escape character plus the escape character (for example, \\).
  • GPSS parses multiple escape characters from left to right.
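
The quoting rules above can be illustrated with a small parser. This is a minimal Python sketch written for this page, not the GPSS implementation: the function name split_quoted_row is hypothetical, it handles a single row only, and rejecting an escape character that is followed by anything other than the quote or escape character is an assumption, since the rules above do not cover that case.

```python
def split_quoted_row(row, delimiter="|", quote='"', escape="\\"):
    """Split one row of fully quoted, delimited text into its fields.

    Hypothetical helper for illustration; not a GPSS API.
    """
    fields = []
    pos = 0
    while True:
        # Every field must be quoted; mixed quoted/unquoted content is rejected.
        if not row.startswith(quote, pos):
            raise ValueError(f"expected a quoted field at offset {pos}")
        pos += 1
        chars = []
        while True:
            if pos >= len(row):
                raise ValueError("unterminated quoted field")
            ch = row[pos]
            if ch == escape:
                # escape+quote yields a quote, escape+escape yields an escape.
                nxt = row[pos + 1 : pos + 2]
                if nxt not in (quote, escape):
                    # Assumption: any other use of the escape character is an error.
                    raise ValueError(f"unexpected escape at offset {pos}")
                chars.append(nxt)
                pos += 2
            elif ch == quote:
                pos += 1  # closing quote
                break
            else:
                # Delimiters and newlines inside quotes are kept as-is.
                chars.append(ch)
                pos += 1
        fields.append("".join(chars))
        if pos == len(row):
            return fields
        if not row.startswith(delimiter, pos):
            raise ValueError(f"expected delimiter at offset {pos}")
        pos += len(delimiter)


print(split_quoted_row(r'"13|13"|"say \"hi\""|"a\\b"'))
```

Because the scan moves strictly left to right and consumes an escape character together with the character that follows it, a run of escape characters is resolved in pairs from the left.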

When you do not specify a quote character:

  • The escape character is optional.

  • If you do not specify an escape character, GPSS treats the delimiter as the column separator, and treats any end-of-line prefix plus \n as the row separator.

  • If you do specify an escape character:

    • GPSS uses the escape character plus the delimiter as the column separator.
    • GPSS uses the escape character plus the end-of-line prefix plus \n as the row separator.
    • The escape character plus the escape character is the escape character itself.
    • GPSS parses multiple escape characters from left to right.

Sample data using a pipe ('|') delimiter character follows:

1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72

JSON (single object)

Specify the json format when your Kafka message data is in JSON or JSONB format and you want GPSS to read JSON data from Kafka as a single object into a single column (per the JSON specification, newlines and white space are ignored). You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.

Note

GPSS supports JSONB-format data only when loading to Greenplum 6.

JSON (single record per line)

Specify FORMAT: jsonl in version 2 format load configuration files, or specify json with is_jsonl: true in version 3 (Beta) format load configuration files, when your Kafka message data is in JSON format with a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.

Sample JSON message data:

{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
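
To make the one-record-per-line layout concrete, the following minimal Python sketch splits a message value like the sample above into individual JSON records and then selects the records for one month. It only illustrates the data layout; GPSS performs the equivalent parsing itself, and the variable names are hypothetical.

```python
import json

# One Kafka message value containing three JSON records, one per line.
payload = (
    '{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }\n'
    '{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }\n'
    '{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }\n'
)

# Each non-empty line is parsed as an independent JSON document.
records = [json.loads(line) for line in payload.splitlines() if line.strip()]

# Pick out the customers whose record is for month 11.
november = [record["cust_id"] for record in records if record["month"] == 11]

print(len(records), november)
```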

About Multiple-Line Kafka Messages

A Kafka message may contain a single line or multiple lines.

GPSS supports the following combinations of single and multiple line messages for the key and value data input components:

Key | Value
single-line | none
none | single-line
single-line | single-line
multi-line | none
none | multi-line

GPSS does not support multiple-line messages for both the key and value.

Registering a Custom Formatter

A custom data formatter for Kafka messages is a user-defined function. If you are using a custom formatter, you must create the formatter function and register it in each database in which you will use the function to write Kafka data to Greenplum tables.

Constructing the gpkafka.yaml Configuration File

You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.

The Greenplum Streaming Server supports three versions of the YAML configuration file: version 1 (deprecated), version 2, and version 3 (Beta). Versions 2 and 3 of the configuration file format support all features of Version 1 of the configuration file, and introduce support for loading both the Kafka message key and value to Greenplum, as well as loading meta data.

Refer to the gpkafka.yaml reference page for Version 1 configuration file contents and syntax. Refer to the gpkafka-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports. gpkafka-v3.yaml describes the Version 3 (Beta) format.

The contents of a sample Version 2 YAML configuration file named loadcfg2.yaml follow:

DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
         PARTITIONS: (1, 2...4, 7)
      VALUE:
         COLUMNS:
           - NAME: c1
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
           - NAME: key
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      FILTER: (c1->>'month')::int = 11 
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000

Greenplum Database Options (Version 2-Focused)

You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.

The VERSION parameter identifies the version of the GPSS YAML configuration file. The default is Version 1, so you must explicitly specify version 2 or version v3 to use either of the newer formats.

KAFKA:INPUT Options

Specify the Kafka brokers and topic of interest using the SOURCE block. You must create the Kafka topic prior to loading data. By default, GPSS reads Kafka messages from all partitions. To restrict the partitions from which GPSS reads messages, you may specify a single partition number, a comma-separated list of partition numbers, and/or a range of partition numbers. The PARTITIONS property is supported only for version 2 and 3 load configuration file formats.

Note

You must configure different jobs that load from the same Kafka topic to the same Greenplum Database table with non-overlapping PARTITIONS values.
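
Before submitting several jobs against the same topic and table, it can help to check their PARTITIONS values for overlap. The following minimal Python sketch expands a value written in the style of the sample configuration file, such as (1, 2...4, 7), and intersects two such values. The helper names are hypothetical, and treating a range such as 2...4 as inclusive of both ends is an assumption made for this sketch.

```python
def expand_partitions(spec):
    """Expand a PARTITIONS-style string into a set of partition numbers.

    Hypothetical helper for illustration; ranges are assumed to be inclusive.
    """
    body = spec.strip()
    if body.startswith("(") and body.endswith(")"):
        body = body[1:-1]
    partitions = set()
    for token in body.split(","):
        token = token.strip()
        if not token:
            continue
        if "..." in token:
            # A range such as "2...4" covers 2, 3, and 4 (assumed inclusive).
            low, high = (int(part) for part in token.split("..."))
            partitions.update(range(low, high + 1))
        else:
            partitions.add(int(token))
    return partitions


def overlapping(spec_a, spec_b):
    """Return the partition numbers that two PARTITIONS values share."""
    return expand_partitions(spec_a) & expand_partitions(spec_b)


print(sorted(expand_partitions("(1, 2...4, 7)")))
print(sorted(overlapping("(1, 2...4, 7)", "(4...5)")))
```

An empty result from overlapping() indicates that the two jobs read from disjoint sets of partitions.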

When you provide a VALUE block, you must specify the COLUMNS and FORMAT parameters. The VALUE:COLUMNS block includes the name and type of each data element in the Kafka message. The default source-to-target data mapping behavior of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:

  • You must identify the Kafka data elements in the order in which they appear in the Kafka message.
  • You may specify NAME: __IGNORED__ to omit a Kafka message value data element from the load operation.
  • You must provide the same name for each non-ignored Kafka data element and its associated Greenplum Database table column.
  • You must specify an equivalent data type for each non-ignored Kafka data element and its associated Greenplum Database table column.

The VALUE:FORMAT keyword identifies the format of the Kafka message value. GPSS supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). GPSS also supports binary (binary), single object or single record per line JSON/JSONB (json or jsonl), custom (custom), and Avro (avro) format value data.

When you provide a META block, you must specify a single JSON-type column in COLUMNS and FORMAT: json. Meta data for Kafka includes the following properties:

  • topic - text
  • partition - int
  • offset - bigint
  • timestamp - bigint

When you provide a KEY block, you must specify the COLUMNS and FORMAT parameters. The KEY:COLUMNS block includes the name and type of each element of the Kafka message key, and is subject to the same restrictions as identified for VALUE:COLUMNS above. The KEY:FORMAT keyword identifies the format of the Kafka message key. GPSS supports avro, binary, csv, custom, delimited, json, and jsonl format key data.

The FILTER parameter identifies a filter to apply to the Kafka input messages before the data is loaded into Greenplum Database. If the filter evaluates to true, GPSS loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more KEY or VALUE column names.

The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which GPSS should exit the load operation. The default ERROR_LIMIT is zero; the load operation is stopped when the first error is encountered.

KAFKA:OUTPUT Options

Note

You must specify only one of the OUTPUT or OUTPUTS blocks.

You identify the target Greenplum Database schema name and table name via the KAFKA:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load Kafka data.

The default load mode is to insert Kafka data into the Greenplum Database table. GPSS also supports updating and merging Kafka message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.

You can override the default mapping of the INPUT VALUE:COLUMNS and KEY:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Greenplum Database table and a Kafka message value or key data element. You can also map the META data columns, and map a Greenplum Database table column to a value expression.

Note

When you specify a MAPPING block, ensure that you provide entries for all Kafka data elements of interest - GPSS does not automatically match column names when you provide a MAPPING.

Loading to Multiple Greenplum Database Tables

Note

(version 2) You must specify only one of the OUTPUT or OUTPUTS blocks.

If you want to load from a single Kafka topic to multiple Greenplum Database tables, you provide an OUTPUTS:TABLE (version 2) or targets:gpdb:tables:table (version 3 (Beta)) block for each table, and specify the properties that identify the data targeted to each.

About the Merge Load Mode

MERGE mode is similar to an UPSERT operation; GPSS may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. GPSS deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.

GPSS stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.

GPSS uses the following algorithm for MERGE mode processing:

  1. Create a temporary table like the target table.
  2. Generate the SQL to insert the source data into the temporary table.
    1. Add the MAPPINGS.
    2. Add the FILTER.
    3. Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
  3. Update the target table from rows in the temporary table that satisfy MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
  4. Insert non-matching rows into the target table.
  5. Delete rows in the target table that satisfy MATCH_COLUMNS and the DELETE_CONDITION.
  6. Truncate the temporary table.
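
The steps above can be sketched in Python with in-memory lists of rows standing in for the target and temporary tables. This is only an illustration of the control flow, not the SQL that GPSS generates. The function merge_batch and its parameters are hypothetical. The sketch assumes that de-duplication keeps the row with the greatest order column value, that both conditions are evaluated against the staged row, and that match keys are unique in the target; none of these details is stated above.

```python
def merge_batch(target, staged, match_cols, update_cols, order_col,
                update_condition=lambda row: True,
                delete_condition=lambda row: False):
    """Apply one MERGE-style batch to `target` in place and return it.

    Hypothetical illustration; rows are plain dicts keyed by column name.
    """
    def key(row):
        return tuple(row[col] for col in match_cols)

    # Step 2.3: keep one staged row per match key. Assumption: the row with
    # the greatest order column value wins.
    deduped = {}
    for row in staged:
        k = key(row)
        if k not in deduped or row[order_col] >= deduped[k][order_col]:
            deduped[k] = row

    # Assumption: match keys are unique within the target.
    index = {key(row): row for row in target}

    for k, row in deduped.items():
        if k in index:
            # Step 3: update matching rows that satisfy the update condition.
            if update_condition(row):
                for col in update_cols:
                    index[k][col] = row[col]
        else:
            # Step 4: insert rows that have no match in the target.
            new_row = dict(row)
            target.append(new_row)
            index[k] = new_row

    # Step 5: delete matching rows that satisfy the delete condition.
    doomed = {k for k, row in deduped.items() if delete_condition(row)}
    target[:] = [row for row in target if key(row) not in doomed]
    return target


table = [{"id": 1, "amt": 10, "ts": 1}, {"id": 2, "amt": 20, "ts": 1}]
batch = [
    {"id": 1, "amt": 11, "ts": 2},
    {"id": 1, "amt": 12, "ts": 3},   # later duplicate of id 1
    {"id": 3, "amt": 30, "ts": 1},   # new row
    {"id": 2, "amt": 0, "ts": 5},    # satisfies the delete condition below
]
merge_batch(table, batch, match_cols=["id"], update_cols=["amt", "ts"],
            order_col="ts", delete_condition=lambda row: row["amt"] == 0)
print(table)
```

In this sketch the staged rows are discarded when the function returns, which plays the role of truncating the temporary table in step 6.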

Other Options

The KAFKA:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which GPSS creates external and history tables.

GPSS commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. If you do not specify these properties, GPSS commits data at the default MINIMAL_INTERVAL of 5000 ms.

You can configure GPSS to run a task (user-defined function or SQL commands) after GPSS reads a configurable number of batches from Kafka. Use the KAFKA:TASK: POST_BATCH_SQL and BATCH_INTERVAL configuration parameters to specify the task and the batch interval.

Specify a KAFKA:PROPERTIES block to set Kafka consumer configuration properties. GPSS sends the property names and values to Kafka when it instantiates a consumer for the load operation.

About KEYs, VALUEs, and FORMATs

You can specify any data format in the Version 2 configuration file KEY:FORMAT and VALUE:FORMAT parameters, with some restrictions. The Greenplum Streaming Server supports the following KEY:FORMAT and VALUE:FORMAT combinations:

KEY:FORMAT | VALUE:FORMAT | Description
any | none (VALUE block omitted) | GPSS loads only the Kafka message key data, subject to any MAPPING that you specify, to Greenplum Database.
none (KEY block omitted) | any | Equivalent to configuration file Version 1. GPSS ignores the Kafka message key and loads only the Kafka message value data, subject to any MAPPING that you specify, to Greenplum Database.
csv | any | Not permitted.
any | csv | Not permitted.
avro, binary, delimited, json, jsonl | avro, binary, delimited, json, jsonl | Any combination is permitted. GPSS loads both the Kafka message key and value data, subject to any MAPPING that you specify, to Greenplum Database.

About the JSON Format and Column Type

When you specify FORMAT: json or FORMAT: jsonl, valid COLUMN:TYPEs for the data include json or jsonb. You can also specify the new GPSS gp_jsonb (Beta) or gp_json (Beta) column types.

  • gp_jsonb is an enhanced JSONB type that adds support for \u escape sequences and unicode. For example, gp_jsonb can escape \uDD8B and \u0000 as text format, but jsonb treats these characters as illegal.
  • gp_json is an enhanced JSON type that can tolerate certain illegal unicode sequences. For example, gp_json automatically escapes incorrect surrogate pairs and processes \u0000 as \\u0000. Note that unicode escape values cannot be used for code point values above 007F when the server encoding is not UTF8.

You can use the gp_jsonb (Beta) and gp_json (Beta) data types as follows:

  • As the COLUMN:TYPE when the target Greenplum Database table column type is json or jsonb.

  • In a MAPPING when the target Greenplum Database column is text or varchar. For example:

    EXPRESSION: (j->>'a')::text
    
  • In a MAPPING when FORMAT: avro and the target Greenplum Database column is json or jsonb. For example:

    EXPRESSION: j::gp_jsonb
    

    or

    EXPRESSION: j::gp_json
    
  • In a MAPPING when FORMAT: avro and the target Greenplum Database column is text or varchar. For example:

    EXPRESSION: (j::gp_jsonb->>'a')::text
    

    or

    EXPRESSION: (j::gp_json->>'a')::text
    
Note

The gp_jsonb (Beta) and gp_json (Beta) data types are defined in an extension named dataflow. You must CREATE EXTENSION dataflow; in each database in which you choose to use these (Beta) data types.

Preserving Ill-Formed JSON Escape Sequences

GPSS exposes a configuration parameter that you can use with the gp_jsonb and gp_json types. The name of this parameter is gpss.json_preserve_ill_formed_prefix. When set, GPSS does not return an error when it encounters an ill-formed JSON escape sequence with these types, but instead prepends it with the prefix that you specify.

For example, if gpss.json_preserve_ill_formed_prefix is set to the string "##" as follows:

SET gpss.json_preserve_ill_formed_prefix = "##";

and GPSS encounters an ill-formed JSON sequence such as the orphaned low surrogate \ude04X, GPSS writes the data as ##\ude04X instead.

About Transforming and Mapping Kafka Input Data

You can define a MAPPING between the Kafka input data (VALUE:COLUMNS, KEY:COLUMNS, and META:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.

You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.

If you choose to map more than one input column in an expression, you can create a user-defined function to parse and transform the input column and return the columns of interest.

For example, suppose a Kafka producer emits the following JSON messages to a topic:

{ "customer_id": 1313131, "some_intfield": 12 }
{ "customer_id": 77, "some_intfield": 7 }
{ "customer_id": 1234, "some_intfield": 56 }

You could define a user-defined function, udf_parse_json(), to parse the data as follows:

=> CREATE OR REPLACE FUNCTION udf_parse_json(value json)
     RETURNS TABLE (x int, y text)
   LANGUAGE plpgsql AS $$
     BEGIN
        RETURN query
        SELECT ((value->>'customer_id')::int), ((value->>'some_intfield')::text);
     END $$;

This function returns the two fields in each JSON record, casting the fields to integer and text, respectively.
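
For readers less familiar with PL/pgSQL, the effect of the function on the sample messages can be approximated in Python. This is a rough counterpart for illustration only; the name parse_json_fields is hypothetical and the code plays no part in a GPSS load.

```python
import json


def parse_json_fields(value):
    """Return (customer_id as int, some_intfield as text) for one message.

    Hypothetical Python counterpart of udf_parse_json(), for illustration.
    """
    record = json.loads(value)
    return int(record["customer_id"]), str(record["some_intfield"])


messages = [
    '{ "customer_id": 1313131, "some_intfield": 12 }',
    '{ "customer_id": 77, "some_intfield": 7 }',
    '{ "customer_id": 1234, "some_intfield": 56 }',
]

rows = [parse_json_fields(message) for message in messages]
for row in rows:
    print(row)
```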

An example MAPPING for the topic data in a JSON-type KAFKA:INPUT:COLUMNS named jdata follows:

MAPPING:
  cust_id: (jdata->>'customer_id')::int
  field2: ((jdata->>'some_intfield')::int * .075)::decimal
  j1, j2: (udf_parse_json(jdata)).*

The Greenplum Database table definition for this example scenario is:

=> CREATE TABLE t1map( cust_id int, field2 decimal(7,2), j1 int, j2 text );

About Mapping Avro Bytes Fields to Base64-Encoded Strings

When you specify AVRO_OPTION:BYTES_TO_BASE64, GPSS maps Avro bytes fields to base64-encoded strings. You can provide a MAPPING to decode these strings and write the data to a Greenplum bytea column.

For example, if the Avro schema is:

{
    "type": "record",
    "name": "bytes_test",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "string", "type": "string"},
        {"name": "bytes", "type": "bytes"},
        {
            "name": "inner_record",
            "type": {
                "type": "map",
                "values": {
                    "type": "bytes",
                    "name": "nested_bytes"
                }
            }
        }
    ]
}

And if your load configuration file includes these input property settings:

VALUE:
    COLUMNS:
      - NAME: c1
        TYPE: json
    FORMAT: avro
    AVRO_OPTION:
      SCHEMA_REGISTRY_ADDR: http://localhost:8081
      BYTES_TO_BASE64: true

You can define a MAPPING to decode the encoded strings as follows:

MAPPING:
  - NAME: id
    EXPRESSION: (c1->>'id')::int
  - NAME: bytes1
    EXPRESSION: (decode(c1->>'bytes', 'base64'))
  - NAME: bytes2
    EXPRESSION: (decode((c1->>'inner_record')::json->>'nested_bytes', 'base64'))

This mapping decodes the bytes1 and bytes2 fields to the Greenplum bytea data type. GPSS would expect to load these mapped fields to a Greenplum table with the following definition:

CREATE TABLE avbyte( id int, bytes1 bytea, bytes2 bytea);
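
The round trip that this mapping relies on can be reproduced outside the database. The following minimal Python sketch encodes two hypothetical byte values as base64 strings inside a JSON document shaped like the c1 column, then decodes them again, which mirrors what decode(..., 'base64') does in the MAPPING. The sample byte values are invented for illustration.

```python
import base64
import json

# Hypothetical raw values for the Avro bytes fields.
raw_bytes = b"\x00\x01\xfe\xff"
raw_nested = b"gpss"

# With BYTES_TO_BASE64 enabled, the bytes fields appear in the JSON-type
# column as base64-encoded strings; this document imitates that shape.
c1 = json.dumps({
    "id": 1,
    "bytes": base64.b64encode(raw_bytes).decode("ascii"),
    "inner_record": {
        "nested_bytes": base64.b64encode(raw_nested).decode("ascii"),
    },
})

# Decoding the strings recovers the original byte values.
record = json.loads(c1)
bytes1 = base64.b64decode(record["bytes"])
bytes2 = base64.b64decode(record["inner_record"]["nested_bytes"])

print(bytes1 == raw_bytes, bytes2 == raw_nested)
```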

Creating the Greenplum Table

You must pre-create the Greenplum table before you load Kafka data into Greenplum Database. You use the KAFKA:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.

The target Greenplum table definition must include each column that GPSS will load into the table. The table definition may include additional columns; GPSS ignores these columns, and loads no data into them.

The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored Kafka message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.

The CREATE TABLE command for the target Greenplum Database table receiving the Kafka topic data defined in the loadcfg2.yaml file presented in the Constructing the gpkafka.yaml Configuration File section follows:

 testdb=# CREATE TABLE payables.expenses2( customer_id int8, newcust bool,
            expenses decimal(9,2), tax_due decimal(7,2) );

Running the gpkafka load Command

Note

gpkafka load is a wrapper around the Greenplum Streaming Server (GPSS) gpss and gpsscli utilities. Starting in Greenplum Streaming Server version 1.3.2, gpkafka load no longer launches a gpss server instance, but rather calls the backend server code directly.

When you run gpkafka load, the command submits, starts, and stops a GPSS job on your behalf.

VMware recommends that you migrate to using the GPSS utilities directly.

You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:

$ gpkafka load loadcfg2.yaml

The default mode of operation for gpkafka load is to read all pending messages and then to wait for, and then consume, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.

To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.

gpkafka load resumes a subsequent data load operation specifying the same Kafka topic and target Greenplum Database table names from the last recorded offset.

Refer to the gpkafka load reference page for additional information about this command.

Note

GPSS cannot detect the addition of a new Kafka partition while a load operation is in progress. You must stop, and then restart the load operation to read Kafka messages published to the new partition.

Configuring the gpfdist Server Instance

The gpkafka load command uses the gpfdist or gpfdists protocol to load data into Greenplum. You can configure the protocol used for the load request by providing the --config gpfdistconfig.json option to the command, where gpfdistconfig.json identifies a GPSS configuration file that specifies gpfdist configuration in a Gpfdist protocol block. Refer to Configuring the Greenplum Streaming Server in the Greenplum Streaming Server documentation for detailed information about the file format and properties supported.

Note

gpkafka load reads the configuration specified in the Gpfdist protocol block of the gpfdistconfig.json file; it ignores the GPSS configuration specified in the ListenAddress block of the file.

Or, you may choose to provide gpfdist host or port configuration settings on the gpkafka load command line by specifying the --gpfdist-host hostaddr or --gpfdist-port portnum options to the command. Any options that you specify on the command line override settings provided in the gpfdistconfig.json file.

About Kafka Offsets, Message Retention, and Loading

Kafka maintains a partitioned log for each topic, assigning each record/message within a partition a unique sequential id number. This id is referred to as an offset. Kafka retains, for each gpkafka load invocation specifying the same Kafka topic and Greenplum Database table names, the last offset within the log consumed by the load operation. The Greenplum Streaming Server also records this offset value. Refer to Understanding Kafka Message Offset Management for more detailed information about how GPSS manages message offsets.

Kafka persists a message for a configurable retention time period and/or log size, after which it purges messages from the log. Kafka topics or messages can also be purged on demand. This may result in an offset mismatch between Kafka and the Greenplum Streaming Server.

gpkafka load returns an error if its recorded offset for the Kafka topic and Greenplum Database table combination is behind that of the current earliest Kafka message offset for the topic, or when the earliest and latest offsets do not match.

When you receive one of these messages, you can choose to:

  • Resume the load operation from the earliest available message published to the topic by specifying the --force-reset-earliest option to gpkafka load:

    $ gpkafka load --force-reset-earliest loadcfg2.yaml
    
  • Load only new messages published to the Kafka topic, by specifying the --force-reset-latest option with the command:

    $ gpkafka load --force-reset-latest loadcfg2.yaml
    
  • Load messages published since a specific timestamp (milliseconds since epoch), by specifying the --force-reset-timestamp option to gpkafka load. To determine the create time epoch timestamp for a Kafka message, run the Kafka console consumer on the topic specifying the --property print.timestamp=true option, and review the output. You can also use a converter such as EpochConverter to convert a human-readable date to epoch time.

    $ gpkafka load --force-reset-timestamp 1571066212000 loadcfg2.yaml
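
If you prefer to compute the timestamp yourself, the conversion from a date and time to milliseconds since the epoch is short. The following minimal Python sketch uses only the standard library; the helper name to_epoch_millis is hypothetical, and the date shown is simply the UTC time that corresponds to the value used in the command above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since epoch.

    Hypothetical helper for illustration.
    """
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid ambiguity")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


reset_point = datetime(2019, 10, 14, 15, 16, 52, tzinfo=timezone.utc)
print(to_epoch_millis(reset_point))  # prints 1571066212000
```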
    
Note

Specifying the --force-reset-<xxx> options when loading data may result in missing or duplicate messages. Use of these options outside of the offset mismatch scenario is discouraged.

Alternatively, you can provide the FALLBACK_OPTION (version 2) or fallback_option (version 3 (Beta)) property in the load configuration file to instruct GPSS to automatically read from the specified offset when it detects a mismatch.

Checking the Progress of a Load Operation

Starting in version 1.4.1, GPSS keeps track of the progress of each Kafka load job in a separate CSV-format log file. The progress log file for a specific job is named progress_*jobname*_*jobid*_*date*.log, and resides in the following log directory:

  • If you are loading Kafka data to Greenplum with the gpkafka load command, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option to the command, or to the $HOME/gpAdminLogs directory.
  • If you are loading Kafka data to Greenplum with the gpsscli commands, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option when you started the GPSS server instance, or to the $HOME/gpAdminLogs directory.

A progress log file includes information and statistics about the load time, data size, and speed. It also includes the number of rows written to the Greenplum table, the number of rows rejected by Greenplum, and the total number of rows operated on by GPSS (inserted rows plus rejected rows).

A progress log file includes the following header row:

timestamp,pid,batch_id,start_time,end_time,total_byte,speed,total_read_count,inserted_rows,rejected_rows,total_rows

Example Kafka progress log message:

20220704 10:17:00.52827,101417,1,2022-07-04 17:16:33.421+00,2022-07-04 17:17:00.497+00,79712,2.88KB,997,991,6,997

When GPSS reads Kafka data in jsonl, delimited, or csv formats, a Kafka message may contain multiple rows. For these formats, the progress log total_read_count identifies the Kafka message number, while total_rows identifies the total number of inserted and rejected rows.
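
Because the progress log is plain CSV, it is straightforward to post-process. The following minimal Python sketch parses the header row and example message shown above and checks that total_rows equals inserted_rows plus rejected_rows for each batch. It reads from an in-memory string; pointing it at a real progress log file is left to the reader.

```python
import csv
import io

# Header row and example message copied from the text above.
log_text = (
    "timestamp,pid,batch_id,start_time,end_time,total_byte,speed,"
    "total_read_count,inserted_rows,rejected_rows,total_rows\n"
    "20220704 10:17:00.52827,101417,1,2022-07-04 17:16:33.421+00,"
    "2022-07-04 17:17:00.497+00,79712,2.88KB,997,991,6,997\n"
)

# DictReader uses the header row to name the fields of each batch entry.
batches = list(csv.DictReader(io.StringIO(log_text)))

for batch in batches:
    inserted = int(batch["inserted_rows"])
    rejected = int(batch["rejected_rows"])
    total = int(batch["total_rows"])
    consistent = inserted + rejected == total
    print(batch["batch_id"], inserted, rejected, total, consistent)
```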
