GPSS load configuration file for a RabbitMQ data source (version 2).
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
RABBITMQ:
INPUT:
SOURCE:
SERVER: <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>
VIRTUALHOST: <gpss_vhost>
{ STREAM: <name> | QUEUE: <name> }
[FALLBACK_OFFSET: { earliest | latest }]
DATA:
COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[EOL_PREFIX: <prefix_string>]
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[FORCE_NOT_NULL: <columns>]
[FILL_MISSING_FIELDS: <boolean>]] |
[JSONL_OPTION:
[NEWLINE: <newline_str>]] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[TRANSFORMER:
PATH: <path_to_plugin_transform_library>
ON_INIT: <plugin_transform_init_name>
TRANSFORM: <plugin_transform_name>
PROPERTIES:
<plugin_transform_property_name>: <property_value>
[ ... ] ]
[FILTER: <filter_string>]
[ENCODING: <char_set>]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[FILTER: <output_filter_string>]
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[TRANSFORMER:
TRANSFORM: <udf_transform_udf_name>
PROPERTIES:
<udf_transform_property_name>: <property_value>
[ ... ]
COLUMNS:
- <udf_transform_column_name>
[ ... ] ]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ]
[METADATA:
[SCHEMA: <metadata_schema_name>]]
[COMMIT:
SAVE_FAILING_BATCH: <boolean>
RECOVER_FAILING_BATCH: <boolean> (Beta)
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
IDLE_DURATION: <idle_time> ]
[TASK:
POST_BATCH_SQL: <udf_or_sql_to_run>
BATCH_INTERVAL: <num_batches>
PREPARE_SQL: <udf_or_sql_to_run>
TEARDOWN_SQL: <udf_or_sql_to_run> ]
[PROPERTIES:
<rmq_property_name>: <rmq_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries>
RUNNING_DURATION: <run_time>
AUTO_STOP_RESTART_INTERVAL: <restart_time>
MAX_RESTART_TIMES: <num_restarts>
QUIT_AT_EOF_AFTER: <clock_time>]
[ALERT:
COMMAND: <command_to_run>
WORKDIR: <directory>
TIMEOUT: <alert_time>]
Where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:
<PROPERTY:> {{<template_var>}}
You specify load configuration parameters for the gpsscli
utilities in a YAML-formatted configuration file. (This reference page uses the name rabbitmq-v2.yaml
when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, RabbitMQ data source information, and error and commit thresholds.
The gpsscli
utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Greenplum Database Options
VERSION: 2
when you configure the
DATA
block in the file.
RABBITMQ:INPUT: Options
RabbitMQ input configuration parameters.
STREAM
or
QUEUE
.
STREAM
or
QUEUE
.
earliest
, GPSS automatically resumes a load operation from the earliest available published message. When set to
latest
, GPSS loads only new messages to the RabbitMQ stream.
The name of a data column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__
to omit this RabbitMQ message data element from the load operation.
COLUMNS:NAME
with a column name in the target Greenplum Database
TABLE
. You can override the default mapping by specifying a
MAPPING
block.
The format of the RabbitMQ message data. You may specify a FORMAT
of binary
, csv
, custom
, delimited
, json
, or jsonl
for the data, with some restrictions.
binary
data format, you must define only a single
bytea
type column in
COLUMNS
.
csv
data format, the message content cannot contain line ending characters (CR and LF).
custom
data format, you must provide a
CUSTOM_OPTION
.
delimited
data format, you must provide a
DELIMITED_OPTION
.
json
data format, you must define only a single
json
type column in
COLUMNS
.
jsonl
data format, you may provide a
JSONL_OPTION
to define a newline character.
When you specify FORMAT: csv
, you may provide the following options:
,
).
null_string
(nothing between two delimiters), missing values are evaluated as zero-length strings.
false
, GPSS returns an error when it encounters a row with missing trailing field values.
true
, GPSS sets missing trailing field values to
NULL
. Blank rows, fields with a
NOT NULL
constraint, and trailing delimiters on a line will still generate an error.
Optional. When you specify FORMAT: custom
, you are required to provide the CUSTOM_OPTION
properties. This block identifies the name and the arguments of a custom formatter user-defined function.
Optional. When you specify FORMAT: jsonl
, you may choose to provide the JSONL_OPTION
properties.
"\n"
.
Optional. When you specify FORMAT: delimited
, you may choose to provide the DELIMITER_OPTION
properties.
delimited
format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
\n
) that indicates the end of a row. The default prefix is empty.
json
or
jsonb
(Greenplum 6 only) type column and
FORMAT: json
.
The available RabbitMQ meta data properties for a streaming source include:
stream
(text) - the RabbitMQ stream nameoffset
(bigint) - the message offsetThe available RabbitMQ meta data properties for a queue source include:
queue
(text) - the RabbitMQ queue namemessageId
(text) - the message identifiercorrelationId
(text) - the correlation identifiertimestamp
(bitint) - the time that the message was added to the RabbitMQ queueInput data transform block. An input data transformer is a plugin, a set of go
functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions:
true
, GPSS loads the message. If the filter evaluates to
false
, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more
DATA
column names.
csv
,
custom
,
delimited
, or
json
format. GPSS supports the character sets identified in
Character Set Support in the Greenplum Database documentation.
gpsscli load
exits when this limit is reached. The default
ERROR_LIMIT
is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept
ERROR_LIMIT: 1
.
RABBITMQ:OUTPUT: Options
public
schema.
true
, GPSS loads the message. If the filter evaluates to
false
, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more
META
or
VALUE
column names.
The table load mode. Valid mode values are INSERT
, MERGE
, or UPDATE
. The default value is INSERT
.
UPDATE
- Updates the target table columns that are listed in UPDATE_COLUMNS
when the input columns identified in MATCH_COLUMNS
match the named target table columns and the optional UPDATE_CONDITION
is true.
UPDATE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MERGE
- Inserts new rows and updates existing rows when:
UPDATE_COLUMNS
,MATCH_COLUMNS
target table column values are equal to the input data, andUPDATE_CONDITION
is specified and met.Deletes rows when:
MATCH_COLUMNS
target table column values are equal to the input data, andDELETE_CONDITION
is specified and met.New rows are identified when the MATCH_COLUMNS
value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS
and UPDATE_COLUMNS
. If there are multiple new MATCH_COLUMNS
values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS
, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
MERGE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
Required if MODE
is MERGE
or UPDATE
.
Optional. May be specified in MERGE
MODE
to sort the input data rows.
ORDER_COLUMNS
is used with
MATCH_COLUMNS
to determine the input row with the largest value; GPSS uses that row to write/update the target.
Required if MODE
is MERGE
or UPDATE
.
MATCH_COLUMNS
criteria and the optional
UPDATE_CONDITION
.
WHERE
clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a
MERGE
).
MERGE
MODE
, specifies a boolean condition, similar to that which you would declare in a
WHERE
clause, that must be met for GPSS to delete rows in the target table that meet the
MATCH_COLUMNS
criteria.
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Greenplum Database. The semantics of the UDF are transform-specific.
NoteGPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both.
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
NoteGPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both.
NoteWhen you specify a
MAPPING
, ensure that you provide a mapping for all RabbitMQ message data elements of interest. GPSS does not automatically match column names when you provide aMAPPING
.
COLUMNS:NAME
(source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the
SELECT
list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
MAPPING
syntax, specify the target_column_name and {source_column_name | expression} as described above.
RABBITMQ:METADATA: Options
RABBITMQ:OUTPUT:SCHEMA
.
Greenplum Database COMMIT: Options
Controls how GPSS commits a batch of data to Greenplum Database. You may specify both MAX_ROW
and MINIMAL_INTERVAL
as long as both values are not zero (0
). Try setting and tuning MINIMAL_INTERVAL
to your environment; introduce a MAX_ROW
setting only if you encounter high memory usage associated with message buffering.
false
; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to
true
, GPSS writes both the good and the bad data in the batch to a backup table named
gpssbackup_<jobhash>
, and continues to process incoming messages. You must then manually load the good data from the backup table into Greenplum
or set
RECOVER_FAILING_BATCH
(Beta) to
true
to have GPSS automatically reload the good data.
NoteUsing a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
true
and
SAVE_FAILING_BATCH
is also
true
, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is
false
; GPSS does not process the backup table.
NoteEnabling this property requires that GPSS has the Greenplum Database privileges to create a function.
INSERT
operation on the Greenplum Database table. The default value of
MAX_ROW
is
0
, which instructs GPSS to ignore this commit trigger condition.
INSERT
operation on the table. The default value is
5000
.
at-least
(GPSS stores the offsets before commit),
at-most
(GPSS stores the offsets after commit), and
none
. For streams, GPSS also supports
strong
consistency. The default value is
at-least
. Refer to
Understanding RabbitMQ Message Offset Management for more detailed information.
The maximum amount of time to wait (milliseconds) for the first batch of data. When you use this property to enable lazy load, GPSS waits until RabbitMQ data is available before locking the target Greenplum table. You can specify:
0
(lazy load is deactivated)-1
(lazy load is activated, the job never stops), or0
.Greenplum Database TASK: Options
Controls the running and scheduling of a periodic (maintenance) task.
RabbitMQ PROPERTIES: Options
Job SCHEDULE: Options
Controls the frequency and interval of restarting jobs.
d
), hour (
h
), minute (
m
), second (
s
), or millisecond (
ms
) integer units; do not mix units. The default retry interval is
5m
(5 minutes).
RUNNING_DURATION
.
RUNNING_DURATION
. The default is 0, do not restart the job.
clock_time
, even when GPSS encounters an EOF.
Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).
$GPSSJOB_NAME
,
$GPSSJOB_STATUS
, and
$GPSSJOB_DETAIL
.
d
), hour (
h
), minute (
m
), or second (
s
) integer units; do not mix units. The default alert timeout is
-1s
(no timeout).
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<PROPERTY>: {{<template_var>}}
For example:
MAX_RETRIES: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property template\_var=value
option to the gpsscli dryrun
, gpsscli submit
, or gpsscli load
command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}}
in the load configuration file with the value 10
before submitting the job, and uses that value while the job is running.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the rabbitmq-v2.yaml
configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your rabbitmq-v2.yaml
YAML configuration file would refer to the above table and column names as:
COLUMNS:
- name: '"MyColumn"'
type: text
OUTPUT:
TABLE: '"MyTable"'
You can specify backslash escape sequences in the CSV DELIMITER
, QUOTE
, and ESCAPE
options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x
). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
Load data from RabbitMQ as defined in the Version 2 configuration file named rmq2greenplumv2.yaml
:
gpsscli load rmq2greenplumv2.yaml
Example rmq2greenplumv2.yaml
configuration file:
DATABASE: testdb
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 15432
VERSION: 2
RABBITMQ:
INPUT:
SOURCE:
SERVER: gpdmin:changeme@localhost:5672
QUEUE: test
VIRTUALHOST: gpadmin
DATA:
COLUMNS:
- NAME: c1
TYPE: int
- NAME: c2
TYPE: int
FORMAT: CSV
CSV_OPTION:
DELIMITER: ","
QUOTE: "'"
NULL_STRING: "NA"
ESCAPE: '\'
FORCE_NOT_NULL: "c1,c2"
FILL_MISSING_FIELDS: true
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: "public"
TABLE: tbl_int_text_column
MODE: INSERT
MAPPING:
- NAME: c1
EXPRESSION: c1::int
- NAME: c2
EXPRESSION: c2::int
METADATA:
SCHEMA: staging_schema
COMMIT:
MAX_ROW: 1000
MINIMAL_INTERVAL: 200
PROPERTIES:
eof.when.idle: 1500
qos.prefetch.count: 10