gpkafka configuration file (version 2).
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[PARTITIONS: (<partition_numbers>)]
[FALLBACK_OFFSET: { earliest | latest }]
[VALUE:
COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[EOL_PREFIX: <prefix_string>]
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[AVRO_OPTION:
[SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
[SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
[SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
[SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
[SCHEMA_MIN_TLS_VERSION: <minimum_version>]
[SCHEMA_PATH_ON_GPDB: <path_to_file>]r
[BYTES_TO_BASE64: <boolean>]] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[FORCE_NOT_NULL: <columns>]
[FILL_MISSING_FIELDS: <boolean>]] |
[JSONL_OPTION:
[NEWLINE: <newline_str>]] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[KEY:
COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]
FORMAT: <key_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string> |
[EOL_PREFIX: <prefix_string>]
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[AVRO_OPTION:
[SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
[SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
[SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
[SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
[SCHEMA_MIN_TLS_VERSION: <minimum_version>]
[SCHEMA_PATH_ON_GPDB: <path_to_file>]
[BYTES_TO_BASE64: <boolean>]] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[FORCE_NOT_NULL: <columns>]
[FILL_MISSING_FIELDS: <boolean>] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[TRANSFORMER:
PATH: <path_to_plugin_transform_library>
ON_INIT: <plugin_transform_init_name>
TRANSFORM: <plugin_transform_name>
PROPERTIES:
<plugin_transform_property_name>: <property_value>
[ ... ] ]
[FILTER: <filter_string>]
[ENCODING: <char_set>]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
{ OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[FILTER: <output_filter_string>]
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[TRANSFORMER:
TRANSFORM: <udf_transform_udf_name>
PROPERTIES:
<udf_transform_property_name>: <property_value>
[ ... ]
COLUMNS:
- <udf_transform_column_name>
[ ... ] ]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ] |
OUTPUTS:
- TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[TRANSFORMER:
TRANSFORM: <udf_transform_udf_name>
PROPERTIES:
<udf_transform_property_name>: <property_value>
[ ... ]
COLUMNS:
- <udf_transform_column_name>
[ ... ] ]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ]
[...] }
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
SAVE_FAILING_BATCH: <boolean>
RECOVER_FAILING_BATCH: <boolean> (Beta)
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
IDLE_DURATION: <idle_time>
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[TASK:
POST_BATCH_SQL: <udf_or_sql_to_run>
BATCH_INTERVAL: <num_batches>
PREPARE_SQL: <udf_or_sql_to_run>
TEARDOWN_SQL: <udf_or_sql_to_run> ]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries>
RUNNING_DURATION: <run_time>
AUTO_STOP_RESTART_INTERVAL: <restart_time>
MAX_RESTART_TIMES: <num_restarts>
QUIT_AT_EOF_AFTER: <clock_time>]
[ALERT:
COMMAND: <command_to_run>
WORKDIR: <directory>
TIMEOUT: <alert_time>]
Where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:
<PROPERTY:> {{<template_var>}}
You specify load configuration parameters for the gpsscli
and gpkafka
utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml
when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.
NoteVersion 2 of the
gpkafka.yaml
configuration file syntax supportsKEY
andVALUE
blocks. Version 1 does not.
The gpsscli
and gpkafka
utilities process the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Greenplum Database Options
VERSION: 2
when you configure
VALUE
and/or
KEY
blocks in the file.
KAFKA:INPUT: Options
Kafka input configuration parameters.
M...N
syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
NoteEnsure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.
earliest
, GPSS automatically resumes a load operation from the earliest available published message. When set to
latest
, GPSS loads only new messages to the Kafka topic. If this property is not set, GPSS returns an error.
KEY
block; GPSS ignores the Kafka message value in this circumstance.
VALUE
block; GPSS ignores the Kafka message key in this circumstance.
The name of a key or value column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__
to omit this Kafka message data element from the load operation.
COLUMNS:NAME
with a column name in the target Greenplum Database
TABLE
. You can override the default mapping by specifying a
MAPPING
block.
The format of the Kafka message key or value data. You may specify a FORMAT
of avro
, binary
, csv
, custom
, delimited
, json
, or jsonl
for the key and value, with some restrictions.
avro
data format, you must define only a single
json
type column in
COLUMNS
. If the Kafka message key or value schema is registered in a Confluent Schema Registry, you must also provide the
AVRO_OPTION
.
binary
data format, you must define only a single
bytea
type column in
COLUMNS
.
csv
data format, the message content cannot contain line ending characters (CR and LF).
You must not provide a VALUE
block when you specify csv
format for a KEY
block. Similarly, you must not provide a KEY
block when you specify csv
format for a VALUE
block.
custom
data format, you must provide a
CUSTOM_OPTION
.
delimited
data format, you must provide a
DELIMITED_OPTION
.
json
data format, you must define only a single
json
type column in
COLUMNS
.
jsonl
data format, you may provide a
JSONL_OPTION
to define a newline character.
Optional. When you specify avro
as the FORMAT
, you may provide AVRO_OPTION
s that identify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert Avro bytes
fields into base64-encoded strings.
FORMAT: avro
and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal.
1.0
,
1.1
,
1.2
, or
1.3
. The default minimum TLS version is
1.0
.
avro
format and the Avro schema of the JSON key or value data that you want to load is specified in a separate
.avsc
file, you must identify the file system location in path_to_file, and the file must reside in this location on every Greenplum Database segment host.
NoteGPSS does not cache the schema. GPSS must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.
true
, GPSS converts Avro
bytes
fields into base64-encoded strings. The default value is
false
, GPSS does not perform the conversion.
When you specify FORMAT: csv
, you may provide the following options:
,
).
null_string
(nothing between two delimiters), missing values are evaluated as zero-length strings.
false
, GPSS returns an error when it encounters a row with missing trailing field values.
true
, GPSS sets missing trailing field values to
NULL
. Blank rows, fields with a
NOT NULL
constraint, and trailing delimiters on a line will still generate an error.
Optional. When you specify FORMAT: jsonl
, you may choose to provide the JSONL_OPTION
properties.
"\n"
.
Optional. When you specify FORMAT: custom
, you are required to provide the CUSTOM_OPTION
properties. This block identifies the name and the arguments of a custom formatter user-defined function.
Optional. When you specify FORMAT: delimited
, you may choose to provide the DELIMITER_OPTION
properties.
delimited
format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
\n
) that indicates the end of a row. The default prefix is empty.
The field name, type, and format of the Kafka meta data. META
must specify a single json
or jsonb
(Greenplum 6 only) type column and FORMAT: json
. The available Kafka meta data properties include:
topic
(text) - the Kafka topic namepartition
(int) - the partition numberoffset
(bigint) - the record location within the partitiontimestamp
(bigint) - the time that the message was appended to the Kafka logYou can load any of these properties into the target table with a MAPPING
, or use a property in the update or merge criteria for a load operation.
Input data transform block. An input data transformer is a plugin, a set of go
functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions:
true
, GPSS loads the message. If the filter evaluates to
false
, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more
KEY
,
VALUE
, or
META
column names.
csv
,
custom
,
delimited
, or
json
format. GPSS supports the character sets identified in
Character Set Support in the Greenplum Database documentation.
gpkafka load
exits when this limit is reached. The default
ERROR_LIMIT
is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept
ERROR_LIMIT: 1
.
KAFKA:OUTPUT: Options
NoteYou must specify only one of the
OUTPUT
orOUTPUTS
blocks. You cannot specify both.
public
schema.
true
, GPSS loads the message. If the filter evaluates to
false
, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more
META
or
VALUE
column names.
The table load mode. Valid mode values are INSERT
, MERGE
, or UPDATE
. The default value is INSERT
.
UPDATE
- Updates the target table columns that are listed in UPDATE_COLUMNS
when the input columns identified in MATCH_COLUMNS
match the named target table columns and the optional UPDATE_CONDITION
is true.
UPDATE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MERGE
- Inserts new rows and updates existing rows when:
UPDATE_COLUMNS
,MATCH_COLUMNS
target table column values are equal to the input data, andUPDATE_CONDITION
is specified and met.Deletes rows when:
MATCH_COLUMNS
target table column values are equal to the input data, andDELETE_CONDITION
is specified and met.New rows are identified when the MATCH_COLUMNS
value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS
and UPDATE_COLUMNS
. If there are multiple new MATCH_COLUMNS
values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS
, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
MERGE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
Required if MODE
is MERGE
or UPDATE
.
Optional. May be specified in MERGE
MODE
to sort the input data rows.
ORDER_COLUMNS
is used with
MATCH_COLUMNS
to determine the input row with the largest value; GPSS uses that row to write/update the target.
Required if MODE
is MERGE
or UPDATE
.
MATCH_COLUMNS
criteria and the optional
UPDATE_CONDITION
.
WHERE
clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a
MERGE
).
MERGE
MODE
, specifies a boolean condition, similar to that which you would declare in a
WHERE
clause, that must be met for GPSS to delete rows in the target table that meet the
MATCH_COLUMNS
criteria.
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Greenplum Database. The semantics of the UDF are transform-specific.
NoteGPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both.
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
NoteGPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both.
NoteWhen you specify a
MAPPING
, ensure that you provide a mapping for all Kafka message key and value elements of interest. GPSS does not automatically match column names when you provide aMAPPING
.
COLUMNS:NAME
(source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the
SELECT
list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
MAPPING
syntax, specify the target_column_name and {source_column_name | expression} as described above.
KAFKA:OUTPUTS: Options
NoteYou must specify only one of the
OUTPUT
orOUTPUTS
blocks. You cannot specify both.
KAFKA:METADATA: Options
KAFKA:OUTPUT:SCHEMA
.
Greenplum Database COMMIT: Options
Controls how gpkafka load
commits a batch of data to Greenplum Database. You may specify both MAX_ROW
and MINIMAL_INTERVAL
as long as both values are not zero (0
). Try setting and tuning MINIMAL_INTERVAL
to your environment; introduce a MAX_ROW
setting only if you encounter high memory usage associated with message buffering.
false
; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to
true
, GPSS writes both the good and the bad data in the batch to a backup table named
gpssbackup_<jobhash>
, and continues to process incoming Kafka messages. You must then manually load the good data from the backup table into Greenplum
or set
RECOVER_FAILING_BATCH
(Beta) to
true
to have GPSS automatically reload the good data.
NoteUsing a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
true
and
SAVE_FAILING_BATCH
is also
true
, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is
false
; GPSS does not process the backup table.
NoteEnabling this property requires that GPSS has the Greenplum Database privileges to create a function.
INSERT
operation on the Greenplum Database table. The default value of
MAX_ROW
is
0
, which instructs GPSS to ignore this commit trigger condition.
INSERT
operation on the table. The default value is
5000
.
strong
,
at-least
,
at-most
, and
none
. The default value is
strong
. Refer to
Understanding Kafka Message Offset Management for more detailed information.
The maximum amount of time to wait (milliseconds) for the first batch of Kafka data. When you use this property to enable lazy load, GPSS waits until Kafka data is available before locking the target Greenplum table. You can specify:
0
(lazy load is deactivates)-1
(lazy load is activated, the job never stops), or0
.Kafka POLL: Options
NoteThe
POLL
properties are deprecated and ignored by GPSS.
Controls the polling time period and batch size when reading Kafka data.
BATCHSIZE
should be smaller than
COMMIT:MAX_ROW
. The default batch size is 200.
TIMEOUT
greater than
100
milliseconds and less than
COMMIT:MINIMAL_INTERVAL
. The default poll timeout is 1000 milliseconds.
Greenplum Database TASK: Options
Controls the execution and scheduling of a periodic (maintenance) task.
Kafka PROPERTIES: Options
Kafka consumer configuration property names and values.
Job SCHEDULE: Options
Controls the frequency and interval of restarting jobs.
d
), hour (
h
), minute (
m
), second (
s
), or millisecond (
ms
) integer units; do not mix units. The default retry interval is
5m
(5 minutes).
RUNNING_DURATION
.
RUNNING_DURATION
. The default is 0, do not restart the job.
clock_time
, even when GPSS encounters an EOF.
Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).
$GPSSJOB_NAME
,
$GPSSJOB_STATUS
, and
$GPSSJOB_DETAIL
.
d
), hour (
h
), minute (
m
), or second (
s
) integer units; do not mix units. The default alert timeout is
-1s
(no timeout).
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<PROPERTY>: {{<template_var>}}
For example:
MAX_RETRIES: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property <template_var=value>
option to the gpsscli dryrun
, gpsscli submit
, gpsscli load
, or gpkafka load
command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}}
in the load configuration file with the value 10
before submitting the job, and uses that value while the job is running.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml
configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml
YAML configuration file would refer to the above table and column names as:
COLUMNS:
- name: '"MyColumn"'
type: text
OUTPUT:
TABLE: '"MyTable"'
GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES
block to your gpkafka-v2.yaml
load configuration file:
PROPERTIES:
api.version.request: false
broker.version.fallback: 0.8.2.1
You can specify backslash escape sequences in the CSV DELIMITER
, QUOTE
, and ESCAPE
options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x
). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
Load data from Kafka as defined in the Version 2 configuration file named kafka2greenplumv2.yaml
:
gpkafka load kafka2greenplumv2.yaml
Example kafka2greenplumv2.yaml
configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: kbrokerhost1:9092
TOPIC: customer_expenses2
PARTITIONS: (2, 5...7, 13)
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
KEY:
COLUMNS:
- NAME: key
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
META:
COLUMNS:
- NAME: meta
TYPE: json
FORMAT: json
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: payables
TABLE: expenses2
MAPPING:
- NAME: customer_id
EXPRESSION: (c1->>'cust_id')::int
- NAME: newcust
EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
- NAME: expenses
EXPRESSION: (c1->>'expenses')::decimal
- NAME: tax_due
EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
- NAME: t
EXPRESSION: (meta->>'topic')::text
METADATA:
SCHEMA: gpkafka_internal
COMMIT:
MINIMAL_INTERVAL: 2000
Loading Avro Data from Kafka, gpkafka.yaml, gpkafka load, gpsscli load, gpsscli submit