GPSS load configuration file for a Kafka data source (version 3).
version: v3
targets:
- gpdb:
    host: <host>
    port: <greenplum_port>
    user: <user_name>
    password: <password>
    database: <db_name>
    work_schema: <work_schema_name>
    error_limit: <num_errors> | <percentage_errors>
    filter_expression: <filter_string>
    tables:
    - table: <table_name>
      schema: <schema_name>
      mode:
        # specify a single mode property block (described below)
        insert: {}
        update:
          <mode_specific_property>: <value>
          ...
        merge:
          <mode_specific_property>: <value>
          ...
      transformer:
        transform: <udf_transform_udf_name>
        properties:
          <udf_transform_property_name>: <property_value>
          ...
        columns:
        - <udf_transform_column_name>
        ...
      mapping:
        <target_column_name> : <source_column_name> | <expression>
        ...
      filter: <output_filter_string>
    ...
sources:
- kafka:
    topic: <kafka_topic>
    brokers: <kafka_broker_host:broker_port> %, ...%
    partitions: (<partition_numbers>)
    key_content:
      <data_format>:
        <column_spec>
        <other_props>
    value_content:
      <data_format>:
        <column_spec>
        <other_props>
    meta:
      json:
        column:
          name: meta
          type: json
    encoding: <char_set>
    transformer:
      path: <path_to_plugin_transform_library>
      on_init: <plugin_transform_init_name>
      transform: <plugin_transform_name>
      properties:
        <plugin_transform_property_name>: <property_value>
        ...
    rdkafka_prop:
      <kafka_property_name>: <kafka_property_value>
      ...
    task:
      batch_size:
        max_count: <number_of_rows>
        interval_ms: <wait_time>
      idle_duration_ms: <idle_time>
      window_size: <num_batches>
      window_statement: <udf_or_sql_to_run>
      prepare_statement: <udf_or_sql_to_run>
      teardown_statement: <udf_or_sql_to_run>
      save_failing_batch: <boolean>
      recover_failing_batch: <boolean> (Beta)
      consistency: strong | at-least | at-most | none
      fallback_offset: earliest | latest
option:
  schedule:
    max_retries: <num_retries>
    retry_interval: <retry_time>
    running_duration: <run_time>
    auto_stop_restart_interval: <restart_time>
    max_restart_times: <num_restarts>
    quit_at_eof_after: <clock_time>
  alert:
    command: <command_to_run>
    workdir: <directory>
    timeout: <alert_time>
Where the mode_specific_property values that you can specify for update and merge mode follow:
update:
  match_columns: [<match_column_names>]
  order_columns: [<order_column_names>]
  update_columns: [<update_column_names>]
  update_condition: <update_condition>
merge:
  match_columns: [<match_column_names>]
  update_columns: [<update_column_names>]
  order_columns: [<order_column_names>]
  update_condition: <update_condition>
  delete_condition: <delete_condition>
Where data_format, column_spec, and other_props are one of the following blocks:
avro:
  source_column_name: <column_name>
  schema_url: <http://schemareg_host:schemareg_port> %, ...%
  bytes_to_base64: <boolean>
  schema_ca_on_gpdb: <sr_ca_file_path>
  schema_cert_on_gpdb: <sr_cert_file_path>
  schema_key_on_gpdb: <sr_key_file_path>
  schema_min_tls_version: <minimum_version>
  schema_path_on_gpdb: <path_to_file>
binary:
  source_column_name: <column_name>
csv:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  delimiter: <delim_char>
  quote: <quote_char>
  null_string: <nullstr_val>
  escape: <escape_char>
  force_not_null: <columns>
  fill_missing_fields: <boolean>
custom:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  name: <formatter_name>
  options:
  - <optname>=<optvalue>
  ...
delimited:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  delimiter: <delimiter_string>
  eol_prefix: <prefix_string>
  quote: <quote_char>
  escape: <escape_char>
json:
  column:
    name: <column_name>
    type: json | jsonb
  is_jsonl: <boolean>
  newline: <newline_str>
And where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:
<property>: {{<template_var>}}
Note: Version 3 of the GPSS load configuration file differs in both content and format from previous versions of the file. Certain symbols used in the GPSS version 1 and 2 configuration file reference page syntax have different meanings in version 3 syntax:
- Brackets [] are literal and are used to specify a list in version 3. They are no longer used to signify the optionality of a property.
- Curly braces {} are literal and are used to specify YAML mappings in version 3 syntax. They are no longer used with the pipe symbol (|) to identify a list of choices.
You specify load configuration properties for a Greenplum Streaming Server (GPSS) Kafka load job in a YAML-formatted configuration file. (This reference page uses the name gpkafka-v3.yaml when referring to this file; you may choose your own name for the file.) Load properties include Greenplum Database connection and data import properties, Kafka broker, topic, and message format information, and properties specific to the GPSS job.
The gpsscli and gpkafka load utilities process the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant. Keywords are not case-sensitive.
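Because indentation determines the hierarchy, it may help to see the three top-level sections at a glance. The following is a minimal sketch of the nesting only, not a complete job definition; the host, table, topic, and broker values are placeholders chosen for illustration.

```yaml
# Skeleton only: indentation (spaces, never tabs) marks each nesting level.
version: v3
targets:
- gpdb:                      # one target Greenplum Database
    host: mdw-1              # placeholder host name
    tables:
    - table: orders          # hypothetical table name
sources:
- kafka:                     # one Kafka source
    topic: daily_orders      # placeholder topic
    brokers: localhost:9092  # placeholder broker address
option:
  schedule:
    max_retries: 3
```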
version Property
You must specify version: v3.
targets:gpdb Properties
The default work_schema is public.
If the filter_expression filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more source value, key, or meta column names.
The Greenplum Database tables, and the data that GPSS will load into each.
The default schema is the public schema.
The mode is one of insert, merge, or update. The default mode is insert.
Note: update and merge are not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
update mode updates the target table columns listed in update_columns when the input columns identified in match_columns match the named target table columns and the optional update_condition is true.
merge mode inserts new rows and updates existing rows when:
- columns are listed in update_columns,
- the match_columns target table column values are equal to the input data, and
- an update_condition is specified and met.
Deletes rows when:
- the match_columns target table column values are equal to the input data, and
- a delete_condition is specified and met.
New rows are identified when the match_columns value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the match_columns and update_columns. If there are multiple new match_columns values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify order_columns, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
Each mode supports one or more of the following properties as specified in the Synopsis.
match_columns is required if mode is merge or update.
order_columns is used with match_columns to determine the input row with the largest value; GPSS uses that row to write/update the target.
order_columns may be specified in merge mode to sort the input data rows.
update_columns identifies the column(s) to update for the rows that meet the match_columns criteria and the optional update_condition.
update_columns is required if mode is merge or update.
update_condition specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a merge). Optional.
In merge mode, delete_condition specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the match_columns criteria. Optional.
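The mode properties above could be combined as in the following sketch of a merge block. The column names and the two condition strings are hypothetical and only illustrate where each property goes; adjust them to the columns of your own target table.

```yaml
mode:
  merge:
    match_columns: [order_id]          # hypothetical key column(s) used to match rows
    update_columns: [status, amount]   # hypothetical columns to update on a match
    order_columns: [updated_at]        # the input row with the largest value is used
    update_condition: amount >= 0      # illustrative WHERE-style condition
    delete_condition: status = 'cancelled'   # illustrative WHERE-style condition
```

An update block takes the same properties except delete_condition.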
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Greenplum Database. The semantics of the UDF are transform-specific.
Note: GPSS currently supports specifying only one of the mapping or (UDF) transformer blocks in the load configuration file, not both.
Note: When you specify a mapping, ensure that you provide a mapping for all source data elements of interest. GPSS does not automatically match column names when you provide a mapping block.
A mapping expression may be any expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more META or VALUE column names.
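As a rough illustration of a mapping block with an output filter, the sketch below reuses the JSON extraction expressions from the example configuration at the end of this page. The loaded_at target column and the filter condition are assumptions added for illustration.

```yaml
mapping:
  pk: (value->>'pk')::int          # column reference with a cast
  data: (value->>'data')::text
  loaded_at: now()                 # hypothetical target column set by a built-in function
filter: (value->>'pk')::int > 0    # illustrative condition on a VALUE column
```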
sources:kafka: Options
For partitions, a range specified with the M...N syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
Note: Ensure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.
key_content is optional when you specify a value_content block; GPSS ignores the Kafka message key in this circumstance.
value_content is optional when you specify a key_content block; GPSS ignores the Kafka message value in this circumstance.
Note: You must not provide a value_content block when you specify csv format for the key_content block. Similarly, you must not provide a key_content block when you specify csv format for a value_content block.
The source to Greenplum column mapping. The supported column specification differs for different data formats as described below.
By default, GPSS matches a column name specified in source_column_name, column:name, or columns:name with a column name in the target Greenplum Database table. You can override the default mapping by specifying a mapping: block.
The data_format may be avro, binary, csv, custom, delimited, or json for the key and value, with some restrictions.
When you specify the avro data format for a key or value, GPSS reads the data into a single json-type column. You may specify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert bytes fields into base64-encoded strings.
source_column_name is the name of the single json-type column into which GPSS reads the key or value data.
For schema_url, when you specify the avro format and the Avro schema of the JSON data that you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be valid.
When bytes_to_base64 is true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false; GPSS does not perform the conversion.
schema_min_tls_version may be 1.0, 1.1, 1.2, or 1.3. The default minimum TLS version is 1.0.
For schema_path_on_gpdb, when you specify the avro format and the Avro schema of the JSON key or value data that you want to load is specified in a separate .avsc file, you must identify the file system location in path_to_file, and the file must reside in this location on every Greenplum Database segment host.
Note: GPSS does not cache the schema. GPSS must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.
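A value_content block for Avro data registered in a schema registry might look like the sketch below. The registry address is a placeholder, and the column name value simply follows the naming used in the example at the end of this page.

```yaml
value_content:
  avro:
    source_column_name: value            # single json-type column that receives the data
    schema_url: http://localhost:8081    # placeholder Schema Registry address
    bytes_to_base64: true                # convert Avro bytes fields to base64 strings
```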
When you specify the binary data format, GPSS reads the data into a single bytea-type column.
source_column_name is the name of the single bytea-type column into which GPSS reads the key or value data.
When you specify the csv data format, GPSS reads the data into the list of columns that you specify. The message content cannot contain line ending characters (CR and LF).
For columns, the value [] specifies all columns.
The default delimiter is a comma (,).
For the default null_string (nothing between two delimiters), missing values are evaluated as zero-length strings.
The default value of fill_missing_fields is false; GPSS returns an error when it encounters a row with missing trailing field values. If set to true, GPSS sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line will still generate an error.
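To make the csv properties concrete, here is a small sketch of a csv value_content block. The column names, types, and delimiter are hypothetical. No key_content block appears, in keeping with the csv restriction noted earlier.

```yaml
value_content:
  csv:
    columns:
    - name: order_id           # hypothetical columns, listed in message order
      type: int
    - name: status
      type: text
    delimiter: '|'             # overrides the default comma
    fill_missing_fields: true  # missing trailing fields are loaded as NULL
```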
When you specify the custom data format, GPSS uses the custom formatter that you specify to process the input data before writing it to Greenplum Database.
For columns, the value [] specifies all columns.
When you specify the custom data format, formatter_name is required and must identify the name of the formatter user-defined function that GPSS should use when loading the data.
When you specify the delimited data format, GPSS reads the data into the list of columns that you specify. You must specify the data delimiter.
For columns, the value [] specifies all columns.
When you specify the delimited data format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
eol_prefix specifies the prefix before the newline character (\n) that indicates the end of a row. The default prefix is empty.
escape specifies the character used to escape special characters (the delimiter, eol_prefix, quote, or escape itself). The default escape character is empty.
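A delimited block with a multi-byte delimiter and an end-of-line prefix could be sketched as follows. The column names and the '@@' and '$$' strings are arbitrary choices for illustration.

```yaml
value_content:
  delimited:
    columns:
    - name: order_id     # hypothetical columns
      type: int
    - name: status
      type: text
    delimiter: '@@'      # required; may be multi-byte, up to 32 bytes
    eol_prefix: '$$'     # prefix expected before the \n that ends each row
```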
When you specify the json data format, GPSS can read the data as a single JSON object or as a single JSON record per line.
The default value of is_jsonl is false; GPSS reads the JSON data as a single object.
The default newline is "\n".
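For messages that carry one JSON record per line, a json block might be written as in this sketch. The column name value is an assumption that follows the example at the end of this page.

```yaml
value_content:
  json:
    column:
      name: value
      type: json
    is_jsonl: true     # treat each line of the message as a separate JSON record
    newline: "\n"      # stated here explicitly; this is also the default
```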
The data type and field name of the Kafka meta data. meta: must specify the json or jsonb (Greenplum 6 only) data format, and a single json-type column. The available Kafka meta data properties include:
- topic (text) - the Kafka topic name
- partition (int) - the partition number
- offset (bigint) - the record location within the partition
- timestamp (bigint) - the time that the message was appended to the Kafka log
You can load any of these properties into the target table with a mapping, or use a property in the update or merge criteria for a load operation.
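The fragments below sketch how a meta block and a mapping might work together to load Kafka meta data. The expressions are taken from the example configuration at the end of this page; the target column names o, p, and ts come from that example as well.

```yaml
# Under sources: - kafka:
meta:
  json:
    column:
      name: meta
      type: json

# Under targets: - gpdb: tables: (one table entry)
mapping:
  o: (meta->>'offset')::bigint
  p: (meta->>'partition')::int
  ts: (meta->>'timestamp')::bigint
```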
encoding applies to data in csv, custom, delimited, or json format. GPSS supports the character sets identified in Character Set Support in the Greenplum Database documentation.
Input data transform block. An input data transformer is a plugin, a set of Go functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions.
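An input transformer block could be laid out as in the following sketch. The library path, function names, and property are all hypothetical; they stand in for whatever your own plugin exports.

```yaml
# Under sources: - kafka:
transformer:
  path: /path/to/libtransform.so   # hypothetical plugin library location
  on_init: TransformInit           # hypothetical initialization function name
  transform: Transform             # hypothetical transform function name
  properties:
    trim: "true"                   # hypothetical property passed to the functions
```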
Kafka consumer configuration property names and values.
The batch size and commit window.
You may specify both max_count and interval_ms as long as both values are not zero (0). Try setting and tuning interval_ms to your environment; introduce a max_count setting only if you encounter high memory usage associated with message buffering.
max_count is the number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of max_count is 0, which instructs GPSS to ignore this commit trigger condition.
interval_ms is the time to wait, in milliseconds, between each INSERT operation on the table. The default value is 5000.
idle_duration_ms may be:
- 0 (lazy load is deactivated)
- -1 (lazy load is activated, the job never stops), or
The default value is 0.
window_size is the number of batches after which GPSS runs window_statement. The default batch interval is 0.
window_statement is the UDF or SQL that GPSS runs after every window_size number of batches. The default is null, no command to run.
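A task block that commits on a time interval and runs a statement once per window might be sketched like this. The refresh_order_summary() function is hypothetical, and the exact form GPSS expects for the statement value should be checked against your GPSS version.

```yaml
# Under sources: - kafka:
task:
  batch_size:
    interval_ms: 5000     # commit trigger based on elapsed time
    max_count: 0          # 0 means the row-count trigger is ignored
  window_size: 5          # run window_statement after every 5 batches
  window_statement: SELECT refresh_order_summary()   # hypothetical UDF
```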
The default value of save_failing_batch is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_<jobhash>, and continues to process incoming data. You must then manually load the good data from the backup table into Greenplum or set recover_failing_batch (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
When recover_failing_batch is true and save_failing_batch is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
Valid consistency values are strong, at-least, at-most, and none. The default value is strong. Refer to Understanding Kafka Message Offset Management for more detailed information.
When fallback_offset is set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages to the Kafka topic. If this property is not set, GPSS returns an error.
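The error-handling and offset properties described above could be set together as in this sketch. The specific choices (at-least, earliest) are illustrative rather than recommended values.

```yaml
# Under sources: - kafka:
task:
  save_failing_batch: true      # write a failing batch to gpssbackup_<jobhash>
  recover_failing_batch: true   # Beta; only takes effect with save_failing_batch: true
  consistency: at-least         # one of strong | at-least | at-most | none
  fallback_offset: earliest     # resume from the earliest available message
```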
option: Properties
Controls the frequency and interval of restarting jobs.
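Specify retry_interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
auto_stop_restart_interval is the interval after which GPSS restarts a job that it stopped after reaching running_duration.
max_restart_times is the maximum number of times that GPSS restarts a job that it stopped after reaching running_duration. The default is 0, do not restart the job.
For quit_at_eof_after, GPSS does not stop the job before clock_time, even when GPSS encounters an EOF.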
Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).
The alert command has access to the job-related environment variables $GPSSJOB_NAME, $GPSSJOB_STATUS, and $GPSSJOB_DETAIL.
Specify the alert timeout in day (d), hour (h), minute (m), or second (s) integer units; do not mix units. The default alert timeout is -1s (no timeout).
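Putting the schedule and alert properties together, an option block might resemble the following sketch. The durations, working directory, and echo command are assumptions for illustration; each duration uses a single unit, as required.

```yaml
option:
  schedule:
    max_retries: 3
    retry_interval: 30s
    running_duration: 1h
    auto_stop_restart_interval: 10m
    max_restart_times: 5
  alert:
    # Hypothetical command that reports the job name and status.
    command: 'echo "job $GPSSJOB_NAME ended with status $GPSSJOB_STATUS"'
    workdir: /tmp      # placeholder working directory
    timeout: 30s
```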
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<property>: {{<template_var>}}
For example:
max_retries: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property <template_var=value> option to the gpsscli dryrun, gpsscli submit, gpsscli load, or gpkafka load command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value while the job is running.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the load configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" (c1 text);
Your YAML configuration file would refer to the table name as:
targets:
- gpdb:
    tables:
    - table: '"MyTable"'
You can specify backslash escape sequences in the CSV delimiter, quote, and escape options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following rdkafka_prop block to your gpkafka-v3.yaml load configuration file:
rdkafka_prop:
  api.version.request: false
  broker.version.fallback: 0.8.2.1
Load data from Kafka as defined in the Version 3 configuration file named loadfromkafka_v3.yaml:
gpkafka load loadfromkafka_v3.yaml
Example loadfromkafka_v3.yaml configuration file:
version: v3
targets:
- gpdb:
    host: mdw-1
    port: 15432
    user: gpadmin
    password: changeme
    database: testdb
    work_schema: public
    error_limit: 25
    tables:
    - table: tbl_order_merge
      schema: public
      mode:
        insert: {}
      mapping:
        data: (value->>'data')::text
        o: (meta->>'offset')::bigint
        p: (meta->>'partition')::int
        pk: (value->>'pk')::int
        ts: (meta->>'timestamp')::bigint
sources:
- kafka:
    topic: daily_orders
    brokers: localhost:9092
    key_content:
      binary:
        source_column_name: key
    value_content:
      json:
        column:
          name: value
          type: JSON
    meta:
      json:
        column:
          name: meta
          type: JSON
    task:
      batch_size:
        interval_ms: 5000
        max_count: 1
      window_size: 5
option:
  schedule:
    running_duration: 2s
    auto_stop_restart_interval: 2s
    max_restart_times: 1