gpkafka configuration file (version 2).

Synopsis

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: <kafka_broker_host:broker_port> [, ... ]
        TOPIC: <kafka_topic>
        [PARTITIONS: (<partition_numbers>)]
        [FALLBACK_OFFSET: { earliest | latest }]
      [VALUE:
        COLUMNS:
           - NAME: { <column_name> | __IGNORED__ }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <value_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [EOL_PREFIX: <prefix_string>]
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [AVRO_OPTION:
            [SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
            [SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
            [SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
            [SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
            [SCHEMA_MIN_TLS_VERSION: <minimum_version>]
            [SCHEMA_PATH_ON_GPDB: <path_to_file>]
            [BYTES_TO_BASE64: <boolean>]] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
            [FORCE_NOT_NULL: <columns>]
            [FILL_MISSING_FIELDS: <boolean>]] |
         [JSONL_OPTION:
            [NEWLINE: <newline_str>]] |
         [CUSTOM_OPTION:
            NAME: <udf_name>
            PARAMSTR: <udf_parameter_string>]]
      [KEY:
        COLUMNS:
           - NAME: { <column_name> | __IGNORED__ }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <key_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [EOL_PREFIX: <prefix_string>]
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [AVRO_OPTION:
            [SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
            [SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
            [SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
            [SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
            [SCHEMA_MIN_TLS_VERSION: <minimum_version>]
            [SCHEMA_PATH_ON_GPDB: <path_to_file>]
            [BYTES_TO_BASE64: <boolean>]] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
            [FORCE_NOT_NULL: <columns>]
            [FILL_MISSING_FIELDS: <boolean>]] |
         [CUSTOM_OPTION:
            NAME: <udf_name>
            PARAMSTR: <udf_parameter_string>]]
      [META:
         COLUMNS:
            - NAME: <meta_column_name>
              TYPE: { json | jsonb }
         FORMAT: json]
      [TRANSFORMER:
         PATH: <path_to_plugin_transform_library>
         ON_INIT: <plugin_transform_init_name>
         TRANSFORM: <plugin_transform_name>
         PROPERTIES:
           <plugin_transform_property_name>: <property_value>
           [ ... ] ]
      [FILTER: <filter_string>]
      [ENCODING: <char_set>]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   { OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [FILTER: <output_filter_string>]
      [MODE: <mode>]
      [MATCH_COLUMNS: 
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS: 
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS: 
         - <update_column_name>
         [ ... ]]
      [UPDATE_CONDITION: <update_condition>]
      [DELETE_CONDITION: <delete_condition>]
      [TRANSFORMER:
         TRANSFORM: <udf_transform_udf_name>
         PROPERTIES:
           <udf_transform_property_name>: <property_value>
           [ ... ]
         COLUMNS:
           - <udf_transform_column_name>
           [ ... ] ]
      [MAPPING: 
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> } 
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ] |
   OUTPUTS:
      - TABLE: <table_name>
        [MODE: <mode>]
        [MATCH_COLUMNS: 
           - <match_column_name>
           [ ... ]]
        [ORDER_COLUMNS: 
           - <order_column_name>
           [ ... ]]
        [UPDATE_COLUMNS: 
           - <update_column_name>
           [ ... ]]
        [UPDATE_CONDITION: <update_condition>]
        [DELETE_CONDITION: <delete_condition>]
        [TRANSFORMER:
           TRANSFORM: <udf_transform_udf_name>
           PROPERTIES:
              <udf_transform_property_name>: <property_value>
              [ ... ]
           COLUMNS:
              - <udf_transform_column_name>
              [ ... ] ]
        [MAPPING: 
           - NAME: <target_column_name>
             EXPRESSION: { <source_column_name> | <expression> } 
           [ ... ]
             |
           <target_column_name> : { <source_column_name> | <expression> }
           [ ... ] ]
      [...] }
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      SAVE_FAILING_BATCH: <boolean>
      RECOVER_FAILING_BATCH: <boolean> (Beta)
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
      CONSISTENCY: { strong | at-least | at-most | none }
      IDLE_DURATION: <idle_time>
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [TASK:
      POST_BATCH_SQL: <udf_or_sql_to_run>
      BATCH_INTERVAL: <num_batches>
      PREPARE_SQL: <udf_or_sql_to_run>
      TEARDOWN_SQL: <udf_or_sql_to_run> ]
   [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]
[SCHEDULE:
   RETRY_INTERVAL: <retry_time>
   MAX_RETRIES: <num_retries>
   RUNNING_DURATION: <run_time>
   AUTO_STOP_RESTART_INTERVAL: <restart_time>
   MAX_RESTART_TIMES: <num_restarts>
   QUIT_AT_EOF_AFTER: <clock_time>]
[ALERT:
   COMMAND: <command_to_run>
   WORKDIR: <directory>
   TIMEOUT: <alert_time>]

Where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:

<PROPERTY>: {{<template_var>}}

Description

You specify load configuration parameters for the gpsscli and gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include VMware Tanzu Greenplum connection and target table information, Kafka broker and topic information, and error and commit thresholds.

Note

Version 2 of the gpkafka.yaml configuration file syntax supports KEY and VALUE blocks. Version 1 does not.

The gpsscli and gpkafka utilities process the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.

Keywords and Values

Tanzu Greenplum Options

DATABASE: db_name
The name of the Tanzu Greenplum database.
USER: user_name
The name of the Tanzu Greenplum user/role. This user_name must have permissions as described in the VMware Tanzu Greenplum Streaming Server documentation.
PASSWORD: password
The password for the Tanzu Greenplum user/role.
HOST: host
The host name or IP address of the Tanzu Greenplum coordinator host.
PORT: greenplum_port
The port number of the Tanzu Greenplum server on the coordinator host.
VERSION: 2
The version of the configuration file. You must specify VERSION: 2 when you configure VALUE and/or KEY blocks in the file.

KAFKA:INPUT: Options

SOURCE

Kafka input configuration parameters.

BROKERS: kafka_broker_host:broker_port
The host and port identifying a Kafka broker. You may specify a comma-separated list of brokers.
TOPIC: kafka_topic
The name of the Kafka topic from which to load data. The topic must exist.
PARTITIONS: (partition_numbers)
A single partition number, a comma-separated list of partition numbers, and/or a range of partition numbers from which GPSS reads messages from the Kafka topic. A range that you specify with the M...N syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
Note

Ensure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.

FALLBACK_OFFSET: { earliest | latest }
Specifies the behavior of GPSS when it detects a Kafka message offset gap. When set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages published to the Kafka topic. If this property is not set, GPSS returns an error when it detects an offset gap.
VALUE:
The Kafka message value field names, data types, and format. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when you specify a KEY block; GPSS ignores the Kafka message value in this circumstance.
KEY:
The Kafka message key field names, data types, and format. You must specify all Kafka key elements in the order in which they appear in the Kafka message. Optional when you specify a VALUE block; GPSS ignores the Kafka message key in this circumstance.
COLUMNS:NAME: column_name

The name of a key or value column. column_name must match the column name of the target Tanzu Greenplum table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.

The default source-to-target data mapping behavior of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Tanzu Greenplum TABLE. You can override the default mapping by specifying a MAPPING block.
COLUMNS:TYPE: data_type
The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Tanzu Greenplum table column.
FORMAT: data_format

The format of the Kafka message key or value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, json, or jsonl for the key and value, with some restrictions.

avro
When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message key or value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
binary
When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
csv
When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).

You must not provide a VALUE block when you specify csv format for a KEY block. Similarly, you must not provide a KEY block when you specify csv format for a VALUE block.

custom
When you specify the custom data format, you must provide a CUSTOM_OPTION.
delimited
When you specify the delimited data format, you must provide a DELIMITED_OPTION.
json
When you specify the json data format, you must define only a single json type column in COLUMNS.
jsonl
When you specify the jsonl data format, you may provide a JSONL_OPTION to define a newline character.
AVRO_OPTION

Optional. When you specify avro as the FORMAT, you may provide AVRO_OPTION properties that identify a schema registry location, optional SSL certificates and keys, and whether or not you want GPSS to convert Avro bytes fields into base64-encoded strings.

SCHEMA_REGISTRY_ADDR: schemareg_host:schemareg_port
When you specify FORMAT: avro and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address; at least one of the addresses must be valid.
SCHEMA_CA_ON_GPDB: sr_ca_file_path
The file system path to the CA certificate that GPSS uses to verify the peer. This file must reside in sr_ca_file_path on all Tanzu Greenplum segment hosts.
SCHEMA_CERT_ON_GPDB: sr_cert_file_path
The file system path to the client certificate that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_cert_file_path on all Tanzu Greenplum segment hosts.
SCHEMA_KEY_ON_GPDB: sr_key_file_path
The file system path to the private key file that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_key_file_path on all Tanzu Greenplum segment hosts.
SCHEMA_MIN_TLS_VERSION: minimum_version
The minimum transport layer security (TLS) version that GPSS requests on the connection to the schema registry. Supported versions are 1.0, 1.1, 1.2, or 1.3. The default minimum TLS version is 1.0.
SCHEMA_PATH_ON_GPDB: path_to_file
When you specify the avro format and the Avro schema of the JSON key or value data that you want to load is specified in a separate .avsc file, you must identify the file system location in path_to_file. The file must reside in this location on every Tanzu Greenplum segment host.
Note

GPSS does not cache the schema. GPSS must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.

BYTES_TO_BASE64: boolean
When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false; GPSS does not perform the conversion.
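
For example, a minimal sketch of a VALUE block that reads Avro data whose schema is registered in an HTTPS schema registry; the host name, port, and certificate file paths are illustrative:

   VALUE:
     COLUMNS:
       - NAME: v1
         TYPE: json
     FORMAT: avro
     AVRO_OPTION:
       SCHEMA_REGISTRY_ADDR: https://schemareg1:8081    # illustrative address
       SCHEMA_CA_ON_GPDB: /etc/gpss/certs/sr_ca.pem     # must reside on all segment hosts
       SCHEMA_CERT_ON_GPDB: /etc/gpss/certs/sr_client.pem
       SCHEMA_KEY_ON_GPDB: /etc/gpss/certs/sr_client.key
       SCHEMA_MIN_TLS_VERSION: 1.2
       BYTES_TO_BASE64: true
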
CSV_OPTION

When you specify FORMAT: csv, you may provide the following options:

DELIMITER: delim_char
Specifies a single ASCII character that separates columns within each message or row of data. The default delimiter is a comma (,).
QUOTE: quote_char
Specifies the quotation character. Because GPSS does not provide a default value for this property, you must specify a value.
NULL_STRING: nullstr_val
Specifies the string that represents the null value. Because GPSS does not specify a default value for this property, you must specify a value.
ESCAPE: escape_char
Specifies the single character that is used for escaping data characters in the content that might otherwise be interpreted as row or column delimiters. Make sure to choose an escape character that is not used anywhere in your actual column data. Because GPSS does not provide a default value for this property, you must specify a value.
FORCE_NOT_NULL: columns
Specifies a comma-separated list of column names to process as though each column were quoted and hence not a NULL value. For the default null_string (nothing between two delimiters), missing values are evaluated as zero-length strings.
FILL_MISSING_FIELDS: boolean
Specifies the action of GPSS when it reads a row of data that has missing trailing field values (the row has missing data fields at the end of a line or row). The default value is false; GPSS returns an error when it encounters a row with missing trailing field values.
If set to true, GPSS sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line will still generate an error.
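
For example, a sketch of a VALUE block that loads CSV-formatted message values; the column names and option values are illustrative:

   VALUE:
     COLUMNS:
       - NAME: id
         TYPE: int
       - NAME: descr
         TYPE: text
     FORMAT: csv
     CSV_OPTION:
       DELIMITER: ';'
       QUOTE: '"'          # QUOTE, NULL_STRING, and ESCAPE have no defaults
       NULL_STRING: 'NULL'
       ESCAPE: '\'
       FILL_MISSING_FIELDS: true
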
JSONL_OPTION

Optional. When you specify FORMAT: jsonl, you may choose to provide the JSONL_OPTION properties.

NEWLINE: newline_str
A string that specifies the new line character(s) that end each JSON record. The default newline is "\n".
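
For example, a sketch of a jsonl VALUE block for records that end with a carriage-return/newline pair; the column name and the newline representation are illustrative:

   VALUE:
     COLUMNS:
       - NAME: jdata
         TYPE: json
     FORMAT: jsonl
     JSONL_OPTION:
       NEWLINE: "\r\n"   # assumes standard backslash escape sequences
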
CUSTOM_OPTION

When you specify FORMAT: custom, you must provide the CUSTOM_OPTION properties. This block identifies the name and the arguments of a custom formatter user-defined function.

NAME: udf_name
The name of the custom formatter user-defined function.
PARAMSTR: udf_parameter_string
A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
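
For example, a sketch of a custom format block; the formatter UDF name and parameter string are hypothetical:

   VALUE:
     COLUMNS:
       - NAME: rawdata
         TYPE: text
     FORMAT: custom
     CUSTOM_OPTION:
       NAME: my_custom_formatter          # hypothetical formatter UDF
       PARAMSTR: 'delim=|,encoding=utf8'
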
DELIMITED_OPTION

When you specify FORMAT: delimited, you must provide the DELIMITED_OPTION properties.

DELIMITER: delimiter_string
When you specify the delimited format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain the quote or escape characters.
EOL_PREFIX: prefix_string
Specifies the prefix before the end-of-line character (\n) that indicates the end of a row. The default prefix is empty.
QUOTE: quote_char
Specifies the single ASCII quotation character. The default quote character is empty.
If you do not specify a quotation character, GPSS assumes that all columns are unquoted. If you do not specify a quotation character and do specify an escape character, GPSS assumes that all columns are unquoted and escapes the delimiter, end-of-line prefix, and escape itself.
When you specify a quotation character, you must specify an escape character. GPSS reads any content between quote characters as-is, except for escaped characters.
ESCAPE: escape_char
Specifies the single ASCII character used to escape special characters (for example, the delimiter, end-of-line prefix, quote, or escape itself). The default escape character is empty.
When you specify an escape character and do not specify a quotation character, GPSS escapes only the delimiter, end-of-line prefix, and escape itself.
When you specify both an escape character and a quotation character, GPSS escapes only these characters.
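
For example, a sketch of a delimited VALUE block that reads pipe-separated, quoted fields; the column names are illustrative:

   VALUE:
     COLUMNS:
       - NAME: cust_id
         TYPE: int
       - NAME: cust_name
         TYPE: text
     FORMAT: delimited
     DELIMITED_OPTION:
       DELIMITER: '|'
       QUOTE: '"'    # when QUOTE is set, ESCAPE is required
       ESCAPE: '\'
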
META:

The field name, type, and format of the Kafka metadata. META must specify a single json or jsonb (Greenplum 6 only) type column and FORMAT: json. The available Kafka metadata properties include:

  • topic (text) - the Kafka topic name
  • partition (int) - the partition number
  • offset (bigint) - the record location within the partition
  • timestamp (bigint) - the time that the message was appended to the Kafka log

You can load any of these properties into the target table with a MAPPING, or use a property in the update or merge criteria for a load operation.
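
For example, a sketch of a META block, paired with a MAPPING entry in the OUTPUT block that loads the Kafka offset into an illustrative target column named msg_offset:

   META:
      COLUMNS:
        - NAME: meta
          TYPE: json
      FORMAT: json

   MAPPING:
     - NAME: msg_offset
       EXPRESSION: (meta->>'offset')::bigint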

TRANSFORMER:

Input data transform block. An input data transformer is a plugin, a set of Go functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions:

PATH: path_to_plugin_transform_library
The file system location of the plugin transformer library on the Tanzu Greenplum Streaming Server host.
ON_INIT: plugin_transform_init_name
The name of an initialization function that GPSS calls when it loads the transform library.
TRANSFORM: plugin_transform_name
The name of the transform function. GPSS invokes this function for every message it reads.
PROPERTIES: plugin_transform_property_name: property_value
One or more property name and value pairs that GPSS passes to plugin_transform_init_name and plugin_transform_name.
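
For example, a sketch of an input transformer block; the library path, function names, and property names are all hypothetical:

   TRANSFORMER:
      PATH: /usr/local/gpss/plugins/mytransform.so   # hypothetical plugin library
      ON_INIT: InitMyTransform
      TRANSFORM: MyTransform
      PROPERTIES:
        threshold: "100"
        mode: strict
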
FILTER: filter_string
The filter to apply to the Kafka input messages before GPSS loads the data into Tanzu Greenplum. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more KEY, VALUE, or META column names.
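
For example, a filter that loads only the messages from partition 3 whose illustrative value column c1 carries a positive amount:

   FILTER: (meta->>'partition')::int = 3 AND (c1->>'amount')::decimal > 0
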
ENCODING: char_set
The source data encoding. You can specify an encoding character set when the source data is of the csv, custom, delimited, or json format. GPSS supports the character sets identified in Character Set Support in the Tanzu Greenplum documentation.
ERROR_LIMIT: { num_errors | percentage_errors }
The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Tanzu Greenplum external table framework, GPSS does not accept ERROR_LIMIT: 1.

KAFKA:OUTPUT: Options

Note

You must specify only one of the OUTPUT or OUTPUTS blocks. You cannot specify both.

SCHEMA: output_schema_name
The name of the Tanzu Greenplum schema in which table_name resides. Optional; the default schema is the public schema.
TABLE: table_name
The name of the Tanzu Greenplum table into which GPSS loads the Kafka data.
FILTER: output_filter_string
The filter to apply to the output data before GPSS loads the data into Tanzu Greenplum. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more META or VALUE column names.
MODE: mode

The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT.

UPDATE - Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.

UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.

MERGE - Inserts new rows and updates existing rows when:

  • columns are listed in UPDATE_COLUMNS,
  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional UPDATE_CONDITION is specified and met.

Deletes rows when:

  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional DELETE_CONDITION is specified and met.

New rows are identified when the MATCH_COLUMNS value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source data is inserted, not only the MATCH_COLUMNS and UPDATE_COLUMNS. If there are multiple new MATCH_COLUMNS values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.

MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MATCH_COLUMNS:

Required if MODE is MERGE or UPDATE.

match_column_name
Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
ORDER_COLUMNS:

Optional. May be specified in MERGE MODE to sort the input data rows.

order_column_name
Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
UPDATE_COLUMNS:

Required if MODE is MERGE or UPDATE.

update_column_name
Specifies the column(s) to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
UPDATE_CONDITION: update_condition
Optional. Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
DELETE_CONDITION: delete_condition
Optional. In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
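
For example, a sketch of an OUTPUT block that merges into a target table; the table and column names and the conditions are illustrative:

   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
      MODE: MERGE
      MATCH_COLUMNS:
         - customer_id
      ORDER_COLUMNS:
         - txn_time
      UPDATE_COLUMNS:
         - expenses
      UPDATE_CONDITION: expenses > 0
      DELETE_CONDITION: expenses = 0
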
TRANSFORMER:

Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Tanzu Greenplum. The semantics of the UDF are transform-specific.

Note

GPSS currently supports specifying only one of the MAPPING or (UDF) TRANSFORMER blocks in the load configuration file, not both.

TRANSFORM: udf_transform_udf_name
The name of the output transform UDF. GPSS invokes this function for every batch of data it writes to Tanzu Greenplum.
PROPERTIES: udf_transform_property_name: property_value
One or more property name and value pairs that GPSS passes to udf_transform_udf_name.
COLUMNS: udf_transform_column_name
The name of one or more columns involved in the transform.
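
For example, a sketch of an output transformer block that invokes a hypothetical UDF named my_udf_transform on two illustrative columns:

   TRANSFORMER:
      TRANSFORM: my_udf_transform   # hypothetical UDF
      PROPERTIES:
        multiplier: "2"
      COLUMNS:
        - expenses
        - tax_due
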
MAPPING:

Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.

Note

GPSS currently supports specifying only one of the MAPPING or (UDF) TRANSFORMER blocks in the load configuration file, not both.

Note

When you specify a MAPPING, ensure that you provide a mapping for all Kafka message key and value elements of interest. GPSS does not automatically match column names when you provide a MAPPING.

NAME: target_column_name
Specifies the target Tanzu Greenplum table column name.
EXPRESSION: { source_column_name | expression }
Specifies a Kafka COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
target_column_name: { source_column_name | expression }
When you use this MAPPING syntax, specify the target_column_name and {source_column_name | expression} as described above.
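
For example, the same mapping expressed in each syntax; the column names are illustrative:

   MAPPING:
      - NAME: customer_id
        EXPRESSION: (c1->>'cust_id')::int
      - NAME: load_time
        EXPRESSION: now()

or, equivalently:

   MAPPING:
      customer_id: (c1->>'cust_id')::int
      load_time: now()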

KAFKA:OUTPUTS: Options

Note

You must specify only one of the OUTPUT or OUTPUTS blocks. You cannot specify both.

TABLE: table_name
The name of a Tanzu Greenplum table into which GPSS loads the Kafka data.
other options
As specified in the KAFKA:OUTPUT: Options section.
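
For example, a sketch of an OUTPUTS block that loads the same input into two tables; the table names and mapping are illustrative:

   OUTPUTS:
      - TABLE: expenses_all
        MODE: INSERT
      - TABLE: expenses_summary
        MODE: INSERT
        MAPPING:
          - NAME: expenses
            EXPRESSION: (c1->>'expenses')::decimal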

KAFKA:METADATA: Options

SCHEMA: metadata_schema_name
The name of the Tanzu Greenplum schema in which GPSS creates external and history tables. The default metadata_schema_name is the value of KAFKA:OUTPUT:SCHEMA.

Tanzu Greenplum COMMIT: Options

COMMIT:

Controls how gpkafka load commits a batch of data to Tanzu Greenplum. You may specify both MAX_ROW and MINIMAL_INTERVAL as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL for your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering.

SAVE_FAILING_BATCH: boolean
Determines whether or not GPSS saves data into a backup table before it writes the data to Tanzu Greenplum. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_<jobhash>, and continues to process incoming Kafka messages. You must then manually load the good data from the backup table into Greenplum or set RECOVER_FAILING_BATCH (Beta) to true to have GPSS automatically reload the good data.
Note

Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.

RECOVER_FAILING_BATCH: boolean (Beta)
When set to true and SAVE_FAILING_BATCH is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note

Enabling this property requires that GPSS has the Tanzu Greenplum privileges to create a function.

MAX_ROW: num_rows
The number of rows to batch before triggering an INSERT operation on the Tanzu Greenplum table. The default value of MAX_ROW is 0, which instructs GPSS to ignore this commit trigger condition.
MINIMAL_INTERVAL: wait_time
The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
CONSISTENCY: { strong | at-least | at-most | none }
Specifies how GPSS manages message offsets when it acts as a high-level consumer. Valid values are strong, at-least, at-most, and none. The default value is strong. Refer to Understanding Kafka Message Offset Management for more detailed information.
IDLE_DURATION: idle_time

The maximum amount of time to wait (milliseconds) for the first batch of Kafka data. When you use this property to enable lazy load, GPSS waits until Kafka data is available before locking the target Greenplum table. You can specify:

  • 0 (lazy load is deactivated)
  • -1 (lazy load is activated, the job never stops), or
  • a positive value (lazy load is activated, the job stops after idle_time duration of no data in the Kafka topic)

The default value is 0.
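
For example, a sketch of a COMMIT block that batches up to 10000 rows and waits at least 5 seconds between INSERT operations; the values are illustrative:

   COMMIT:
      MAX_ROW: 10000
      MINIMAL_INTERVAL: 5000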

Kafka POLL: Options

Note

The POLL properties are deprecated and ignored by GPSS.

POLL:

Controls the polling time period and batch size when reading Kafka data.

BATCHSIZE: num_records
The number of Kafka records in a batch. BATCHSIZE should be smaller than COMMIT:MAX_ROW. The default batch size is 200.
TIMEOUT: poll_time
The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a TIMEOUT greater than 100 milliseconds and less than COMMIT:MINIMAL_INTERVAL. The default poll timeout is 1000 milliseconds.

Tanzu Greenplum TASK: Options

TASK:

Controls the execution and scheduling of a periodic (maintenance) task.

POST_BATCH_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
BATCH_INTERVAL: num_batches
The number of batches to read before running udf_or_sql_to_run. The default batch interval is 0.
PREPARE_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want GPSS to run before it executes the job. The default is null; no command is run.
TEARDOWN_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want GPSS to run after the job stops. GPSS runs the function or command(s) on both job success and job failure. The default is null; no command is run.
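
For example, a sketch of a TASK block that runs a hypothetical maintenance UDF after every 10 batches:

   TASK:
      POST_BATCH_SQL: SELECT my_maintenance_udf()   # hypothetical UDF
      BATCH_INTERVAL: 10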

Kafka PROPERTIES: Options

PROPERTIES:

Kafka consumer configuration property names and values.

kafka_property_name
The name of a Kafka property.
kafka_property_value
The Kafka property value.
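
For example, a sketch of a PROPERTIES block that configures the Kafka consumer for SASL/SSL authentication, assuming GPSS passes these standard Kafka consumer properties through to the underlying consumer; the values are illustrative:

   PROPERTIES:
      security.protocol: sasl_ssl
      sasl.mechanism: PLAIN
      sasl.username: gpss_consumer   # illustrative credentials
      sasl.password: changeme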

Job SCHEDULE: Options

SCHEDULE:

Controls the frequency and interval of restarting jobs.

RETRY_INTERVAL: retry_time
The period of time that GPSS waits before retrying a failed job. You can specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
MAX_RETRIES: num_retries
The maximum number of times GPSS attempts to retry a failed job. The default is 0; do not retry. If you specify a negative value, GPSS retries the job indefinitely.
RUNNING_DURATION: run_time
The amount of time after which GPSS automatically stops a job. GPSS does not automatically stop a job by default.
AUTO_STOP_RESTART_INTERVAL: restart_time
The amount of time after which GPSS restarts a job that it stopped due to reaching RUNNING_DURATION.
MAX_RESTART_TIMES: num_restarts
The maximum number of times that GPSS restarts a job that it stopped due to reaching RUNNING_DURATION. The default is 0; do not restart the job. If you specify the value -1, GPSS restarts the job indefinitely. You may use gpsscli stop to stop a job from being restarted indefinitely.
QUIT_AT_EOF_AFTER: clock_time
The clock time after which GPSS stops a job every day when it encounters an EOF. By default, GPSS does not automatically stop a job that reaches EOF. GPSS never stops a job when the current time is before clock_time, even when GPSS encounters an EOF.
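
For example, a sketch of a SCHEDULE block that retries a failed job up to 3 times at 30-second intervals, and stops the job each day after it encounters an EOF past the illustrative clock time 23:00:

   SCHEDULE:
      RETRY_INTERVAL: 30s
      MAX_RETRIES: 3
      QUIT_AT_EOF_AFTER: "23:00"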

Job ALERT: Options

Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).

COMMAND: command_to_run
The program that the GPSS server runs on the GPSS server host, including arguments. The command must be executable by GPSS.
command_to_run has access to job-related environment variables that GPSS sets, including: $GPSSJOB_NAME, $GPSSJOB_STATUS, and $GPSSJOB_DETAIL.
WORKDIR: directory
The working directory for command_to_run. The default working directory is the directory from which you started the GPSS server process. If you specify a relative path, it is relative to the directory from which you started the GPSS server process.
TIMEOUT: alert_time
The amount of time after a job stops at which GPSS triggers the alert (and runs command_to_run). You can specify the time interval in day (d), hour (h), minute (m), or second (s) integer units; do not mix units. The default alert timeout is -1s (no timeout).
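
For example, a sketch of an ALERT block that runs a hypothetical notification script:

   ALERT:
      COMMAND: ./notify_job_stopped.sh   # hypothetical script; receives $GPSSJOB_NAME, etc.
      WORKDIR: /home/gpadmin/gpss_alerts
      TIMEOUT: 30s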

Template Variables

GPSS supports using template variables to specify property values in the load configuration file.

You specify a template variable value in the load configuration file as follows:

<PROPERTY>: {{<template_var>}}

For example:

MAX_RETRIES: {{numretries}}

GPSS substitutes the template variable with a value that you specify via the -p | --property <template_var=value> option to the gpsscli dryrun, gpsscli submit, gpsscli load, or gpkafka load command.

For example, if the command line specifies:

--property numretries=10

GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value while the job is running.

Notes

If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:

CREATE TABLE "MyTable" ("MyColumn" text);

Your gpkafka.yaml YAML configuration file would refer to the above table and column names as:

  COLUMNS:
     - name: '"MyColumn"'
       type: text
OUTPUT:
   TABLE: '"MyTable"'

GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES block to your gpkafka-v2.yaml load configuration file:

PROPERTIES:
      api.version.request: false
      broker.version.fallback: 0.8.2.1

You can specify backslash escape sequences in the CSV DELIMITER, QUOTE, and ESCAPE options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.

Examples

Load data from Kafka as defined in the Version 2 configuration file named kafka2greenplumv2.yaml:

gpkafka load kafka2greenplumv2.yaml

Example kafka2greenplumv2.yaml configuration file:

DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
         PARTITIONS: (2, 5...7, 13)
      VALUE:
         COLUMNS:
           - NAME: c1
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
           - NAME: key
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      META:
         COLUMNS:
           - NAME: meta
             TYPE: json
         FORMAT: json
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
        - NAME: t
          EXPRESSION: (meta->>'topic')::text
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000

See Also

Loading Avro Data from Kafka, gpkafka.yaml, gpkafka load, gpsscli load, gpsscli submit
