The GPSS RabbitMQ data source loads data from a RabbitMQ queue (the traditional AMQP implementation) or a RabbitMQ stream (persistent and replicated structure available in RabbitMQ version 3.9 and later) into Greenplum Database.
You can use the gpsscli utility to load RabbitMQ data/messages into Greenplum Database. The GPSS server, gpss, is a RabbitMQ consumer. It ingests streaming data from a single RabbitMQ queue or stream using Greenplum Database readable external tables to transform and insert or update the data into a target Greenplum table. You identify the RabbitMQ server, queue or stream name, virtual host, data format, and the Greenplum connection options and target table definition in a YAML-formatted load configuration file that you provide to the utility.
You will perform the following tasks when you use the Greenplum Streaming Server to load RabbitMQ message data into a Greenplum Database table:
Before using the gpsscli utility to load RabbitMQ data to Greenplum Database, ensure that you:
The Greenplum Streaming Server supports RabbitMQ message value data in the following formats:
Format | Description |
---|---|
binary | Binary format data. GPSS reads binary data from RabbitMQ only as a single bytea-type column. |
csv | Comma-delimited text format data. |
custom | Data of a custom format, parsed by a custom formatter function. |
delimited | Text data separated by a configurable delimiter. |
json, jsonl (version 2 only) | JSON- or JSONB-format data. Specify the json format when the data is in JSON or JSONB format. GPSS can read JSON data as a single object or can read a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table. Note: GPSS supports JSONB-format data only when loading to Greenplum 6. Note: Specify FORMAT: jsonl in version 2 format load configuration files. Specify json with is_jsonl: true in version 3 (Beta) format load configuration files. |
To write RabbitMQ message data into a Greenplum Database table, you must identify the data format in the load configuration file.
Use the binary format when your RabbitMQ message data is a stream of bytes. GPSS reads binary data from RabbitMQ and loads it into a single bytea-type column.
Use the csv format when your RabbitMQ message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
Data in csv format may appear in RabbitMQ messages as follows:
"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"
The Greenplum Streaming Server provides a custom data formatter plug-in framework for RabbitMQ messages using user-defined functions. The type of RabbitMQ message data processed by a custom formatter is formatter-specific. For example, a custom formatter may process compressed or complex data.
The Greenplum Streaming Server supports loading RabbitMQ data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value and up to 32 bytes in length. You can also specify quote and escape characters, and an end-of-line prefix.
Note: The delimiter may not contain the quote or escape characters.
When you specify a quote character:
\n, which do not require additional escape if they are quoted.
When you do not specify a quote character:
The escape character is optional.
If you do not specify an escape character, GPSS treats the delimiter as the column separator, and treats any end-of-line prefix plus \n as the row separator.
If you do specify an escape character:
\n as the row separator.
Sample data using a pipe ('|') delimiter character follows:
1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
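The simplest case described above (no quote character and no escape character) can be sketched in a few lines of Python. This is only an illustration of the splitting rule, not GPSS code; the function name split_delimited and its eol_prefix argument are hypothetical.

```python
def split_delimited(payload, delimiter, eol_prefix=""):
    """Split delimited text when no quote or escape character is configured.

    The delimiter is the column separator; any end-of-line prefix plus
    a newline is the row separator.
    """
    row_separator = eol_prefix + "\n"
    # Drop empty trailing rows left by a final row separator.
    rows = [row for row in payload.split(row_separator) if row]
    return [row.split(delimiter) for row in rows]


# The pipe-delimited sample shown above.
sample = (
    "1313131|12|backorder|1313.13\n"
    "3535353|11|shipped|761.35\n"
    "7979797|11|partial|18.72\n"
)
pipe_rows = split_delimited(sample, "|")

# A multi-character delimiter with an end-of-line prefix (both made up).
prefixed_rows = split_delimited("a@@b$$\nc@@d$$\n", "@@", eol_prefix="$$")
```

Quoting and escaping change these rules, so the sketch does not apply once either character is configured.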
Specify the json format when your RabbitMQ message data is in JSON or JSONB format and you want GPSS to read JSON data from RabbitMQ as a single object into a single column (per the JSON specification, newlines and white space are ignored). You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
Note: GPSS supports JSONB-format data only when loading to Greenplum 6.
Specify FORMAT: jsonl in version 2 format load configuration files or specify json with is_jsonl: true in version 3 (Beta) format load configuration files when your RabbitMQ message data is in JSON format, single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
Sample JSON message data:
{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
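To make the "one JSON record per line" layout and the need for a mapping concrete, the following Python sketch reads the sample records and pulls each field into a typed column value. It only mirrors what a mapping expression does conceptually; the to_row helper is hypothetical and is not part of GPSS.

```python
import json

# Three messages, one JSON record per line, matching the sample above.
payload = "\n".join([
    '{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }',
    '{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }',
])


def to_row(line):
    """Turn one JSON record into a (cust_id, month, amount_paid) tuple."""
    record = json.loads(line)
    # Each target column takes one field of the JSON object, cast to the
    # column type, which is the job a mapping performs for the load.
    return (
        int(record["cust_id"]),
        int(record["month"]),
        float(record["amount_paid"]),
    )


rows = [to_row(line) for line in payload.splitlines() if line.strip()]
```

Without such a mapping, the whole JSON object would land in a single column.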
A custom data formatter for RabbitMQ messages is a user-defined function. If you are using a custom formatter, you must create the formatter function and register it in each database in which you will use the function to write RabbitMQ data to Greenplum tables.
You configure a data load operation from RabbitMQ to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source RabbitMQ data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.
When loading from RabbitMQ, the Greenplum Streaming Server supports two versions of the YAML configuration file: version 2 and version 3 (Beta).
Refer to the rabbitmq-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports. rabbitmq-v3.yaml describes the Version 3 (Beta) format.
Contents of a sample gpsscli Version 2 YAML configuration file named loadcfgrmq2.yaml follows:
DATABASE: testdb
USER: gpadmin
PASSWORD: password
HOST: localhost
PORT: 15432
VERSION: 2
RABBITMQ:
  INPUT:
    SOURCE:
      SERVER: gpdmin:changeme@localhost:5552
      STREAM: test_stream
      VIRTUALHOST: vhost_for_gpss
    DATA:
      COLUMNS:
        - NAME: c1
          TYPE: int
        - NAME: c2
          TYPE: int
        - NAME: path
          TYPE: text
      FORMAT: CSV
  OUTPUT:
    SCHEMA: "public"
    TABLE: tbl_int_text_column
    MODE: MERGE
    MATCH_COLUMNS:
      - c1
    UPDATE_COLUMNS:
      - c2
    ORDER_COLUMNS:
      - path
    UPDATE_CONDITION: c2 = 11
    DELETE_CONDITION: c1 = 0
    MAPPING:
      - NAME: c1
        EXPRESSION: c1::int
      - NAME: c2
        EXPRESSION: c2::int
      - NAME: path
        EXPRESSION: path::text
  METADATA:
    SCHEMA: staging_schema
  COMMIT:
    MINIMAL_INTERVAL: 200
    CONSISTENCY: at-least
  PROPERTIES:
    eof.when.idle: 1500
You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.
The VERSION parameter identifies the version of the GPSS YAML configuration file. You must specify version 2 or version v3.
Specify the RabbitMQ server, virtual host, and queue or stream of interest using the SOURCE block.
The DATA block that you provide must specify the COLUMNS and FORMAT parameters. The DATA:COLUMNS block includes the name and type of each data element in the RabbitMQ message. The default source-to-target data mapping behavior of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:
Specify NAME: __IGNORED__ to omit a RabbitMQ message value data element from the load operation.
The DATA:FORMAT keyword identifies the format of the RabbitMQ message value. GPSS supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). GPSS also supports binary (binary), single object or single record per line JSON/JSONB (json or jsonl), and custom (custom) format value data.
When you provide a META block, you must specify a single JSON-type COLUMNS and the FORMAT: json.
The available RabbitMQ metadata properties for a streaming source include:
stream (text) - the RabbitMQ stream name
offset (bigint) - the message offset
The available RabbitMQ metadata properties for a queue source include:
queue (text) - the RabbitMQ queue name
messageId (text) - the message identifier
correlationId (text) - the correlation identifier
timestamp (bigint) - the time that the message was added to the RabbitMQ queue
The FILTER parameter identifies a filter to apply to the RabbitMQ input messages before the data is loaded into Greenplum Database. If the filter evaluates to true, GPSS loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more DATA column names.
The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which GPSS should exit the load operation. The default ERROR_LIMIT is zero; the load operation is stopped when the first error is encountered.
You identify the target Greenplum Database schema name and table name via the RABBITMQ:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load RabbitMQ data.
Note: GPSS supports loading from a RabbitMQ data source into a single Greenplum Database table only.
The default load mode is to insert RabbitMQ data into the Greenplum Database table. GPSS also supports updating and merging RabbitMQ message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.
You can override the default mapping of the INPUT DATA:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Greenplum Database table and a RabbitMQ message value data element.
Note: When you specify a MAPPING block, ensure that you provide entries for all RabbitMQ data elements of interest - GPSS does not automatically match column names when you provide a MAPPING.
MERGE mode is similar to an UPSERT operation; GPSS may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. GPSS deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.
GPSS stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.
GPSS uses the following algorithm for MERGE mode processing:
Apply the MAPPINGS.
Apply the FILTER.
Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
Update the target rows that satisfy MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
Delete the target rows that satisfy MATCH_COLUMNS and the DELETE_CONDITION.
The RABBITMQ:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which GPSS creates external tables.
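The MERGE mode behavior described earlier (insert new rows, update matching rows, delete rows that satisfy the delete condition) can be sketched in memory with Python. This is a rough model under stated assumptions, not the SQL that GPSS generates: it assumes a single match column, assumes that the staged row with the greatest ORDER_COLUMNS value wins among duplicates, and assumes that both conditions are evaluated against the incoming row. The merge function and its arguments are hypothetical.

```python
def merge(target, batch, match_col, order_col, update_cols,
          update_condition=lambda row: True,
          delete_condition=lambda row: False):
    """Apply one staged batch to `target`, a dict keyed by the match column."""
    # Filter out duplicates: keep one staged row per match value.
    # Assumption: the row with the greatest order column value wins.
    staged = {}
    for row in batch:
        key = row[match_col]
        if key not in staged or row[order_col] >= staged[key][order_col]:
            staged[key] = row

    for key, row in staged.items():
        if delete_condition(row):
            target.pop(key, None)          # delete the matching row
        elif key not in target:
            target[key] = dict(row)        # insert a non-matching row
        elif update_condition(row):
            for col in update_cols:        # update only the listed columns
                target[key][col] = row[col]
    return target


# Values loosely follow the sample configuration: match on c1, update c2,
# order by path, UPDATE_CONDITION c2 = 11, DELETE_CONDITION c1 = 0.
table = {
    1: {"c1": 1, "c2": 5, "path": "a"},
    0: {"c1": 0, "c2": 1, "path": "z"},
}
incoming = [
    {"c1": 1, "c2": 11, "path": "b"},
    {"c1": 1, "c2": 11, "path": "c"},   # duplicate of c1 = 1
    {"c1": 2, "c2": 7, "path": "d"},    # no match in the table
    {"c1": 0, "c2": 3, "path": "e"},    # satisfies the delete condition
]
merged = merge(
    table, incoming, "c1", "path", ["c2"],
    update_condition=lambda row: row["c2"] == 11,
    delete_condition=lambda row: row["c1"] == 0,
)
```

In this run the row with c1 = 1 keeps its original path because only c2 is listed as an update column.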
GPSS commits RabbitMQ data to the Greenplum Database table at the row and/or time intervals that you specify in the RABBITMQ:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. If you do not specify these properties, GPSS commits data at the default MINIMAL_INTERVAL, 5000ms.
Specify a RABBITMQ:PROPERTIES block to set RabbitMQ configuration properties. GPSS sends the property names and values to RabbitMQ when it instantiates a consumer for the load operation.
When you specify FORMAT: json or FORMAT: jsonl, valid COLUMN:TYPEs for the data include json or jsonb. You can also specify the new GPSS gp_jsonb (Beta) or gp_json (Beta) column types.
gp_jsonb is an enhanced JSONB type that adds support for \u escape sequences and unicode. For example, gp_jsonb can escape \uDD8B and \u0000 as text format, but jsonb treats these characters as illegal.
gp_json is an enhanced JSON type that can tolerate certain illegal unicode sequences. For example, gp_json automatically escapes incorrect surrogate pairs and processes \u0000 as \\u0000. Note that unicode escape values cannot be used for code point values above 007F when the server encoding is not UTF8.
You can use the gp_jsonb (Beta) and gp_json (Beta) data types as follows:
As the COLUMN:TYPE when the target Greenplum Database table column type is json or jsonb.
In a MAPPING when the target Greenplum Database column is text or varchar. For example:
EXPRESSION: (j->>'a')::text
In a MAPPING when FORMAT: avro and the target Greenplum Database column is json or jsonb. For example:
EXPRESSION: j::gp_jsonb
or
EXPRESSION: j::gp_json
In a MAPPING when FORMAT: avro and the target Greenplum Database column is text or varchar. For example:
EXPRESSION: (j::gp_jsonb->>'a')::text
or
EXPRESSION: (j::gp_json->>'a')::text
Note: The gp_jsonb (Beta) and gp_json (Beta) data types are defined in an extension named dataflow. You must CREATE EXTENSION dataflow; in each database in which you choose to use these (Beta) data types.
Preserving Ill-Formed JSON Escape Sequences
GPSS exposes a configuration parameter that you can use with the gp_jsonb and gp_json types. The name of this parameter is gpss.json_preserve_ill_formed_prefix. When set, GPSS does not return an error when it encounters an ill-formed JSON escape sequence with these types, but instead prepends it with the prefix that you specify.
For example, if gpss.json_preserve_ill_formed_prefix is set to the string "##" as follows:
SET gpss.json_preserve_ill_formed_prefix = "##";
and GPSS encounters an ill-formed JSON sequence such as the orphaned low surrogate \ude04X, GPSS writes the data as ##\ude04X instead.
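The effect of the prefix on orphaned surrogates can be approximated with a short Python sketch. It is an illustration of the described outcome only, not the GPSS implementation: it handles unpaired \uD800 to \uDFFF escapes and nothing else (for example, it leaves \u0000 alone), and the preserve_ill_formed function name is hypothetical.

```python
import re

# Alternatives are tried in order at each position:
#   1. an escaped backslash, which must not be read as the start of \uXXXX
#   2. a well-formed high + low surrogate pair, which is left untouched
#   3. any remaining surrogate escape, which is therefore unpaired
_ESCAPES = re.compile(
    r"(\\\\)"
    r"|(\\u[dD][89abAB][0-9a-fA-F]{2}\\u[dD][c-fC-F][0-9a-fA-F]{2})"
    r"|(\\u[dD][89a-fA-F][0-9a-fA-F]{2})"
)


def preserve_ill_formed(text, prefix):
    """Prepend `prefix` to each unpaired surrogate escape in JSON text."""
    def fix(match):
        if match.group(3):
            return prefix + match.group(3)
        return match.group(0)
    return _ESCAPES.sub(fix, text)


orphan = preserve_ill_formed(r"\ude04X", "##")
paired = preserve_ill_formed(r"\ud83d\ude04", "##")
literal = preserve_ill_formed(r"\\ude04", "##")
```

Here orphan reproduces the ##\ude04X result from the example above, while the well-formed pair and the escaped backslash pass through unchanged.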
You can define a MAPPING between the RabbitMQ input data (DATA:COLUMNS and META:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.
You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.
You must pre-create the Greenplum table before you load RabbitMQ data into Greenplum Database. You use the RABBITMQ:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.
The target Greenplum table definition must include each column that GPSS will load into the table. The table definition may include additional columns; GPSS ignores these columns, and loads no data into them.
The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored RabbitMQ message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.
The CREATE TABLE command for the target Greenplum Database table receiving the RabbitMQ message data defined in the loadcfgrmq2.yaml file presented in the Constructing the rabbitmq.yaml Configuration File section follows:
testdb=# CREATE TABLE public.tbl_int_text_column( c1 int8, c2 int8, path text );
Note: This topic applies only when reading from a RabbitMQ stream.
RabbitMQ assigns each record/message within a stream a unique sequential id number. This id is referred to as an offset. GPSS retains, for each gpsscli load invocation specifying the same RabbitMQ stream and Greenplum Database table names, the last offset consumed by the load operation. Refer to Understanding RabbitMQ Message Offset Management for more detailed information about how GPSS manages RabbitMQ message offsets.
gpsscli load returns an error if its recorded offset for the RabbitMQ stream and Greenplum Database table combination is behind that of the current earliest RabbitMQ message offset for the stream, or when the earliest and latest offsets do not match.
When you receive one of these messages, you can choose to:
Resume the load operation from the earliest available message published to the stream by specifying the --force-reset-earliest option to gpsscli load:
$ gpsscli load --force-reset-earliest loadcfg2.yaml
Load only new messages published to the RabbitMQ stream, by specifying the --force-reset-latest option with the command:
$ gpsscli load --force-reset-latest loadcfg2.yaml
Load messages published since a specific timestamp (milliseconds since epoch), by specifying the --force-reset-timestamp option to gpsscli load:
$ gpsscli load --force-reset-timestamp 1571066212000 loadcfg2.yaml
Note: Specifying the --force-reset-<xxx> options when loading data may result in missing or duplicate messages. Use of these options outside of the offset mismatch scenario is discouraged.
Alternatively, you can provide the FALLBACK_OPTION (version 2) or fallback_option (version 3 (Beta)) property in the load configuration file to instruct GPSS to automatically read from the specified offset when it detects a mismatch.
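Because the --force-reset-timestamp option takes milliseconds since the epoch, it can help to compute the value from a calendar time rather than by hand. The Python sketch below shows one way to do that; the to_epoch_millis helper is hypothetical, and the chosen date is simply the UTC time that corresponds to the value used in the command above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Return whole milliseconds between the Unix epoch and `moment`."""
    if moment.tzinfo is None:
        # A naive datetime has no defined offset from UTC, so refuse it.
        raise ValueError("use a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


cutoff = datetime(2019, 10, 14, 15, 16, 52, tzinfo=timezone.utc)
millis = to_epoch_millis(cutoff)
print(millis)
```

The printed value can then be passed as the argument to --force-reset-timestamp.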