GPSS load configuration file for a RabbitMQ data source (version 2).

Synopsis

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
RABBITMQ:
   INPUT:
      SOURCE:
         SERVER: <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>
         VIRTUALHOST: <gpss_vhost>
         { STREAM: <name> | QUEUE: <name> }
      DATA:
         COLUMNS:
            - NAME: { <column_name> | __IGNORED__ }
              TYPE: <column_data_type>
            [ ... ]
         FORMAT: <value_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [EOL_PREFIX: <prefix_string>]
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
            [FORCE_NOT_NULL: <columns>]
            [FILL_MISSING_FIELDS: <boolean>]] |
         [JSONL_OPTION:
            [NEWLINE: <newline_str>]] |
         [CUSTOM_OPTION:
            NAME: <udf_name>
            PARAMSTR: <udf_parameter_string>]]
      [META:
         COLUMNS:
            - NAME: <meta_column_name>
              TYPE: { json | jsonb }
         FORMAT: json]
      [FILTER: <filter_string>]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS: 
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS: 
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS: 
         - <update_column_name>
         [ ... ]]
      [UPDATE_CONDITION: <update_condition>]
      [DELETE_CONDITION: <delete_condition>]
      [MAPPING: 
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> } 
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ]
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   [COMMIT:
      SAVE_FAILING_BATCH: <boolean>
      RECOVER_FAILING_BATCH: <boolean> (Beta)
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
      CONSISTENCY: { at-least | at-most | none }
      IDLE_DURATION: <idle_time> ]
   [TASK:
      POST_BATCH_SQL: <udf_or_sql_to_run>
      BATCH_INTERVAL: <num_batches>
      PREPARE_SQL: <udf_or_sql_to_run>
      TEARDOWN_SQL: <udf_or_sql_to_run> ]
   [PROPERTIES:
      <rmq_property_name>: <rmq_property_value>
      [ ... ]]
[SCHEDULE:
   RETRY_INTERVAL: <retry_time>
   MAX_RETRIES: <num_retries>
   RUNNING_DURATION: <run_time>
   AUTO_STOP_RESTART_INTERVAL: <restart_time>
   MAX_RESTART_TIMES: <num_restarts>
   QUIT_AT_EOF_AFTER: <clock_time>]

You may specify any property value with a template variable that GPSS substitutes at runtime, using the following syntax:

<PROPERTY>: {{<template_var>}}

Description

You specify load configuration parameters for the gpsscli utilities in a YAML-formatted configuration file. (This reference page uses the name rabbitmq-v2.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, RabbitMQ data source information, and error and commit thresholds.

The gpsscli utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.

Keywords and Values

Greenplum Database Options

DATABASE: db_name
The name of the Greenplum database.
USER: user_name
The name of the Greenplum Database user/role. This user_name must have permissions as described in the Greenplum Streaming Server documentation.
PASSWORD: password
The password for the Greenplum Database user/role.
HOST: host
The host name or IP address of the Greenplum Database master host.
PORT: greenplum_port
The port number of the Greenplum Database server on the master host.
VERSION: 2
The version of the GPSS configuration file. You must specify VERSION: 2 when you configure the DATA block in the file.

RABBITMQ:INPUT: Options

SOURCE

RabbitMQ input configuration parameters.

SERVER: rmq_user:rmq_password@rmq_host:rmq_port
The RabbitMQ server connection string. It includes the user name with which GPSS logs in to the RabbitMQ broker, the password for rmq_user, the hostname or IP address of the RabbitMQ server, and the port number on which the RabbitMQ server is listening.
VIRTUALHOST: gpss_vhost
The RabbitMQ virtual host that represents the GPSS server.
STREAM: name
The name of the RabbitMQ stream from which to read the data. You may specify only one of STREAM or QUEUE.
QUEUE: name
The name of the RabbitMQ queue from which to read the data. You may specify only one of STREAM or QUEUE.
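As an illustration, a SOURCE block that reads from a stream rather than a queue might look like the following sketch. The credentials, host, port, virtual host, and stream name are placeholders chosen for this example; the port in particular depends on how your broker exposes streams.

```yaml
SOURCE:
  # rmq_user:rmq_password@rmq_host:rmq_port (all values are placeholders)
  SERVER: rmquser:changeme@rmqhost:5672
  VIRTUALHOST: gpss_vhost
  # Specify STREAM or QUEUE, never both
  STREAM: orders_stream
```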
DATA:
The RabbitMQ message value field names, data types, and format. You must specify all RabbitMQ data elements in the order in which they appear in the RabbitMQ message.
COLUMNS:NAME: column_name

The name of a data column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this RabbitMQ message data element from the load operation.

The default source-to-target data mapping behavior of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.
COLUMNS:TYPE: data_type
The data type of the column. You must specify an equivalent data type for each non-ignored RabbitMQ message data element and the associated Greenplum Database table column.
FORMAT: data_format

The format of the RabbitMQ message data. You may specify a FORMAT of binary, csv, custom, delimited, json, or jsonl for the data, with some restrictions.

binary
When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
csv
When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
custom
When you specify the custom data format, you must provide a CUSTOM_OPTION.
delimited
When you specify the delimited data format, you must provide a DELIMITED_OPTION.
json
When you specify the json data format, you must define only a single json type column in COLUMNS.
jsonl
When you specify the jsonl data format, you may provide a JSONL_OPTION to define a newline character.
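For example, the single-column restriction for the json format could be satisfied with a DATA block like this sketch. The column name jdata is hypothetical; it would need to match a json column in the target table unless you remap it with a MAPPING block.

```yaml
DATA:
  COLUMNS:
    # json format: exactly one column, of type json
    - NAME: jdata
      TYPE: json
  FORMAT: json
```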
CSV_OPTION

When you specify FORMAT: csv, you may provide the following options:

DELIMITER: delim_char
Specifies a single ASCII character that separates columns within each message or row of data. The default delimiter is a comma (,).
QUOTE: quote_char
Specifies the quotation character. Because GPSS does not provide a default value for this property, you must specify a value.
NULL_STRING: nullstr_val
Specifies the string that represents the null value. Because GPSS does not specify a default value for this property, you must specify a value.
ESCAPE: escape_char
Specifies the single character that is used for escaping data characters in the content that might otherwise be interpreted as row or column delimiters. Make sure to choose an escape character that is not used anywhere in your actual column data. Because GPSS does not provide a default value for this property, you must specify a value.
FORCE_NOT_NULL: columns
Specifies a comma-separated list of column names to process as though each column were quoted and hence not a NULL value. For the default null_string (nothing between two delimiters), missing values are evaluated as zero-length strings.
FILL_MISSING_FIELDS: boolean
Specifies the action of GPSS when it reads a row of data that has missing trailing field values (the row has missing data fields at the end of a line or row). The default value is false; GPSS returns an error when it encounters a row with missing trailing field values.
If set to true, GPSS sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line will still generate an error.
CUSTOM_OPTION

Required when you specify FORMAT: custom. This block identifies the name and the arguments of a custom formatter user-defined function.

NAME: udf_name
The name of the custom formatter user-defined function.
PARAMSTR: udf_parameter_string
A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
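A minimal sketch of a custom-format DATA block follows. The formatter name my_formatter_in and its parameter string are hypothetical; substitute the user-defined function that exists in your database and the arguments that it actually accepts.

```yaml
DATA:
  COLUMNS:
    - NAME: c1
      TYPE: int
  FORMAT: custom
  CUSTOM_OPTION:
    # Hypothetical formatter UDF and comma-separated argument list
    NAME: my_formatter_in
    PARAMSTR: "arg1,arg2"
```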
JSONL_OPTION

Optional. When you specify FORMAT: jsonl, you may choose to provide the JSONL_OPTION properties.

NEWLINE: newline_str
A string that specifies the new line character(s) that end each JSON record. The default newline is "\n".
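For instance, records that end with a carriage return and line feed could be described as in the sketch below. The column name jdata is hypothetical, and the single json column is an assumption carried over from the json format rules rather than something this page states for jsonl.

```yaml
DATA:
  COLUMNS:
    - NAME: jdata
      TYPE: json
  FORMAT: jsonl
  JSONL_OPTION:
    # Double quotes so that YAML interprets the escape sequences
    NEWLINE: "\r\n"
```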
DELIMITED_OPTION

When you specify FORMAT: delimited, you must provide the DELIMITED_OPTION properties.

DELIMITER: delimiter_string
When you specify the delimited format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters.
EOL_PREFIX: prefix_string
Specifies the prefix before the end of line character (\n) that indicates the end of a row. The default prefix is empty.
QUOTE: quote_char
Specifies the single ASCII quotation character. The default quote character is empty.
If you do not specify a quotation character, GPSS assumes that all columns are unquoted. If you do not specify a quotation character and do specify an escape character, GPSS assumes that all columns are unquoted and escapes the delimiter, end-of-line prefix, and escape itself.
When you specify a quotation character, you must specify an escape character. GPSS reads any content between quote characters as-is, except for escaped characters.
ESCAPE: escape_char
Specifies the single ASCII character used to escape special characters (for example, the delimiter, end-of-line prefix, quote, or escape itself). The default escape character is empty.
When you specify an escape character and do not specify a quotation character, GPSS escapes only the delimiter, end-of-line prefix, and escape itself.
When you specify both an escape character and a quotation character, GPSS escapes only these characters.
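The following sketch combines these options for data that uses a multi-byte delimiter. The delimiter, prefix, quote, and escape values are illustrative choices only; note that the delimiter contains neither the quote nor the escape character, and that an escape character accompanies the quote character.

```yaml
FORMAT: delimited
DELIMITED_OPTION:
  # Multi-byte delimiter (up to 32 bytes), free of quote and escape characters
  DELIMITER: "|@|"
  # Rows end with this prefix followed by \n
  EOL_PREFIX: "#EOL#"
  QUOTE: "'"
  # Single-quoted YAML scalar: a literal backslash
  ESCAPE: '\'
```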
META:
The field name, type, and format of the RabbitMQ meta data. META must specify a single json or jsonb (Greenplum 6 only) type column and FORMAT: json.

The available RabbitMQ meta data properties for a streaming source include:

  • stream (text) - the RabbitMQ stream name
  • offset (bigint) - the message offset

The available RabbitMQ meta data properties for a queue source include:

  • queue (text) - the RabbitMQ queue name
  • messageId (text) - the message identifier
  • correlationId (text) - the correlation identifier
  • timestamp (bigint) - the time that the message was added to the RabbitMQ queue
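As a sketch, a META block for a queue source could be declared as shown below. The column name meta and the target column src_queue are hypothetical, and referencing the meta column from a MAPPING expression with the PostgreSQL ->> operator is an assumption about how you might extract a property, not something this page confirms.

```yaml
INPUT:
  META:
    COLUMNS:
      # Exactly one column, of type json (or jsonb)
      - NAME: meta
        TYPE: json
    FORMAT: json
OUTPUT:
  MAPPING:
    # Assumed usage: pull the queue name out of the meta data column
    - NAME: src_queue
      EXPRESSION: (meta->>'queue')::text
```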
FILTER: filter_string
The filter to apply to the RabbitMQ input messages before GPSS loads the data into Greenplum Database. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more DATA column names.
ERROR_LIMIT: { num_errors | percentage_errors }
The error threshold, specified as either an absolute number or a percentage. gpsscli load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS disables error logging and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept ERROR_LIMIT: 1.
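A short sketch of the two INPUT settings above, assuming DATA columns named c1 and c2 as in the example at the end of this page:

```yaml
# Load only messages for which the SQL condition is true
FILTER: "c1 > 0 AND c2 IS NOT NULL"
# Stop the load when 25 errors have accumulated (a value of 1 is not accepted)
ERROR_LIMIT: 25
```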

RABBITMQ:OUTPUT: Options

SCHEMA: output_schema_name
The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
TABLE: table_name
The name of the Greenplum Database table into which GPSS loads the RabbitMQ data.
MODE: mode

The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT.

UPDATE - Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.

UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.

MERGE - Inserts new rows and updates existing rows when:

  • columns are listed in UPDATE_COLUMNS,
  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional UPDATE_CONDITION is specified and met.

Deletes rows when:

  • the MATCH_COLUMNS target table column values are equal to the input data, and
  • an optional DELETE_CONDITION is specified and met.

New rows are identified when the MATCH_COLUMNS value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS and UPDATE_COLUMNS. If there are multiple new MATCH_COLUMNS values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.

MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
MATCH_COLUMNS:

Required if MODE is MERGE or UPDATE.

match_column_name
Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
ORDER_COLUMNS:

Optional. May be specified in MERGE MODE to sort the input data rows.

order_column_name
Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
UPDATE_COLUMNS:

Required if MODE is MERGE or UPDATE.

update_column_name
Specifies the column(s) to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
UPDATE_CONDITION: update_condition
Optional. Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
DELETE_CONDITION: delete_condition
Optional. In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
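To show how these MERGE properties fit together, here is a hedged sketch of an OUTPUT block. The table and column names are hypothetical, and the condition strings are placeholders: this page does not describe how column references in the conditions are resolved between the input row and the target row, so verify them against your own tables.

```yaml
OUTPUT:
  SCHEMA: public
  TABLE: orders              # hypothetical target table
  MODE: MERGE
  MATCH_COLUMNS:             # join condition between input and target
    - order_id
  ORDER_COLUMNS:             # the input row with the largest value wins
    - updated_at
  UPDATE_COLUMNS:            # columns rewritten on a match
    - status
    - updated_at
  UPDATE_CONDITION: "status <> 'cancelled'"   # placeholder condition
  DELETE_CONDITION: "status = 'deleted'"      # placeholder condition
```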
MAPPING:
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.

Note: When you specify a MAPPING, ensure that you provide a mapping for all RabbitMQ message data elements of interest. GPSS does not automatically match column names when you provide a MAPPING.

NAME: target_column_name
Specifies the target Greenplum Database table column name.
EXPRESSION: { source_column_name | expression }
Specifies a RabbitMQ COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
target_column_name: { source_column_name | expression }
When you use this MAPPING syntax, specify the target_column_name and {source_column_name | expression} as described above.
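The two syntaxes can be compared side by side in the sketch below, which assumes source columns c1 and c2 and a hypothetical target column c2_doubled. Use one form or the other; the second is commented out so that the fragment contains a single MAPPING key.

```yaml
# Syntax 1: a list of NAME/EXPRESSION pairs
MAPPING:
  - NAME: c1
    EXPRESSION: c1
  - NAME: c2_doubled
    EXPRESSION: c2 * 2

# Syntax 2: the same mapping written as target: source pairs
# MAPPING:
#   c1: c1
#   c2_doubled: c2 * 2
```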

RABBITMQ:METADATA: Options

SCHEMA: metadata_schema_name
The name of the Greenplum Database schema in which GPSS creates external tables. The default metadata_schema_name is RABBITMQ:OUTPUT:SCHEMA.

Greenplum Database COMMIT: Options

COMMIT:

Controls how GPSS commits a batch of data to Greenplum Database. You may specify both MAX_ROW and MINIMAL_INTERVAL as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL to your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering.

SAVE_FAILING_BATCH: boolean
Determines whether or not GPSS saves data into a backup table before it writes the data to Greenplum Database. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_<jobhash>, and continues to process incoming messages. You must then manually load the good data from the backup table into Greenplum or set RECOVER_FAILING_BATCH (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
RECOVER_FAILING_BATCH: boolean (Beta)
When set to true and SAVE_FAILING_BATCH is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
MAX_ROW: number_of_rows
The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs GPSS to ignore this commit trigger condition.
MINIMAL_INTERVAL: wait_time
The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
CONSISTENCY: { at-least | at-most | none }
Specify how GPSS should manage message offsets when it acts as a high-level consumer of a RabbitMQ stream. Valid values are at-least (GPSS stores the offsets before commit), at-most (GPSS stores the offsets after commit), and none. The default value is at-least.
IDLE_DURATION: idle_time

The maximum amount of time to wait (milliseconds) for the first batch of data. When you use this property to enable lazy load, GPSS waits until RabbitMQ data is available before locking the target Greenplum table. You can specify:

  • 0 (lazy load is disabled)
  • -1 (lazy load is enabled, the job never stops), or
  • a positive value (lazy load is enabled, the job stops after idle_time duration of no data in the RabbitMQ queue or stream)

The default value is 0.
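A sketch of a COMMIT block that follows the tuning advice above: it relies on MINIMAL_INTERVAL alone, keeps a backup table for failing batches, and enables lazy load with a one-minute idle timeout. The specific numbers are illustrative, not recommendations.

```yaml
COMMIT:
  # Keep good and bad rows of a failing batch in gpssbackup_<jobhash>
  SAVE_FAILING_BATCH: true
  # Wait at least 2000 ms between INSERT operations; MAX_ROW stays at its default of 0
  MINIMAL_INTERVAL: 2000
  CONSISTENCY: at-least
  # Lazy load: stop the job after 60000 ms without data
  IDLE_DURATION: 60000
```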

Greenplum Database TASK: Options

TASK:

Controls the execution and scheduling of a periodic (maintenance) task.

POST_BATCH_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from RabbitMQ. The default is null.
BATCH_INTERVAL: num_batches
The number of batches to read before executing udf_or_sql_to_run. The default batch interval is 0.
PREPARE_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want GPSS to run before it executes the job. The default is null, no command to execute.
TEARDOWN_SQL: udf_or_sql_to_run
The user-defined function or SQL command(s) that you want GPSS to run after the job completes. The default is null, no command to execute.
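For illustration, a TASK block might look like the following sketch. The SQL statements, the audit table, and the finish_load function are hypothetical, and writing a user-defined function call as a SELECT statement is an assumption about the expected form.

```yaml
TASK:
  # Run once before the job starts (hypothetical audit table)
  PREPARE_SQL: "TRUNCATE staging_schema.load_audit"
  # Run after every 10 batches read from RabbitMQ
  POST_BATCH_SQL: "ANALYZE public.tbl_int_text_column"
  BATCH_INTERVAL: 10
  # Run once after the job completes (hypothetical UDF)
  TEARDOWN_SQL: "SELECT staging_schema.finish_load()"
```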

RabbitMQ PROPERTIES: Options

PROPERTIES:
RabbitMQ configuration property names and values.
rmq_property_name
The name of a RabbitMQ property.
rmq_property_value
The RabbitMQ property value.

Job SCHEDULE: Options

SCHEDULE:

Controls the frequency and interval of restarting jobs.

RETRY_INTERVAL: retry_time
The period of time that GPSS waits before retrying a failed job. You can specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
MAX_RETRIES: num_retries
The maximum number of times GPSS attempts to retry a failed job. The default is 0, do not retry. If you specify a negative value, GPSS retries the job indefinitely.
RUNNING_DURATION: run_time
The amount of time after which GPSS automatically stops a job. GPSS does not automatically stop a job by default.
AUTO_STOP_RESTART_INTERVAL: restart_time
The amount of time after which GPSS restarts a job that it stopped due to reaching RUNNING_DURATION.
MAX_RESTART_TIMES: num_restarts
The maximum number of times that GPSS restarts a job that it stopped due to reaching RUNNING_DURATION. The default is 0, do not restart the job.
QUIT_AT_EOF_AFTER: clock_time
The clock time after which GPSS stops a job every day when it encounters an EOF. By default, GPSS does not automatically stop a job that reaches EOF. GPSS never stops a job when the current time is before clock_time, even when GPSS encounters an EOF.
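A hedged sketch of a SCHEDULE block follows. This page documents the unit syntax only for RETRY_INTERVAL; using the same syntax for RUNNING_DURATION and AUTO_STOP_RESTART_INTERVAL is an assumption. QUIT_AT_EOF_AFTER is left out because its clock_time format is not described here.

```yaml
SCHEDULE:
  # Retry a failed job up to 3 times, waiting 5 minutes between attempts
  RETRY_INTERVAL: 5m
  MAX_RETRIES: 3
  # Assumed to accept the same unit syntax as RETRY_INTERVAL
  RUNNING_DURATION: 2h
  AUTO_STOP_RESTART_INTERVAL: 1h
  # Restart a job stopped by RUNNING_DURATION at most once
  MAX_RESTART_TIMES: 1
```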

Template Variables

GPSS supports using template variables to specify property values in the load configuration file.

You specify a template variable value in the load configuration file as follows:

<PROPERTY>: {{<template_var>}}

For example:

MAX_RETRIES: {{numretries}}

GPSS substitutes the template variable with a value that you specify via the -p | --property template_var=value option to the gpsscli submit or gpsscli load command.

For example, if the command line specifies:

--property numretries=10

GPSS substitutes occurrences of {{numretries}} in the load configuration file with the value 10 before submitting the job, and uses that value during job execution.
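The same mechanism applies to any other property. As a sketch, the queue name could be supplied at submit time; the variable name queuename is hypothetical. The fragment is only valid as a load configuration after GPSS has performed the substitution.

```yaml
RABBITMQ:
  INPUT:
    SOURCE:
      # Replaced at runtime, for example with: --property queuename=test
      QUEUE: {{queuename}}
```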

Notes

If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the rabbitmq-v2.yaml configuration file. For example, if you create a table as follows:

CREATE TABLE "MyTable" ("MyColumn" text);

Your rabbitmq-v2.yaml YAML configuration file would refer to the above table and column names as:

  COLUMNS:
     - NAME: '"MyColumn"'
       TYPE: text
OUTPUT:
   TABLE: '"MyTable"'

You can specify backslash escape sequences in the CSV DELIMITER, QUOTE, and ESCAPE options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.

Examples

Load data from RabbitMQ as defined in the Version 2 configuration file named rmq2greenplumv2.yaml:

gpsscli load rmq2greenplumv2.yaml

Example rmq2greenplumv2.yaml configuration file:

DATABASE: testdb
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 15432
VERSION: 2
RABBITMQ:
    INPUT:
      SOURCE:
        SERVER: gpadmin:changeme@localhost:5672
        QUEUE: test
        VIRTUALHOST: gpadmin
      DATA:
        COLUMNS:
          - NAME: c1
            TYPE: int
          - NAME: c2
            TYPE: int
        FORMAT: csv
        CSV_OPTION:
          DELIMITER: ","
          QUOTE: "'"
          NULL_STRING: "NA"
          ESCAPE: '\'
          FORCE_NOT_NULL: "c1,c2"
          FILL_MISSING_FIELDS: true
      ERROR_LIMIT: 25
    OUTPUT:
      SCHEMA: "public"
      TABLE: tbl_int_text_column
      MODE: INSERT
      MAPPING:
        - NAME: c1
          EXPRESSION: c1::int
        - NAME: c2
          EXPRESSION: c2::int
    METADATA:
      SCHEMA: staging_schema
    COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 200

See Also

rabbitmq-v3.yaml, gpsscli load, gpsscli submit
