The GPSS RabbitMQ data source loads data from a RabbitMQ queue (the traditional AMQP implementation) or a RabbitMQ stream (persistent and replicated structure available in RabbitMQ version 3.9 and later) into Greenplum Database.

You can use the gpsscli utility to load RabbitMQ data/messages into Greenplum Database. The GPSS server, gpss, is a RabbitMQ consumer. It ingests streaming data from a single RabbitMQ queue or stream using Greenplum Database readable external tables to transform and insert or update the data into a target Greenplum table. You identify the RabbitMQ server, queue or stream name, virtual host, data format, and the Greenplum connection options and target table definition in a YAML-formatted load configuration file that you provide to the utility.

Load Procedure

You will perform the following tasks when you use the Greenplum Streaming Server to load RabbitMQ message data into a Greenplum Database table:

  1. Ensure that you meet the Prerequisites.
  2. Register the Greenplum Streaming Server extension.
  3. Identify the format of the RabbitMQ data.
  4. Construct the load configuration file.
  5. Create the target Greenplum Database table.
  6. Assign Greenplum Database role permissions to the table, if required.
  7. Run the gpsscli Client Commands to load the data into Greenplum Database.
  8. Check for load errors.

Prerequisites

Before using the gpsscli utility to load RabbitMQ data to Greenplum Database, ensure that you:

  • Meet the Prerequisites documented for the Greenplum Streaming Server, and configure and start the server.
  • Have access to a running RabbitMQ cluster, and that you can identify the hostname and port number of the RabbitMQ server serving the data.
  • Can identify the name of the RabbitMQ queue or stream of interest.
  • Can run the command on a host that has connectivity to:
    • Each RabbitMQ host in the RabbitMQ cluster.
    • The Greenplum Database coordinator and all segment hosts.

About Supported Message Data Formats

The Greenplum Streaming Server supports RabbitMQ message value data in the following formats:

Format Description
binary Binary format data. GPSS reads binary data from RabbitMQ only as a single bytea-type column.
csv Comma-delimited text format data.
custom Data of a custom format, parsed by a custom formatter function.
delimited Text data separated by a configurable delimiter.
json, jsonl (version 2 only) JSON- or JSONB-format data. Specify the json format when the file is in JSON or JSONB format. GPSS can read JSON data as a single object or can read a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
Note: GPSS supports JSONB-format data only when loading to Greenplum 6.
Note: Specify FORMAT: jsonl in version 2 format load configuration files. Specify json with is_jsonl: true in version 3 (Beta) format load configuration files.

To write RabbitMQ message data into a Greenplum Database table, you must identify the data format in the load configuration file.

Binary

Use the binary format when your RabbitMQ message data is a stream of bytes. GPSS reads binary data from RabbitMQ and loads it into a single bytea-type column.

CSV

Use the csv format when your RabbitMQ message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).

Data in csv format may appear in RabbitMQ messages as follows:

"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"

Custom

The Greenplum Streaming Server provides a custom data formatter plug-in framework for RabbitMQ messages using user-defined functions. The type of RabbitMQ message data processed by a custom formatter is formatter-specific. For example, a custom formatter may process compressed or complex data.

Delimited Text

The Greenplum Streaming Server supports loading RabbitMQ data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value and up to 32 bytes in length.You can also specify quote and escape characters, and an end-of-line prefix.

Note

The delimiter may not contain the quote or escape characters.

When you specify a quote character:

  • The left and right quotes are the same.
  • Each data element must be quoted. GPSS does not support mixed quoted and unquoted content.
  • You must also define an escape character.
  • GPSS keeps the original format of any character between the quotes, except the quote and escape characters. This especially applies to the delimiter and \n, which do not require additional escape if they are quoted.
  • The quote character is presented as the escape character plus the quote character (for example, \”).
  • The escape character is presented as the escape character plus the escape character (for example, \)
  • GPSS parses multiple escape characters from left to right.

