GPSS load configuration file for a RabbitMQ data source (version 3).
version: v3
targets:
- gpdb:
    host: <host>
    port: <greenplum_port>
    user: <user_name>
    password: <password>
    database: <db_name>
    work_schema: <work_schema_name>
    error_limit: <num_errors> | <percentage_errors>
    filter_expression: <filter_string>
    tables:
    - table: <table_name>
      schema: <schema_name>
      mode:
        # specify a single mode property block (described below)
        insert: {}
        update:
          <mode_specific_property>: <value>
          ...
        merge:
          <mode_specific_property>: <value>
          ...
      transformer:
        transform: <udf_transform_udf_name>
        properties:
          <udf_transform_property_name>: <property_value>
          ...
        columns:
        - <udf_transform_column_name>
        ...
      mapping:
        <target_column_name> : <source_column_name> | <expression>
        ...
      filter: <output_filter_string>
sources:
- rabbitmq:
    server: <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>
    vhost: <gpss_vhost>
    stream: <name> | queue: <name>
    consistency: strong | at-least | at-most | none
    fallback_offset: earliest | latest
    save_failing_batch: <boolean>
    recover_failing_batch: <boolean> (Beta)
    data_content:
      <data_format>:
        <column_spec>
        <other_props>
    meta:
      json:
        column:
          name: meta
          type: json
    encoding: <char_set>
    transformer:
      path: <path_to_plugin_transform_library>
      on_init: <plugin_transform_init_name>
      transform: <plugin_transform_name>
      properties:
        <plugin_transform_property_name>: <property_value>
        ...
    properties:
      <rmq_property_name>: <rmq_property_value>
      ...
    task:
      batch_size:
        max_count: <number_of_rows>
        interval_ms: <wait_time>
      idle_duration_ms: <idle_time>
      window_size: <num_batches>
      window_statement: <udf_or_sql_to_run>
      prepare_statement: <udf_or_sql_to_run>
      teardown_statement: <udf_or_sql_to_run>
option:
  schedule:
    max_retries: <num_retries>
    retry_interval: <retry_time>
    running_duration: <run_time>
    auto_stop_restart_interval: <restart_time>
    max_restart_times: <num_restarts>
    quit_at_eof_after: <clock_time>
  alert:
    command: <command_to_run>
    workdir: <directory>
    timeout: <alert_time>
Where the mode_specific_property values that you can specify for update and merge mode follow:
update:
  match_columns: [<match_column_names>]
  order_columns: [<order_column_names>]
  update_columns: [<update_column_names>]
  update_condition: <update_condition>
merge:
  match_columns: [<match_column_names>]
  update_columns: [<update_column_names>]
  order_columns: [<order_column_names>]
  update_condition: <update_condition>
  delete_condition: <delete_condition>
Where data_format, column_spec, and other_props are one of the following blocks:
binary:
  source_column_name: <column_name>
csv:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  delimiter: <delim_char>
  quote: <quote_char>
  null_string: <nullstr_val>
  escape: <escape_char>
  force_not_null: <columns>
  fill_missing_fields: <boolean>
custom:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  name: <formatter_name>
  options:
  - <optname>=<optvalue>
  ...
delimited:
  columns:
  - name: <column_name>
    type: <column_data_type>
  ...
  delimiter: <delimiter_string>
  eol_prefix: <prefix_string>
  quote: <quote_char>
  escape: <escape_char>
json:
  column:
    name: <column_name>
    type: json | jsonb
  is_jsonl: <boolean>
  newline: <newline_str>
And where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:
<property>: {{<template_var>}}
Note: Version 3 of the GPSS load configuration file differs in both content and format from previous versions of the file. Certain symbols used in the GPSS version 1 and 2 configuration file reference page syntax have different meanings in version 3 syntax:
- Brackets [] are literal and are used to specify a list in version 3. They are no longer used to signify the optionality of a property.
- Curly braces {} are literal and are used to specify YAML mappings in version 3 syntax. They are no longer used with the pipe symbol (|) to identify a list of choices.
You specify load configuration properties for a Greenplum Streaming Server (GPSS) RabbitMQ load job in a YAML-formatted configuration file. (This reference page uses the name rabbitmq-v3.yaml when referring to this file; you may choose your own name for the file.) Load properties include Greenplum Database connection and data import properties, RabbitMQ data source information, and properties specific to the GPSS job.
The gpsscli utilities process the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant. Keywords are not case-sensitive.
version Property
You must specify version: v3.
targets:gpdb Properties
The default work_schema is public.
filter_expression: If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more source value or meta column names.
Note: GPSS supports loading from a RabbitMQ data source into a single Greenplum Database table only.
schema: The default schema is the public schema.
mode: The table load mode: insert, merge, or update. The default mode is insert.
Note: update and merge are not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
update: Updates the target table columns listed in update_columns when the input columns identified in match_columns match the named target table columns and the optional update_condition is true.
merge: Inserts new rows and updates existing rows when:
- columns are listed in update_columns,
- the match_columns target table column values are equal to the input data, and
- the update_condition is specified and met.
Deletes rows when:
- the match_columns target table column values are equal to the input data, and
- the delete_condition is specified and met.
New rows are identified when the match_columns value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the match_columns and update_columns. If there are multiple new match_columns values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify order_columns, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
mode supports one or more of the following properties, as specified in the Synopsis.
match_columns: Required if mode is merge or update.
order_columns: order_columns is used with match_columns to determine the input row with the largest value; GPSS uses that row to write/update the target. May be specified in merge mode to sort the input data rows.
update_columns: The column(s) to update for rows that meet the match_columns criteria and the optional update_condition. Required if mode is merge or update.
update_condition: A boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a merge). Optional.
delete_condition: In merge mode, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the match_columns criteria. Optional.
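As an illustration of how these properties fit together, the following is a minimal sketch of a merge block. The column names (id, updated_at, status, amount) and both conditions are hypothetical and are not part of this reference; substitute columns from your own target table.

```yaml
mode:
  merge:
    # hypothetical columns: a source row matches a target row when id is equal
    match_columns: [id]
    # only these target columns are rewritten for matching rows
    update_columns: [status, amount]
    # when several input rows share an id, the row with the largest updated_at is used
    order_columns: [updated_at]
    # assumed WHERE-style conditions; adjust to your data
    update_condition: amount > 0
    delete_condition: status = 'cancelled'
```

Because a mode block accepts a single mode, a configuration that uses this fragment would not also specify insert or update.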
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Greenplum Database. The semantics of the UDF are transform-specific.
Note: GPSS currently supports specifying only one of the mapping or (UDF) transformer blocks in the load configuration file, not both.
Optional. Overrides the default source-to-target column mapping.
Note: GPSS currently supports specifying only one of the mapping or (UDF) transformer blocks in the load configuration file, not both.
Note: When you specify a mapping, ensure that you provide a mapping for all source data elements of interest. GPSS does not automatically match column names when you provide a mapping block.
<expression> may be any expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
filter: If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or more META or VALUE column names.
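A mapping block and an output filter might be combined as in the following sketch. All column names here (customer_id, id, amount_usd, amount_cents, loaded_at) are hypothetical, and the expressions are ordinary SQL of the kind allowed in a SELECT list.

```yaml
mapping:
  # <target_column_name> : <source_column_name> | <expression>
  customer_id: id
  amount_usd: (amount_cents / 100.0)::numeric(12,2)
  loaded_at: now()
# assumption: the filter references a source value column
filter: amount_cents > 0
```

Because GPSS does not match column names automatically once a mapping block is present, every source element that should reach the table needs its own entry.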
sources:rabbitmq: Options
stream or queue.
stream or queue.
consistency: Valid values are at-least (GPSS stores the offsets before commit), at-most (GPSS stores the offsets after commit), and none. For streams, GPSS also supports strong consistency. The default value is at-least. Refer to Understanding RabbitMQ Message Offset Management for more detailed information.
fallback_offset: When set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages to the RabbitMQ stream.
save_failing_batch: The default value is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_<jobhash>, and continues to process incoming data. You must then manually load the good data from the backup table into Greenplum, or set recover_failing_batch (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
recover_failing_batch (Beta): When this property is set to true and save_failing_batch is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
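The two properties are typically set together. The fragment below is a sketch of the relevant lines inside a rabbitmq source block; it assumes that the GPSS database role is allowed to create functions.

```yaml
# write failing batches to gpssbackup_<jobhash> and keep processing
save_failing_batch: true
# Beta: have GPSS reload the good rows and leave only error rows in the backup table
recover_failing_batch: true
```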
The source to Greenplum column mapping. The supported column specification differs for different data formats as described below.
By default, GPSS matches a column name defined in source_column_name, column:name, or columns:name with a column name in the target Greenplum Database table. You can override the default mapping by specifying a mapping: block.
data_format: You may specify binary, csv, custom, delimited, or json for the value, with some restrictions.
binary: When you specify the binary data format, GPSS reads the data into a single bytea-type column.
source_column_name: The name of the single bytea-type column into which GPSS reads the value data.
csv: When you specify the csv data format, GPSS reads the data into the list of columns that you specify. The message content cannot contain line ending characters (CR and LF).
columns: The value [] specifies all columns.
delimiter: The default delimiter is a comma (,).
force_not_null: For the default null_string (nothing between two delimiters), missing values are evaluated as zero-length strings.
fill_missing_fields: The default value is false; GPSS returns an error when it encounters a row with missing trailing field values. If set to true, GPSS sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line will still generate an error.
custom: When you specify the custom data format, GPSS uses the custom formatter that you specify to process the input data before writing it to Greenplum Database.
columns: The value [] specifies all columns.
name: When you specify the custom data format, formatter_name is required and must identify the name of the formatter user-defined function that GPSS should use when loading the data.
delimited: When you specify the delimited data format, GPSS reads the data into the list of columns that you specify. You must specify the data delimiter.
columns: The value [] specifies all columns.
delimiter: When you specify the delimited data format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
eol_prefix: The prefix before the newline character (\n) that indicates the end of a row. The default prefix is empty.
escape: Escapes special characters in the data (delimiter, eol_prefix, quote, or escape itself). The default escape character is empty.
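For illustration, a delimited data_content block with a multi-byte delimiter could look like the sketch below. The column names and types and the delimiter string are hypothetical; the only constraints taken from this page are that the delimiter is required, may be multi-byte up to 32 bytes, and may not contain the quote or escape characters.

```yaml
data_content:
  delimited:
    columns:
    - name: id          # hypothetical column
      type: int
    - name: note        # hypothetical column
      type: text
    delimiter: "|@|"    # multi-byte delimiter, well under the 32-byte limit
```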
json: When you specify the json data format, GPSS can read the data as a single JSON object or as a single JSON record per line.
is_jsonl: The default value is false; GPSS reads the JSON data as a single object.
newline: The default is "\n".
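A json data_content block that reads one JSON record per line might be written as follows. The column name jdata is hypothetical; the newline value shown is the documented default and is spelled out only for clarity.

```yaml
data_content:
  json:
    column:
      name: jdata       # hypothetical target column
      type: json
    is_jsonl: true      # one JSON record per line instead of a single object
    newline: "\n"
```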
The meta: block must specify the json or jsonb (Greenplum 6 only) data format, and a single json-type column.
The available RabbitMQ meta data properties for a streaming source include:
- stream (text) - the RabbitMQ stream name
- offset (bigint) - the message offset
The available RabbitMQ meta data properties for a queue source include:
- queue (text) - the RabbitMQ queue name
- messageId (text) - the message identifier
- correlationId (text) - the correlation identifier
- timestamp (bigint) - the time that the message was added to the RabbitMQ queue
You can load any of these properties into the target table with a mapping, or use a property in the update or merge criteria for a load operation.
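The sketch below shows one way a stream's meta properties might be carried into the target table. It assumes that the properties are exposed as keys of the same name in the json-type meta column and that the standard PostgreSQL ->> operator can be used in a mapping expression; the target column names msg_offset and stream_name are hypothetical.

```yaml
# inside the rabbitmq source block
meta:
  json:
    column:
      name: meta
      type: json

# inside the target table block
mapping:
  msg_offset: (meta->>'offset')::bigint
  stream_name: meta->>'stream'
```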
encoding: You can specify an encoding when the source data is of the csv, custom, delimited, or json format. GPSS supports the character sets identified in Character Set Support in the Greenplum Database documentation.
Input data transform block. An input data transformer is a plugin, a set of Go functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions.
RabbitMQ configuration property names and values.
The batch size and commit window.
batch_size: You may specify both max_count and interval_ms as long as both values are not zero (0). Try setting and tuning interval_ms to your environment; introduce a max_count setting only if you encounter high memory usage associated with message buffering.
max_count: The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of max_count is 0, which instructs GPSS to ignore this commit trigger condition.
interval_ms: The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
idle_duration_ms: Supported values include 0 (lazy load is deactivated) and -1 (lazy load is activated, the job never stops). The default value is 0.
window_size: The number of batches after which GPSS runs window_statement. The default batch interval is 0.
window_statement: The UDF or SQL that GPSS runs after every window_size number of batches. The default is null, no command to run.
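Following the tuning advice above, a task block might start from interval_ms alone and add a window statement. This is a sketch only: the interval and window values are arbitrary, and refresh_summary() is a hypothetical user-defined function.

```yaml
task:
  batch_size:
    interval_ms: 2000    # commit roughly every 2 seconds
    max_count: 0         # default; the row-count trigger is ignored
  window_size: 10        # run window_statement after every 10 batches
  window_statement: SELECT refresh_summary()
```

Raising max_count above 0 would only be considered if message buffering caused high memory usage.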
option: Properties
Controls the frequency and interval of restarting jobs.
retry_interval: Specify the interval using day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
running_duration.
running_duration. The default is 0, do not restart the job.
clock_time, even when GPSS encounters an EOF.
Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).
command: The command may reference $GPSSJOB_NAME, $GPSSJOB_STATUS, and $GPSSJOB_DETAIL.
timeout: Specify the timeout using day (d), hour (h), minute (m), or second (s) integer units; do not mix units. The default alert timeout is -1s (no timeout).
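An option block that retries a failed job and sends an alert when the job stops might look like this sketch. The script path /usr/local/bin/notify.sh is hypothetical, as are the retry count, working directory, and timeout values; the sketch also assumes that the three $GPSSJOB_ variables can be passed to the command as arguments.

```yaml
option:
  schedule:
    max_retries: 3
    retry_interval: 5m      # single unit only; units may not be mixed
  alert:
    command: /usr/local/bin/notify.sh $GPSSJOB_NAME $GPSSJOB_STATUS $GPSSJOB_DETAIL
    workdir: /tmp
    timeout: 30s
```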
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<property>: {{<template_var>}}
For example:
max_retries: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property <template_var=value> option to the gpsscli dryrun, gpsscli submit, or gpsscli load command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value while the job is running.
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the load configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" (c1 text);
Your YAML configuration file would refer to the table name as:
targets:
- gpdb:
    tables:
    - table: '"MyTable"'
You can specify backslash escape sequences in the CSV delimiter, quote, and escape options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
Load data from RabbitMQ as defined in the Version 3 configuration file named loadfromrmq_v3.yaml:
gpsscli load loadfromrmq_v3.yaml
Example loadfromrmq_v3.yaml configuration file:
version: v3
targets:
- gpdb:
    host: mdw-1
    port: 15432
    user: gpadmin
    password: changeme
    database: testdb
    work_schema: public
    error_limit: 25
    tables:
    - table: tbl1
      schema: public
      mode:
        insert: {}
sources:
- rabbitmq:
    server: gpadmin:changeme@mdw-1:5672
    queue: test
    vhost: gpadmin
    data_content:
      csv:
        columns: []
        delimiter: ","
        quote: "'"
        null_string: "NA"
        escape: '\'
        force_not_null: "c1,c2"
        fill_missing_fields: true
    properties:
      eof.when.idle: 1500
option:
  schedule: {}