gpkafka configuration file (version 1).

Synopsis

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
[VERSION: 1]
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: <kafka_broker_host:broker_port> [, ... ]
        TOPIC: <kafka_topic>
      [COLUMNS:
         - NAME: { <column_name> | __IGNORED__ }
           TYPE: <column_data_type>
         [ ... ]]
      FORMAT: <data_format>
      [[DELIMITED_OPTION:
         DELIMITER: <delimiter_string>] |
      [AVRO_OPTION:
         [SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]] 
         [BYTES_TO_BASE64: <boolean>]] |
      [CUSTOM_OPTION:
         NAME: <udf_name>
         PARAMSTR: <udf_parameter_string>]]
      [FILTER: <filter_string>]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS:
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS:
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS:
         - <update_column_name>
         [ ... ]]
      [UPDATE_CONDITION: <update_condition>]
      [DELETE_CONDITION: <delete_condition>]
      [MAPPING:
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> }
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ]
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [TASK:
      POST_BATCH_SQL: <udf_or_sql_to_run>
      BATCH_INTERVAL: <num_batches>]
    [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]

Description

Note

The gpkafka.yaml Version 1 configuration file format is deprecated and may be removed in a future release. Use the version 2 or version 3 (Beta) configuration file format to configure a Kafka load job.

You specify load configuration parameters for the gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.

Note

Version 1 of the gpkafka.yaml configuration file syntax does not support KEY and VALUE blocks.

The gpkafka utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.

Keywords and Values

Greenplum Database Connection Options

DATABASE: db_name
The name of the Greenplum database.
USER: user_name
The name of the Greenplum Database user/role. This user_name must have permissions as described in the Greenplum Streaming Server documentation.
PASSWORD: password
The password for the Greenplum Database user/role.
HOST: host
The host name or IP address of the Greenplum Database coordinator host.
PORT: greenplum_port
The port number of the Greenplum Database server on the coordinator host.
VERSION: 1
Optional. The version of the load configuration file. The default version is Version 1.

KAFKA:INPUT: Options

SOURCE

Kafka input configuration parameters.

BROKERS: kafka_broker_host:broker_port
The host and port identifying the Kafka broker.
TOPIC: kafka_topic
The name of the Kafka topic from which to load data. The topic must exist.
COLUMNS:

The column names and data types. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when the column names and types match the target Greenplum Database table definition.

The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.

NAME: column_name
The name of a column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.
TYPE: data_type
The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Greenplum Database table column.
FORMAT: data_format

The format of the Kafka message value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, or json.

avro
When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
binary
When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
csv
When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
custom
When you specify the custom data format, you must provide a CUSTOM_OPTION.
delimited
When you specify the delimited data format, you must provide a DELIMITED_OPTION.
json
When you specify the json data format, you must define only a single json type column in COLUMNS.
AVRO_OPTION

Optional. When you specify avro as the FORMAT, you may provide AVRO_OPTIONs that identify a schema registry location and whether or not you want GPSS to convert Avro bytes fields into base64-encoded strings.

SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port
Optional. When you specify avro as the FORMAT and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal.
BYTES_TO_BASE64: boolean
When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false, GPSS does not perform the conversion.
CUSTOM_OPTION

Optional. When you specify custom as the FORMAT, CUSTOM_OPTION is required. This block identifies the name and the arguments of a custom formatter user-defined function.

NAME: udf_name
The name of the custom formatter user-defined function.
PARAMSTR: udf_parameter_string
A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
DELIMITED_OPTION:DELIMITER: delimiter_string
Optional. When you specify delimited as the FORMAT, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
FILTER: filter_string
The filter to apply to the Kafka input messages before GPSS loads the data into Greenplum Database. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more COLUMNS names.
ERROR_LIMIT: { num_errors | percentage_errors }
The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS deactivates error logging, and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept ERROR_LIMIT: 1.

KAFKA:OUTPUT: Options

SCHEMA: output_schema_name
The name of the Greenplum Database schema in which table_name resides. Optional, the default schema is the public schema.
TABLE: table_name
The name of the Greenplum Database table into which GPSS loads the Kafka data.
MODE: mode

The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT.

