gpkafka configuration file (version 1).
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
[VERSION: 1]
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]]
FORMAT: <data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>] |
[AVRO_OPTION:
[SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
[BYTES_TO_BASE64: <boolean>]] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[FILTER: <filter_string>]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ]
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[TASK:
POST_BATCH_SQL: <udf_or_sql_to_run>
BATCH_INTERVAL: <num_batches>]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
NoteThe
gpkafka.yaml
Version 1 configuration file format is deprecated and may be removed in a future release. Use the version 2 or version 3 (Beta) configuration file format to configure a Kafka load job.
You specify load configuration parameters for the gpkafka
utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml
when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.
NoteVersion 1 of the
gpkafka.yaml
configuration file syntax does not supportKEY
andVALUE
blocks.
The gpkafka
utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Greenplum Database Connection Options
KAFKA:INPUT: Options
Kafka input configuration parameters.
The column names and data types. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when the column names and types match the target Greenplum Database table definition.
The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in COLUMNS:NAME
with a column name in the target Greenplum Database TABLE
. You can override the default mapping by specifying a MAPPING
block.
__IGNORED__
to omit this Kafka message data element from the load operation.
The format of the Kafka message value data. You may specify a FORMAT
of avro
, binary
, csv
, custom
, delimited
, or json
.
avro
data format, you must define only a single
json
type column in
COLUMNS
. If the Kafka message value schema is registered in a Confluent Schema Registry, you must also provide the
AVRO_OPTION
.
binary
data format, you must define only a single
bytea
type column in
COLUMNS
.
csv
data format, the message content cannot contain line ending characters (CR and LF).
custom
data format, you must provide a
CUSTOM_OPTION
.
delimited
data format, you must provide a
DELIMITED_OPTION
.
json
data format, you must define only a single
json
type column in
COLUMNS
.
Optional. When you specify avro
as the FORMAT
, you may provide AVRO_OPTION
s that identify a schema registry location and whether or not you want GPSS to convert Avro bytes
fields into base64-encoded strings.
avro
as the
FORMAT
and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal.
true
, GPSS converts Avro
bytes
fields into base64-encoded strings. The default value is
false
, GPSS does not perform the conversion.
Optional. When you specify custom
as the FORMAT
, CUSTOM_OPTION
is required. This block identifies the name and the arguments of a custom formatter user-defined function.
delimited
as the
FORMAT
, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
true
, GPSS loads the message. If the filter evaluates to
false
, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more
COLUMNS
names.
gpkafka load
exits when this limit is reached. The default
ERROR_LIMIT
is zero; GPSS deactivates error logging, and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept
ERROR_LIMIT: 1
.
KAFKA:OUTPUT: Options
public
schema.
The table load mode. Valid mode values are INSERT
, MERGE
, or UPDATE
. The default value is INSERT
.
UPDATE
- Updates the target table columns that are listed in UPDATE_COLUMNS
when the input columns identified in MATCH_COLUMNS
match the named target table columns and the optional UPDATE_CONDITION
is true.
UPDATE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MERGE
- Inserts new rows and updates existing rows when:
UPDATE_COLUMNS
,MATCH_COLUMNS
target table column values are equal to the input data, andUPDATE_CONDITION
is specified and met.Deletes rows when:
MATCH_COLUMNS
target table column values are equal to the input data, andDELETE_CONDITION
is specified and met.New rows are identified when the MATCH_COLUMNS
value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS
and UPDATE_COLUMNS
. If there are multiple new MATCH_COLUMNS
values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS
, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
MERGE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
Required if MODE
is MERGE
or UPDATE
.
Optional. May be specified in MERGE
MODE
to sort the input data rows.
ORDER_COLUMNS
is used with
MATCH_COLUMNS
to determine the input row with the largest value; GPSS uses that row to write/update the target.
Required if MODE
is MERGE
or UPDATE
.
MATCH_COLUMNS
criteria and the optional
UPDATE_CONDITION
.
WHERE
clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a
MERGE
).
MERGE
MODE
, specifies a boolean condition, similar to that which you would declare in a
WHERE
clause, that must be met for GPSS to delete rows in the target table that meet the
MATCH_COLUMNS
criteria.
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
NoteWhen you specify a
MAPPING
, ensure that you provide a mapping for all Kafka data elements of interest. GPSS does not automatically match column names when you provide aMAPPING
.
COLUMNS:NAME
(source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the
SELECT
list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
MAPPING
syntax, specify the target_column_name and {source_column_name | expression} as described above.
KAFKA:METADATA: Options
KAFKA:OUTPUT:SCHEMA
.
Greenplum Database COMMIT: Options
Controls how gpkafka load
commits data to Greenplum Database. You must specify one of MAX_ROW
or MINIMAL_INTERVAL
. You may specify both configuration parameters as long as both values are not zero (0
). Try setting and tuning MINIMAL_INTERVAL
to your environment; introduce a MAX_ROW
setting only if you encounter high memory usage associated with message buffering.
INSERT
operation on the Greenplum Database table. The default value of
MAX_ROW
is
0
, which instructs GPSS to ignore this commit trigger condition.
INSERT
operation on the table. The default value is
0
, wait forever.
Kafka POLL: Options
NoteThe
POLL
properties are deprecated and ignored by GPSS.
Controls the polling time period and batch size when reading Kafka data.
BATCHSIZE
must be smaller than
COMMIT:MAX_ROW
. The default batch size is 200.
TIMEOUT
greater than
100
milliseconds and less than
COMMIT:MINIMAL_INTERVAL
. The default poll timeout is 1000 milliseconds.
Greenplum Database TASK: Options
Controls the execution and scheduling of a periodic (maintenance) task.
Kafka PROPERTIES: Options
Kafka consumer configuration property names and values.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml
configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml
YAML configuration file would refer to the above table and column names as:
COLUMNS:
- name: '"MyColumn"'
type: text
OUTPUT:
TABLE: '"MyTable"'
GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES
block to your gpkafka.yaml
load configuration file:
PROPERTIES:
api.version.request: false
broker.version.fallback: 0.8.2.1
Load data from Kafka as defined in the Version 1 configuration file named kafka2greenplum.yaml
:
gpkafka load kafka2greenplum.yaml
Example kafka2greenplum.yaml
configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: kbrokerhost1:9092
TOPIC: customer_expenses
COLUMNS:
- NAME: cust_id
TYPE: int
- NAME: month
TYPE: int
- NAME: expenses
TYPE: decimal(9,2)
FORMAT: delimited
DELIMITED_OPTION:
DELIMITER: '|'
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: payables
TABLE: expenses
MAPPING:
- NAME: customer_id
EXPRESSION: cust_id
- NAME: newcust
EXPRESSION: cust_id > 5000000
- NAME: expenses
EXPRESSION: expenses
- NAME: tax_due
EXPRESSION: expenses * .0725
METADATA:
SCHEMA: gpkafka_internal
COMMIT:
MINIMAL_INTERVAL: 2000