When you do not specify a quote character:

  • The escape character is optional.

  • If you do not specify an escape character, GPSS treats the delimiter as the column separator, and treats any end-of-line prefix plus \n as the row separator.

  • If you do specify an escape character:

    • GPSS uses the escape character plus the delimiter as the column separator.
    • GPSS uses the escape character plus the end-of-line prefix plus \n as the row separator.
    • The escape character plus the escape character is the escape character itself.
    • GPSS parses multiple escape characters from left to right.

Sample data using a pipe ('|') delimiter character follows:

1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72

JSON (single object)

Specify the json format when your RabbitMQ message data is in JSON or JSONB format and you want GPSS to read JSON data from RabbiMQ as a single object into a single column (per the JSON specification, newlines and white space are ignored). You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.

Note

GPSS supports JSONB-format data only when loading to Greenplum 6.

JSON (single record per line)

Specify FORMAT: jsonl in version 2 format load configuration files or specify json with is_jsonl: true in version 3 (Beta) format load configuration files when your RabbitMQ message data is in JSON format, single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.

Sample JSON message data:

{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }

Registering a Custom Formatter

A custom data formatter for RabbitMQ messages is a user-defined function. If you are using a custom formatter, you must create the formatter function and register it in each database in which you will use the function to write RabbitMQ data to Greenplum tables.

Constructing the rabbitmq.yaml Configuration File

You configure a data load operation from RabbitMQ to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source RabbitMQ data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.

When loading from RabbitMQ, the Greenplum Streaming Server supports two versions of the YAML configuration file: version 2 and version 3 (Beta).

Refer to the rabbitmq-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports. rabbitmq-v3.yaml describes the Version 3 (Beta) format.

Contents of a sample gpsscli Version 2 YAML configuration file named loadcfgrmq2.yaml follows:

DATABASE: testdb
USER: gpadmin
PASSWORD: password
HOST: localhost
PORT: 15432
VERSION: 2
RABBITMQ:
  INPUT:
    SOURCE:
      SERVER: gpdmin:changeme@localhost:5552
      STREAM: test_stream
      VIRTUALHOST: vhost_for_gpss
    DATA:
      COLUMNS:
        - NAME: c1
          TYPE: int
        - NAME: c2
          TYPE: int
        - NAME: path
          TYPE: text
      FORMAT: CSV
  OUTPUT:
    SCHEMA: "public"
    TABLE: tbl_int_text_column
    MODE: MERGE
    MATCH_COLUMNS:
      - c1
    UPDATE_COLUMNS:
      - c2
    ORDER_COLUMNS:
      - path
    UPDATE_CONDITION: c2 = 11
    DELETE_CONDITION: c1 = 0
    MAPPING:
      - NAME: c1
        EXPRESSION: c1::int
      - NAME: c2
        EXPRESSION: c2::int
      - NAME: path
        EXPRESSION: path::text
  METADATA:
    SCHEMA: staging_schema
  COMMIT:
    MINIMAL_INTERVAL: 200
    CONSISTENCY: at-least
  PROPERTIES:
    eof.when.idle: 1500

Greenplum Database Options (Version 2-Focused)

You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.

The VERSION parameter identifies the version of the GPSS YAML configuration file. You must specify version 2 or version v3.

RABBITMQ:INPUT Options

Specify the RabbitMQ server, virtual host, and queue or stream of interest using the SOURCE block.

The DATA block that you provide must specify the COLUMNS and FORMAT parameters. The DATA:COLUMNS block includes the name and type of each data element in the RabbitMQ message. The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:

  • You must identify the RabbitMQ data elements in the order in which they appear in the RabbitMQ message.
  • You may specify NAME: __IGNORED__ to omit a RabbitMQ message value data element from the load operation.
  • You must provide the same name for each non-ignored RabbitMQ data element and its associated Greenplum Database table column.
  • You must specify an equivalent data type for each non-ignored RabbitMQ data element and its associated Greenplum Database table column.

