GPSS load configuration file for a RabbitMQ data source (version 2).
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
RABBITMQ:
  INPUT:
    SOURCE:
      SERVER: <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>
      VIRTUALHOST: <gpss_vhost>
      { STREAM: <name> | QUEUE: <name> }
      [FALLBACK_OFFSET: { earliest | latest }]
    DATA:
      COLUMNS:
        - NAME: { <column_name> | __IGNORED__ }
          TYPE: <column_data_type>
        [ ... ]
      FORMAT: <value_data_format>
      [[DELIMITED_OPTION:
          DELIMITER: <delimiter_string>
          [EOL_PREFIX: <prefix_string>]
          [QUOTE: <quote_char>]
          [ESCAPE: <escape_char>] ] |
       [CSV_OPTION:
          [DELIMITER: <delim_char>]
          [QUOTE: <quote_char>]
          [NULL_STRING: <nullstr_val>]
          [ESCAPE: <escape_char>]
          [FORCE_NOT_NULL: <columns>]
          [FILL_MISSING_FIELDS: <boolean>]] |
       [JSONL_OPTION:
          [NEWLINE: <newline_str>]] |
       [CUSTOM_OPTION:
          NAME: <udf_name>
          PARAMSTR: <udf_parameter_string>]]
    [META:
      COLUMNS:
        - NAME: <meta_column_name>
          TYPE: { json | jsonb }
      FORMAT: json]
    [TRANSFORMER:
      PATH: <path_to_plugin_transform_library>
      ON_INIT: <plugin_transform_init_name>
      TRANSFORM: <plugin_transform_name>
      PROPERTIES:
        <plugin_transform_property_name>: <property_value>
        [ ... ] ]
    [FILTER: <filter_string>]
    [ENCODING: <char_set>]
    [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
  { OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [FILTER: <output_filter_string>]
      [MODE: <mode>]
      [MATCH_COLUMNS:
        - <match_column_name>
        [ ... ]]
      [ORDER_COLUMNS:
        - <order_column_name>
        [ ... ]]
      [UPDATE_COLUMNS:
        - <update_column_name>
        [ ... ]]
      [UPDATE_CONDITION: <update_condition>]
      [DELETE_CONDITION: <delete_condition>]
      [TRANSFORMER:
        TRANSFORM: <udf_transform_udf_name>
        PROPERTIES:
          <udf_transform_property_name>: <property_value>
          [ ... ]
        COLUMNS:
          - <udf_transform_column_name>
          [ ... ] ]
      [MAPPING:
        - NAME: <target_column_name>
          EXPRESSION: { <source_column_name> | <expression> }
        [ ... ]
          |
        <target_column_name> : { <source_column_name> | <expression> }
        [ ... ] ] |
    OUTPUTS:
      - TABLE: <table_name>
        [MODE: <mode>]
        [MATCH_COLUMNS:
          - <match_column_name>
          [ ... ]]
        [ORDER_COLUMNS:
          - <order_column_name>
          [ ... ]]
        [UPDATE_COLUMNS:
          - <update_column_name>
          [ ... ]]
        [UPDATE_CONDITION: <update_condition>]
        [DELETE_CONDITION: <delete_condition>]
        [TRANSFORMER:
          TRANSFORM: <udf_transform_udf_name>
          PROPERTIES:
            <udf_transform_property_name>: <property_value>
            [ ... ]
          COLUMNS:
            - <udf_transform_column_name>
            [ ... ] ]
        [MAPPING:
          - NAME: <target_column_name>
            EXPRESSION: { <source_column_name> | <expression> }
          [ ... ]
            |
          <target_column_name> : { <source_column_name> | <expression> }
          [ ... ] ]
      [...] }
  [METADATA:
    [SCHEMA: <metadata_schema_name>]]
  [COMMIT:
    SAVE_FAILING_BATCH: <boolean>
    RECOVER_FAILING_BATCH: <boolean> (Beta)
    MAX_ROW: <num_rows>
    MINIMAL_INTERVAL: <wait_time>
    CONSISTENCY: { strong | at-least | at-most | none }
    IDLE_DURATION: <idle_time> ]
  [TASK:
    POST_BATCH_SQL: <udf_or_sql_to_run>
    BATCH_INTERVAL: <num_batches>
    PREPARE_SQL: <udf_or_sql_to_run>
    TEARDOWN_SQL: <udf_or_sql_to_run> ]
  [PROPERTIES:
    <rmq_property_name>: <rmq_property_value>
    [ ... ]]
[SCHEDULE:
  RETRY_INTERVAL: <retry_time>
  MAX_RETRIES: <num_retries>
  RUNNING_DURATION: <run_time>
  AUTO_STOP_RESTART_INTERVAL: <restart_time>
  MAX_RESTART_TIMES: <num_restarts>
  QUIT_AT_EOF_AFTER: <clock_time>]
[ALERT:
  COMMAND: <command_to_run>
  WORKDIR: <directory>
  TIMEOUT: <alert_time>]
You may specify any property value with a template variable that GPSS substitutes at runtime, using the following syntax:
<PROPERTY>: {{<template_var>}}
You specify load configuration parameters for the gpsscli utilities in a YAML-formatted configuration file. (This reference page uses the name rabbitmq-v2.yaml when referring to this file; you may choose your own name for the file.) Load parameters include VMware Tanzu Greenplum connection and target table information, RabbitMQ data source information, and error and commit thresholds.
The gpsscli utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Tanzu Greenplum Options
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
Specify VERSION: 2 when you configure the DATA block in the file.
RABBITMQ:INPUT: Options
SOURCE
SERVER: <rmq_user:rmq_password@rmq_host:rmq_port>
VIRTUALHOST: <gpss_vhost>
STREAM: <name>
Specify either STREAM or QUEUE.
QUEUE: <name>
Specify either STREAM or QUEUE.
FALLBACK_OFFSET: { earliest | latest }
When set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages to the RabbitMQ stream.
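As a minimal sketch of the SOURCE options above, the following fragment reads from a stream and falls back to the earliest available message. The credentials, host, port, virtual host, and stream name are hypothetical placeholders, and the nesting under RABBITMQ:INPUT: is assumed from the section names on this page.

```yaml
# Fragment only: the Greenplum connection options and the OUTPUT block are omitted.
RABBITMQ:
  INPUT:
    SOURCE:
      # <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>, all hypothetical values;
      # use the port that your broker exposes for this source.
      SERVER: loader:changeme@rmq.example.com:5672
      VIRTUALHOST: gpss_vhost
      # Specify STREAM or QUEUE, not both.
      STREAM: orders_stream
      # Resume from the earliest available published message.
      FALLBACK_OFFSET: earliest
```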
DATA
COLUMNS:NAME: <column_name>
The name of a data column. column_name must match the column name of the target Tanzu Greenplum table. Specify __IGNORED__ to omit this RabbitMQ message data element from the load operation.
By default, GPSS matches a column name defined in COLUMNS:NAME with a column name in the target Tanzu Greenplum TABLE. You can override the default mapping by specifying a MAPPING block.
COLUMNS:TYPE: <data_type>
FORMAT: <data_format>
You may specify a FORMAT of binary, csv, custom, delimited, json, or jsonl for the data, with some restrictions.
binary: When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
csv: When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
custom: When you specify the custom data format, you must provide a CUSTOM_OPTION.
delimited: When you specify the delimited data format, you must provide a DELIMITED_OPTION.
json: When you specify the json data format, you must define only a single json type column in COLUMNS.
jsonl: When you specify the jsonl data format, you may provide a JSONL_OPTION to define a newline character.
CSV_OPTION
When you specify FORMAT: csv, you may provide the following options:
DELIMITER: <delim_char>
The default delimiter is a comma (,).
QUOTE: <quote_char>
NULL_STRING: <nullstr_val>
ESCAPE: <escape_char>
FORCE_NOT_NULL: <columns>
For the default null_string (nothing between two delimiters), missing values are evaluated as zero-length strings.
FILL_MISSING_FIELDS: <boolean>
When set to false, GPSS returns an error when it encounters a row with missing trailing field values. When set to true, GPSS sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line will still generate an error.
CUSTOM_OPTION
When you specify FORMAT: custom, you are required to provide the CUSTOM_OPTION properties. This block identifies the name and the arguments of a custom formatter user-defined function.
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>
JSONL_OPTION
When you specify FORMAT: jsonl, you may choose to provide the JSONL_OPTION properties.
NEWLINE: <newline_str>
The default is "\n".
DELIMITED_OPTION
When you specify FORMAT: delimited, you must provide the DELIMITED_OPTION properties.
DELIMITER: <delimiter_string>
When you specify the delimited format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
EOL_PREFIX: <prefix_string>
The prefix before the end-of-line character (\n) that indicates the end of a row. The default prefix is empty.
QUOTE: <quote_char>
ESCAPE: <escape_char>
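A DATA block for delimited input might look like the following sketch. The column names, the multi-byte delimiter, and the end-of-line prefix are hypothetical values chosen only to illustrate the options above; the delimiter deliberately avoids quote and escape characters.

```yaml
# Fragment only: this block is assumed to sit under RABBITMQ:INPUT:.
DATA:
  COLUMNS:
    - NAME: id
      TYPE: int
    - NAME: note
      TYPE: text
  FORMAT: delimited
  DELIMITED_OPTION:
    # Required for the delimited format; may be multi-byte, up to 32 bytes.
    DELIMITER: "|@|"
    # Optional prefix that precedes the \n ending each row.
    EOL_PREFIX: "#"
```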
META:
When you specify a META block, you must define a single json or jsonb (Greenplum 6 only) type column and FORMAT: json.
The available RabbitMQ meta data properties for a streaming source include:
stream (text) - the RabbitMQ stream name
offset (bigint) - the message offset
The available RabbitMQ meta data properties for a queue source include:
queue (text) - the RabbitMQ queue name
messageId (text) - the message identifier
correlationId (text) - the correlation identifier
timestamp (bigint) - the time that the message was added to the RabbitMQ queue
TRANSFORMER:
A set of go functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions:
PATH: <path_to_plugin_transform_library>
ON_INIT: <plugin_transform_init_name>
TRANSFORM: <plugin_transform_name>
PROPERTIES: <plugin_transform_property_name: property_value>
FILTER: <filter_string>
If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more DATA column names.
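For illustration, the following sketch drops messages whose first column is not positive. The column names c1 and c2 are hypothetical, and placing FILTER beside DATA under RABBITMQ:INPUT: is assumed from the synopsis.

```yaml
# Fragment only: the SOURCE and OUTPUT blocks are omitted.
RABBITMQ:
  INPUT:
    DATA:
      COLUMNS:
        - NAME: c1
          TYPE: int
        - NAME: c2
          TYPE: int
      FORMAT: csv
    # Any valid SQL conditional expression over the DATA column names.
    FILTER: c1 > 0 AND c2 IS NOT NULL
```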
ENCODING: <char_set>
Applies to source data in csv, custom, delimited, or json format. GPSS supports the character sets identified in Character Set Support in the Tanzu Greenplum documentation.
ERROR_LIMIT: { <num_errors> | <percentage_errors> }
gpsscli load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Tanzu Greenplum external table framework, GPSS does not accept ERROR_LIMIT: 1.
RABBITMQ:OUTPUT: Options
Note: You must specify only one of the OUTPUT or OUTPUTS blocks. You cannot specify both.
SCHEMA: <output_schema_name>
The default is the public schema.
TABLE: <table_name>
FILTER: <output_filter_string>
If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more META or DATA column names.
MODE: <mode>
The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT.
UPDATE - Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.
UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MERGE - Inserts new rows and updates existing rows when:
columns are listed in UPDATE_COLUMNS,
the MATCH_COLUMNS target table column values are equal to the input data, and
UPDATE_CONDITION is specified and met.
Deletes rows when:
the MATCH_COLUMNS target table column values are equal to the input data, and
DELETE_CONDITION is specified and met.
New rows are identified when the MATCH_COLUMNS value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source data is inserted, not only the MATCH_COLUMNS and UPDATE_COLUMNS. If there are multiple new MATCH_COLUMNS values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MATCH_COLUMNS:
Required when MODE is MERGE or UPDATE.
<match_column_name>
ORDER_COLUMNS:
May be specified in MERGE MODE to sort the input data rows.
<order_column_name>
ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
UPDATE_COLUMNS:
Required when MODE is MERGE or UPDATE.
<update_column_name>
A column to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
UPDATE_CONDITION: <update_condition>
Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
DELETE_CONDITION: <delete_condition>
In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
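A minimal sketch of a MERGE-mode OUTPUT block that combines the options above. The table and column names (orders, order_id, status, updated_at) are hypothetical, and the two conditions are placeholders only, since this page does not spell out how column references inside them are resolved. All names are lowercase and unquoted because MERGE does not support column names that require quoting.

```yaml
# Fragment only: the INPUT block is omitted.
RABBITMQ:
  OUTPUT:
    SCHEMA: public
    TABLE: orders
    MODE: MERGE
    # Rows match when these target columns equal the input data.
    MATCH_COLUMNS:
      - order_id
    # Among duplicate matches, the input row with the largest value is used.
    ORDER_COLUMNS:
      - updated_at
    # Columns rewritten on matching rows.
    UPDATE_COLUMNS:
      - status
      - updated_at
    # Placeholder conditions, written like WHERE clause predicates.
    UPDATE_CONDITION: status <> 'cancelled'
    DELETE_CONDITION: status = 'deleted'
```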
TRANSFORMER:
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Tanzu Greenplum. The semantics of the UDF are transform-specific.
Note: GPSS currently supports specifying only one of the MAPPING or (UDF) TRANSFORMER blocks in the load configuration file, not both.
TRANSFORM: <udf_transform_udf_name>
PROPERTIES: <udf_transform_property_name: property_value>
COLUMNS: <udf_transform_column_name>
MAPPING:
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
Note: GPSS currently supports specifying only one of the MAPPING or (UDF) TRANSFORMER blocks in the load configuration file, not both.
Note: When you specify a MAPPING, ensure that you provide a mapping for all RabbitMQ message data elements of interest. GPSS does not automatically match column names when you provide a MAPPING.
NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
You may specify a COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
<target_column_name>: { <source_column_name> | <expression> }
When you use this MAPPING syntax, specify the target_column_name and {source_column_name | expression} as described above.
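The two mapping syntaxes can be sketched side by side as follows. The column names and the expression are hypothetical; the two YAML documents (separated by ---) are alternatives that express the same mapping, and only one form would appear in a real OUTPUT block.

```yaml
# Syntax 1: a list of NAME/EXPRESSION pairs.
MAPPING:
  - NAME: c1
    EXPRESSION: c1          # plain source column reference
  - NAME: c2
    EXPRESSION: c2 * 100    # value expression, as in a SELECT list
---
# Syntax 2: target_column_name: expression pairs.
MAPPING:
  c1: c1
  c2: c2 * 100
```

Because GPSS does not match column names automatically once a MAPPING is present, both forms list every target column of interest, including the one that is copied unchanged.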
RABBITMQ:OUTPUTS: Options
Note: You must specify only one of the OUTPUT or OUTPUTS blocks. You cannot specify both.
TABLE: <table_name>
The name of a Tanzu Greenplum table into which GPSS loads the RabbitMQ data.
The other options are the same as those described for RABBITMQ:OUTPUT:.
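As a sketch under the synopsis above, an OUTPUTS block lists one entry per target table, each with its own load options. The table and column names are hypothetical.

```yaml
# Fragment only: use OUTPUTS instead of OUTPUT, never both.
RABBITMQ:
  OUTPUTS:
    # Append every message to a raw history table.
    - TABLE: events_raw
      MODE: INSERT
    # Keep one current row per event_id in a second table.
    - TABLE: events_latest
      MODE: MERGE
      MATCH_COLUMNS:
        - event_id
      UPDATE_COLUMNS:
        - payload
```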
RABBITMQ:METADATA: Options
SCHEMA: <metadata_schema_name>
The default metadata_schema_name is RABBITMQ:OUTPUT:SCHEMA.
Tanzu Greenplum COMMIT: Options
COMMIT:
You may specify both MAX_ROW and MINIMAL_INTERVAL as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL to your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering.
SAVE_FAILING_BATCH: <boolean>
The default value is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_<jobhash>, and continues to process incoming messages. You must then manually load the good data from the backup table into Greenplum or set RECOVER_FAILING_BATCH (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
RECOVER_FAILING_BATCH: <boolean> (Beta)
When set to true and SAVE_FAILING_BATCH is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Tanzu Greenplum privileges to create a function.
MAX_ROW: <number_of_rows>
The number of rows to batch before triggering an INSERT operation on the Tanzu Greenplum table. The default value of MAX_ROW is 0, which instructs GPSS to ignore this commit trigger condition.
MINIMAL_INTERVAL: <wait_time>
The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
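A sketch of the two commit triggers, following the tuning advice above: start from MINIMAL_INTERVAL and add MAX_ROW only if message buffering uses too much memory. Both values are illustrative rather than recommendations, and MINIMAL_INTERVAL is assumed here to be expressed in milliseconds.

```yaml
# Fragment only: COMMIT is assumed to sit under RABBITMQ:, beside INPUT and OUTPUT.
RABBITMQ:
  COMMIT:
    # Wait at least this long between INSERT operations.
    MINIMAL_INTERVAL: 2000
    # Optional cap on the rows batched per INSERT; 0 (the default) disables this trigger.
    MAX_ROW: 50000
```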
CONSISTENCY: { strong | at-least | at-most | none }
Valid values are at-least (GPSS stores the offsets before commit), at-most (GPSS stores the offsets after commit), and none. For streams, GPSS also supports strong consistency. The default value is at-least. Refer to Understanding RabbitMQ Message Offset Management for more detailed information.
IDLE_DURATION: <idle_time>
The maximum amount of time to wait (milliseconds) for the first batch of data. When you use this property to enable lazy load, GPSS waits until RabbitMQ data is available before locking the target Greenplum table. You can specify:
0 (lazy load is deactivated),
-1 (lazy load is activated, the job never stops), or
a value greater than 0.
Tanzu Greenplum TASK: Options
TASK:
POST_BATCH_SQL: <udf_or_sql_to_run>
BATCH_INTERVAL: <num_batches>
PREPARE_SQL: <udf_or_sql_to_run>
TEARDOWN_SQL: <udf_or_sql_to_run>
RabbitMQ PROPERTIES: Options
PROPERTIES:
<rmq_property_name>
<rmq_property_value>
Job SCHEDULE: Options
SCHEDULE:
RETRY_INTERVAL: <retry_time>
Specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
MAX_RETRIES: <num_retries>
RUNNING_DURATION: <run_time>
AUTO_STOP_RESTART_INTERVAL: <restart_time>
The interval after which GPSS restarts a job that it stopped on reaching RUNNING_DURATION.
MAX_RESTART_TIMES: <num_restarts>
The maximum number of times that GPSS restarts a job that it stopped on reaching RUNNING_DURATION. The default is 0, do not restart the job. If you specify the value -1, GPSS restarts the job indefinitely. You may use gpsscli stop to stop the jobs from being restarted indefinitely.
QUIT_AT_EOF_AFTER: <clock_time>
GPSS does not stop the job before clock_time, even when GPSS encounters an EOF.
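A SCHEDULE block might be sketched as follows. All values are illustrative. This page documents the unit suffixes only for RETRY_INTERVAL; the sketch assumes that RUNNING_DURATION and AUTO_STOP_RESTART_INTERVAL accept the same single-unit form. QUIT_AT_EOF_AFTER is omitted because its clock time format is not described here.

```yaml
# Fragment only: SCHEDULE is assumed to be a top-level block, outside RABBITMQ:.
SCHEDULE:
  # One integer unit per value; do not mix units (for example, not 1m30s).
  RETRY_INTERVAL: 30s
  MAX_RETRIES: 5
  RUNNING_DURATION: 2h
  AUTO_STOP_RESTART_INTERVAL: 10m
  # 0 (the default) never restarts; -1 restarts indefinitely.
  MAX_RESTART_TIMES: 3
```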
Job ALERT: Options
ALERT:
COMMAND: <command_to_run>
The command may reference the environment variables $GPSSJOB_NAME, $GPSSJOB_STATUS, and $GPSSJOB_DETAIL.
WORKDIR: <directory>
TIMEOUT: <alert_time>
Specify the timeout in day (d), hour (h), minute (m), or second (s) integer units; do not mix units. The default alert timeout is -1s (no timeout).
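An ALERT block could be sketched like this. The script path and working directory are hypothetical. Whether GPSS expands the $GPSSJOB_* variables inside the command line or only exports them to the command's environment is an assumption to verify before relying on this form.

```yaml
# Fragment only: ALERT is assumed to be a top-level block, like SCHEDULE.
ALERT:
  # Hypothetical notification script that receives the job name and status.
  COMMAND: /usr/local/bin/notify-oncall.sh "$GPSSJOB_NAME" "$GPSSJOB_STATUS"
  WORKDIR: /tmp
  # Give up on the alert command after 30 seconds; the default -1s means no timeout.
  TIMEOUT: 30s
```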
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<PROPERTY>: {{<template_var>}}
For example:
MAX_RETRIES: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property template_var=value option to the gpsscli dryrun, gpsscli submit, or gpsscli load command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value while the job is running.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the rabbitmq-v2.yaml configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your rabbitmq-v2.yaml YAML configuration file would refer to the above table and column names as:
COLUMNS:
  - name: '"MyColumn"'
    type: text
OUTPUT:
  TABLE: '"MyTable"'
You can specify backslash escape sequences in the CSV DELIMITER, QUOTE, and ESCAPE options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
Load data from RabbitMQ as defined in the Version 2 configuration file named rmq2greenplumv2.yaml:
gpsscli load rmq2greenplumv2.yaml
Example rmq2greenplumv2.yaml configuration file:
DATABASE: testdb
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 15432
VERSION: 2
RABBITMQ:
  INPUT:
    SOURCE:
      SERVER: gpdmin:changeme@localhost:5672
      QUEUE: test
      VIRTUALHOST: gpadmin
    DATA:
      COLUMNS:
        - NAME: c1
          TYPE: int
        - NAME: c2
          TYPE: int
      FORMAT: CSV
      CSV_OPTION:
        DELIMITER: ","
        QUOTE: "'"
        NULL_STRING: "NA"
        ESCAPE: '\'
        FORCE_NOT_NULL: "c1,c2"
        FILL_MISSING_FIELDS: true
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: "public"
    TABLE: tbl_int_text_column
    MODE: INSERT
    MAPPING:
      - NAME: c1
        EXPRESSION: c1::int
      - NAME: c2
        EXPRESSION: c2::int
  METADATA:
    SCHEMA: staging_schema
  COMMIT:
    MAX_ROW: 1000
    MINIMAL_INTERVAL: 200
  PROPERTIES:
    eof.when.idle: 1500
    qos.prefetch.count: 10