UPDATE - Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.

UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.

MERGE - Inserts new rows and updates existing rows when:

  • columns are listed in UPDATE_COLUMNS,
  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional UPDATE_CONDITION is specified and met.

Deletes rows when:

  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional DELETE_CONDITION is specified and met.

New rows are identified when the MATCH_COLUMNS value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS and UPDATE_COLUMNS. If there are multiple new MATCH_COLUMNS values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.

MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MATCH_COLUMNS:

Required if MODE is MERGE or UPDATE.

match_column_name
Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
ORDER_COLUMNS:

Optional. May be specified in MERGE MODE to sort the input data rows.

order_column_name
Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
UPDATE_COLUMNS:

Required if MODE is MERGE or UPDATE.

update_column_name
Specifies the column(s) to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
UPDATE_CONDITION: update_condition
Optional. Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
DELETE_CONDITION: delete_condition
Optional. In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
MAPPING:

Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.

Note

When you specify a MAPPING, ensure that you provide a mapping for all Kafka data elements of interest. GPSS does not automatically match column names when you provide a MAPPING.

NAME: target_column_name
Specifies the target Greenplum Database table column name.
EXPRESSION: { source_column_name | expression }
Specifies a Kafka COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
target_column_name: { source_column_name | expression }
When you use this MAPPING syntax, specify the target_column_name and {source_column_name | expression} as described above.

KAFKA:METADATA: Options

SCHEMA: metadata_schema_name
The name of the Greenplum Database schema in which GPSS creates external and history tables. The default metadata_schema_name is KAFKA:OUTPUT:SCHEMA.

Greenplum Database COMMIT: Options

COMMIT:

Controls how gpkafka load commits data to Greenplum Database. You must specify one of MAX_ROW or MINIMAL_INTERVAL. You may specify both configuration parameters as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL to your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering.

MAX_ROW: number_of_rows
The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs GPSS to ignore this commit trigger condition.
MINIMAL_INTERVAL: wait_time
The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 0, wait forever.

Kafka POLL: Options

Note

The POLL properties are deprecated and ignored by GPSS.

POLL:

Controls the polling time period and batch size when reading Kafka data.

BATCHSIZE: num_records
The number of Kafka records in a batch. BATCHSIZE must be smaller than COMMIT:MAX_ROW. The default batch size is 200.
TIMEOUT: poll_time
The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a TIMEOUT greater than 100 milliseconds and less than COMMIT:MINIMAL_INTERVAL. The default poll timeout is 1000 milliseconds.

Greenplum Database TASK: Options

TASK:

Controls the execution and scheduling of a periodic (maintenance) task.

POST_BATCH_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
BATCH_INTERVAL: num_batches
The number of batches to read before running udf_or_sql_to_run. The default batch interval is 0.

Kafka PROPERTIES: Options

PROPERTIES:

Kafka consumer configuration property names and values.

kafka_property_name
The name of a Kafka property.
kafka_property_value
The Kafka property value.

Notes

If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:

CREATE TABLE "MyTable" ("MyColumn" text);

Your gpkafka.yaml YAML configuration file would refer to the above table and column names as:

  COLUMNS:
     - name: '"MyColumn"'
       type: text
OUTPUT:
   TABLE: '"MyTable"'

GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES block to your gpkafka.yaml load configuration file:

PROPERTIES:
      api.version.request: false
      broker.version.fallback: 0.8.2.1

Examples

Load data from Kafka as defined in the Version 1 configuration file named kafka2greenplum.yaml:

gpkafka load kafka2greenplum.yaml

Example kafka2greenplum.yaml configuration file:

DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: month
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: delimited
      DELIMITED_OPTION:
         DELIMITER: '|'
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
      MAPPING:
        - NAME: customer_id 
          EXPRESSION: cust_id
        - NAME: newcust
          EXPRESSION: cust_id > 5000000
        - NAME: expenses 
          EXPRESSION: expenses
        - NAME: tax_due 
          EXPRESSION: expenses * .0725
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000

See Also

gpkafka-v2.yaml, gpkafka load, gpss, gpss.json

check-circle-line exclamation-circle-line close-line
Scroll to top icon