The DATA:FORMAT keyword identifies the format of the RabbitMQ message value. GPSS supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). GPSS also supports binary (binary), single object or single record per line JSON/JSONB (json or jsonl), and custom (custom) format value data.

When you provide a META block, you must specify a single JSON-type COLUMNS and the FORMAT: json.

The available RabbitMQ meta data properties for a streaming source include:

  • stream (text) - the RabbitMQ stream name
  • offset (bigint) - the message offset

The available RabbitMQ meta data properties for a queue source include:

  • queue (text) - the RabbitMQ queue name
  • messageId (text) - the message identifier
  • correlationId (text) - the correlation identifier
  • timestamp (bitint) - the time that the message was added to the RabbitMQ queue

The FILTER parameter identifies a filter to apply to the RabbitMQ input messages before the data is loaded into Greenplum Database. If the filter evaluates to true, GPSS loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more DATA column names.

The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which GPSS should exit the load operation. The default ERROR_LIMIT is zero; the load operation is stopped when the first error is encountered.

RABBITMQ:OUTPUT Options

You identify the target Greenplum Database schema name and table name via the RABBITMQ:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load RabbitMQ data.

Note

GPSS supports loading from a RabbitMQ data source into a single Greenplum Database table only.

The default load mode is to insert RabbitMQ data into the Greenplum Database table. GPSS also supports updating and merging RabbitMQ message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.

You can override the default mapping of the INPUT DATA:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Greenplum Database table and a RabbitMQ message value data element.

Note

When you specify a MAPPING block, ensure that you provide entries for all RabbitMQ data elements of interest - GPSS does not automatically match column names when you provide a MAPPING.

About the Merge Load Mode

MERGE mode is similar to an UPSERT operation; GPSS may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. GPSS deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.

GPSS stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.

GPSS uses the following algorithm for MERGE mode processing:

  1. Create a temporary table like the target table.
  2. Generate the SQL to insert the source data into the temporary table.
    1. Add the MAPPINGS.
    2. Add the FILTER.
    3. Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
  3. Update the target table from rows in the temporary table that satisfy MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
  4. Insert non-matching rows into the target table.
  5. Delete rows in the target table that satisfy MATCH_COLUMNS and the DELETE_CONDITION.
  6. Truncate the temporary table.

Other Options

The RABBITMQ:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which GPSS creates external tables.

GPSS commits RabbitMQ data to the Greenplum Database table at the row and/or time intervals that you specify in the RABBITMQ:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. If you do not specify these properties, GPSS commits data at the default MINIMAL_INTERVAL, 5000ms.

Specify a RABBITMQ:PROPERTIES block to set RabbitMQ configuration properties. GPSS sends the property names and values to RabbitMQ when it instantiates a consumer for the load operation.

About the JSON Format and Column Type

When you specify FORMAT: json or FORMAT: jsonl, valid COLUMN:TYPEs for the data include json or jsonb. You can also specify the new GPSS gp_jsonb (Beta) or gp_json (Beta) column types.

  • gp_jsonb is an enhanced JSONB type that adds support for \u escape sequences and unicode. For example, gp_jsonb can escape \uDD8B and \u0000 as text format, but jsonb treats these characters as illegal.
  • gp_json is an enhanced JSON type that can tolerate certain illegal unicode sequences. For example, gp_json automatically escapes incorrect surrogate pairs and processes \u0000 as \\u0000. Note that unicode escape values cannot be used for code point values above 007F when the server encoding is not UTF8.

You can use the gp_jsonb (Beta) and gp_json (Beta) data types as follows:

  • As the COLUMN:TYPE when the target Greenplum Database table column type is json or jsonb.

  • In a MAPPING when the target Greenplum Database column is text or varchar. For example:

    EXPRESSION: (j->>'a')::text
    
  • In a MAPPING when FORMAT: avro and the target Greenplum Database column is json or jsonb. For example:

    EXPRESSION: j::gp_jsonb
    

    or

    EXPRESSION: j::gp_json
    
  • In a MAPPING when FORMAT: avro and the target Greenplum Database column is text or varchar. For example:

    EXPRESSION: (j::gp_jsonb->>'a')::text
    

    or

    EXPRESSION: (j::gp_json->>'a')::text
    
Note

The gp_jsonb (Beta) and gp_json (Beta) data types are defined in an extension named dataflow. You must CREATE EXTENSION dataflow; in each database in which you choose to use these (Beta) data types.

Preserving Ill-Formed JSON Escape Sequences

GPSS exposes a configuration parameter that you can use with the gp_jsonb and gp_json types. The name of this parameter is gpss.json_preserve_ill_formed_prefix. When set, GPSS does not return an error when it encounters an ill-formed JSON escape sequence with these types, but instead prepends it with the prefix that you specify.

For example, if gpss.json_preserve_ill_formed_prefix is set to the string "##" as follows:

SET gpss.json_preserve_ill_formed_prefix = "##";

and GPSS encounters an ill-formed JSON sequence such as the orphaned low surrogate \ude04X, GPSS writes the data as ##\ude04X instead.

About Transforming and Mapping RabbitMQ Input Data

You can define a MAPPING between the RabbitMQ input data (DATA:COLUMNS and META:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.

You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.

Creating the Greenplum Table

You must pre-create the Greenplum table before you load RabbitMQ data into Greenplum Database. You use the RABBITMQ:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.

The target Greenplum table definition must include each column that GPSS will load into the table. The table definition may include additional columns; GPSS ignores these columns, and loads no data into them.

The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored RabbitMQ message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.

The CREATE TABLE command for the target Greenplum Database table receiving the RabbitMQ message data defined in the loadcfgrmq2.yaml file presented in the Constructing the rabbitmq.yaml Configuration File section follows:

 testdb=# CREATE TABLE public.tbl_int_text_column( c1 int8, c2 int8, path text );

About RabbitMQ Stream Offsets, Message Retention, and Loading

Note

This topic applies only when reading from a RabbitMQ stream.

RabbitMQ assigns each record/message within a stream a unique sequential id number. This id is referred to as an offset. GPSS retains, for each gpsscli load invocation specifying the same RabbitMQ stream and Greenplum Database table names, the last offset consumed by the load operation. Refer to Understanding RabbitMQ Message Offset Management for more detailed information about how GPSS manages RabbitMQ message offsets.

gpsscli load returns an error if its recorded offset for the RabbitMQ stream and Greenplum Database table combination is behind that of the current earliest RabbitMQ message offset for the topic, or when the earliest and latest offsets do not match.

When you receive one of these messages, you can choose to:

  • Resume the load operation from the earliest available message published to the stream by specifying the --force‑reset‑earliest option to gpsscli load:

    $ gpsscli load --force-reset-earliest loadcfg2.yaml
    
  • Load only new messages published to the RabbitMQ stream, by specifying the ‑‑force‑reset‑latest option with the command:

    $ gpsscli load --force-reset-latest loadcfg2.yaml
    
  • Load messages published since a specific timestamp (milliseconds since epoch), by specifying the --force‑reset‑timestamp option to gpsscli load:

    $ gpsscli load --force-reset-timestamp 1571066212000 loadcfg2.yaml
    
Note

Specifying the --force‑reset‑<xxx> options when loading data may result in missing or duplicate messages. Use of these options outside of the offset mismatch scenario is discouraged.

Alternatively, you can provide the FALLBACK_OPTION (version 2) or fallback_option (version 3 (Beta)) property in the load configuration file to instruct GPSS to automatically read from the specified offset when it detects a mismatch.

check-circle-line exclamation-circle-line close-line
Scroll to top icon