You will perform the following tasks when you use the Greenplum Streaming Server (GPSS) to load data into a Greenplum Database table:
NoteThe Greenplum Streaming Server requires a load configuration file when you use the
gpsscli
orgpkafka
client utilities to load data into Greenplum Database. A load configuration file is not required if you are using the VMware Greenplum Connector for Apache NiFi or a custom GPSS client application.
You configure a load operation from a data source to Greenplum Database via a YAML-formatted configuration file as described in gpsscli.yaml. This configuration file includes properties that identify the data source and format, information about the Greenplum Database connection and target table, and error and commit thresholds for the operation.
GPSS supports some pre-defined data formats, including Avro, binary, CSV, and JSON. GPSS also supports custom data formats. Refer to Understanding Custom Formatters for information on developing and using a custom formatter with GPSS.
GPSS supports version 1 (deprecated), version 2, and version 3 (Beta) load configuration file formats. Versions 1 and 2 use a similar YAML structure. Version 3 introduces a new YAML structure, organization, and keywords.
Refer to Constructing the gpkafka.yaml Configuration File for the YAML file format for a Kafka data source, Constructing the filesource.yaml Configuration File for the YAML file format for a file data source, or Constructing the rabbitmq.yaml Configuration File for a RabbitMQ data source.
You are required to pre-create the target Greenplum table before you initiate a data load operation to Greenplum Database from a GPSS client. You must be able to identify both the schema name and table name of the target table.
NoteThe column data types that you specify for the target Greenplum Database table are informed by the data formats supported by the GPSS client.
If you load data to Greenplum Database from a GPSS client using a non-admin Greenplum user/role, the Greenplum administrator must assign the role certain privileges:
The role must have USAGE
and CREATE
privileges on any non-public database schema where:
gpss
creates external tables. =# GRANT USAGE, CREATE ON SCHEMA <schema_name> TO <role_name>;
If the role writing to Greenplum Database is not a database or table owner, the role must have SELECT
and INSERT
privileges on each Greenplum Database table to which the role will write data:
=# GRANT SELECT, INSERT ON <schema_name>.<table_name> TO <role_name>;
The role must have permission to create readable external tables using the Greenplum Database gpfdist
protocol:
=# ALTER ROLE <role_name> CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
Refer to the Greenplum Database Managing Roles and Privileges documentation for further information on assigning privileges to Greenplum Database users.
CautionDo not directly
SELECT
from an external table that GPSS creates for your job. Any data that you read in this manner will not be loaded into the Greenplum Database table.
You run a GPSS client to use the Greenplum Streaming Server to load data into Greenplum Database. Installation, configuration, and run procedures for a GPSS client are client-specific. For example, refer to the VMware Greenplum Connector for Apache NiFi Documentation for the installation, configuration, and run procedures for the Connector for Apache NiFi.
You can also use the gpsscli client command line utility to load data into Greenplum Database.
The Greenplum Streaming Server (GPSS) includes the gpsscli
client command utility. gpsscli
provides subcommands to manage Greenplum Streaming Server load jobs and to view job status and progress:
Subcommand | Description |
---|---|
convert | Convert a version 1 or 2 load configuration file to version 3 (Beta) format |
dryrun | Perform a trial load without writing to Greenplum Database |
help | Display command help |
list | List jobs and their status |
load | Run one or more single-command load |
progress | Show job progress |
remove | Remove one or more jobs |
start | Start one or more jobs |
status | Show job status |
stop | Stop one or more jobs |
submit | Submit one or more jobs |
wait | Wait for a job to stop |
All subcommands include options that allow you to specify the host and/or port number of the GPSS instance that you want to service the request (--config
or --gpss-host
and --gpss-port
). You can also specify the directory to which GPSS writes gpsscli
client log files (--log-dir
).
NoteThe Greenplum Streaming Server includes a client command utility named
gpkafka
.gpkafka
is a wrapper around thegpss
andgpsscli
utilities that provides data load capabilities to Greenplum from a Kafka data source. Refer to Loading Kafka Data into Greenplum for information about loading data from Kafka into Greenplum Database.
A typical command workflow when using gpsscli
to load data into Greenplum Database follows:
Alternatively, you can run a single-command load operation that submits a GPSS job on your behalf, starts the job, displays job progress, and may stop the GPSS job. See Running a Single-Command Load.
All gpsscli
subcommands return zero (0
) on success and non-zero on failure. The specific return code values and failure descriptions are identified in the table below.
Return Code | Description |
---|---|
0 | Success |
1 | Internal error |
2 | RPC error |
3 | Job error; the status of one or more jobs that the subcommand operated on is Error |
You identify a job by a name that you specify or a unique identifier that GPSS assigns. Job names must be unique. Use the name to manage the job throughout its lifecycle.
GPSS uses a data source-specific combination of properties specified in the load configuration file to internally identify a job. For example, when it loads from a Kafka data source, GPSS uses the Kafka topic name, and the target Greenplum database, schema, and table names for internal job identification. GPSS creates internal and external tables for each job that are keyed off of these properties; these tables keep track of the progress of the load operation. GPSS considers any load configuration file submitted with the same value for these job-identifying properties to be the same internal job.
GPSS creates a unique external table to load data for a specific job directly into Greenplum Database segments. Kafka job external table names begin with the prefix gpkafkaloadext_
, file job external table names begin with gpfileloadext_
. RabbitMQ job external table names begin with the prefix gprabbitmqloadext_
. And by default, GPSS reuses this external table each time you restart the job.
The complete name and the lifecycle of a GPSS external table depends on the ReuseTables
setting in the gpss.json
server configuration file as described below.
ReuseTables=true
When ReuseTables
is true
(the default setting), GPSS reuses and does not drop an external table for a job.
GPSS creates the external table using the following naming format: gp<datasource>loadext_<hash>
.
GPSS calculates the <hash> based on values of server configuration properties and load/job configuration properties that would change the external table definition. These properties include:
Gpfdist host and port number
Target Greenplum Database schema name
Target Greenplum Database table name
Target Greenplum Database table definition
Gpfdist use of encrypted communications
Source data format
Source data types
Formatter options
Job key; GPSS generates the job key as follows for the different data sources:
Error Limit
ReuseTables=false
When ReuseTables
is false
, GPSS drops an external table, if one exists, when a job is (re)started.
NoteRepeated drop/create of external tables may cause bloating in the
pg_attribute
andpg_class
system catalog tables; be sure toVACUUM
these tables frequently.
GPSS creates the external table using the job name instead of a hash. The default job name is the base name of the YAML load configuration file. You set a custom job name when you specify the -–name <jobname>
option to the gpsscli submit
command.
The external table naming format when ReuseTables
is false
follows: gp<datasource>loadext_<jobname>
.
To register a data load operation to Greenplum Database, you submit a job to the Greenplum Streaming Server using the gpsscli submit subcommand. When you submit a job, you provide a YAML-formatted configuration file that defines the properties of the load operation. Load properties include Greenplum-specific options, as well as properties that are specific to the data source. See gpsscli.yaml.
You identify a GPSS job by a name that you provide via the --name
option. If you do not specify a job name, GPSS assigns and returns the base name of the load configuration file as the job identifier. You use this name or identifier to identify the job throughout its lifecycle.
The following example submits a GPSS job named order_upload
whose load properties are defined in the configuration file named loadcfg.yaml
:
$ gpsscli submit --name order_upload loadcfg.yaml
A newly-submitted GPSS job is in the Submitted state.
To start a GPSS job, you run the gpsscli start subcommand. When you start a job, GPSS initiates the data load operation from the client. It sets up the connection to Greenplum Database and creates the external tables that it uses to load data directly into Greenplum segments.
The following example starts the GPSS job named order_upload
:
$ gpsscli start order_upload
A job that starts successfully enters the Running state.
The default behaviour of gpsscli start
is to return immediately. When you specify the --quit-at-eof
option, the command reads data until it receives an EOF, and then stops the job. In this scenario, the job transitions to the Success or Error state when the command exits.
GPSS provides several commands to check the status of a running job(s):
The gpsscli list subcommand lists running (or all) jobs and their status:
$ gpsscli list --all
JobName JobID GPHost GPPort DataBase Schema Table Topic Status
order_upload d577cf37890b5b6bf4e713a9586e86c9 sys1 5432 testdb public order_sync orders JOB_RUNNING
file_jobf3 cfb985bcdc8884352b4b8853f5d06bbe sys1 5432 testdb public from_csvfile file:///tmp/data.csvJOB_STOPPED
The gpsscli status subcommand displays the status of a specific job:
$ gpsscli status order_upload
...,101565,info,Job order_upload, status JOB_RUNNING, errmsg [], time 2020-08-04T16:44:03.376216533Z
Use the gpsscli status
command to determine the status or success or failure of the operation. If the job status is Error, you will want to examine command output and log file messages for additional information. See Checking for Load Errors.
The gpsscli progress subcommand displays the progress of a running Kafka job. The command waits, and displays the job commit history and transfer speed at runtime. gpsscli progress
returns when the job stops.
NoteGPSS currently supports job progress tracking only for Kafka data sources.
$ gpsscli progress order_upload
StartTime EndTime MsgNum MsgSize InsertedRecords RejectedRecords Speed
2019-10-15T21:56:49.950134Z 2019-10-15T21:56:49.964751Z 1000 78134 1000 0 735.13KB/sec
2019-10-15T21:56:49.976231Z 2019-10-15T21:56:49.984311Z 1000 77392 1000 0 701.58KB/sec
2019-10-15T21:56:49.993607Z 2019-10-15T21:56:50.003602Z 1000 77194 1000 0 723.32KB/sec
By default, gpsscli progress
displays job progress by batch. To display job progress by partition, specify the --partition
option to the subcommand:
$ gpsscli progress order_upload --partition
PartitionID StartTime EndTime BeginOffset EndOffset MsgSize Speed
0 2019-10-15T21:56:54.80469Z 2019-10-15T21:56:54.830441Z 242000 243000 81033 652.29KB/sec
0 2019-10-15T21:56:54.846354Z 2019-10-15T21:56:54.880517Z 243000 244000 81021 675.12KB/sec
0 2019-10-15T21:56:54.893097Z 2019-10-15T21:56:54.904745Z 244000 245000 80504 673.67KB/sec
GPSS also keeps track of the progress of each Kafka load job in a separate CSV-format log file. The job progress log files, named progress_<jobname>_<jobid>_<date>.log
, reside in the GPSS server log directory. Refer to the Kafka data source Checking the Progress of a Load Operation topic for more information.
You can use the gpsscli wait subcommand to wait for a running job to complete. A job is complete when there is no more data to read, or when an error is returned. Such jobs transition from the Running state to the Success or Error state.
$ gpsscli wait order_upload
gpsscli wait
exits when the job completes.
Use the gpsscli stop subcommand to stop a specific job. When you stop a job, GPSS writes any unwritten batched data to the Greenplum Database table and stops actively reading new data from the data source.
$ gpsscli stop order_upload
A job that you stop enters the Stopped state.
The gpsscli remove subcommand removes a GPSS job. When you remove a job, GPSS unregisters the job from its job list and releases all job-related resources.
$ gpsscli remove order_upload
The gpsscli load subcommand initiates a data load operation. When you run gpsscli load
, GPSS submits, starts, and displays the progress (Kafka job only) of a job on your behalf.
By default, gpsscli load
loads all available data and then waits indefinitely for new messages to load. In the case of user interrupt or exit, the GPSS job remains in the Running state. You must explicitly stop the job with gpsscli stop
when running in this mode.
When you provide the --quit-at-eof
option to the command, the utility exits after it reads all published data, writes the data to Greenplum Database, and stops the job. The GPSS job is in the Success or Error state when the command returns.
Similar to the gpsscli submit
command, gpsscli load
takes as input an optional name and a YAML-format configuration file that defines the load properties:
$ gpsscli load --quit-at-eof loadcfg.yaml
Because the above command does not specify the --name
option, GPSS assigns and returns the job identifier loadcfg
when you run it.
In the default configuration, GPSS relies on the gpsscli
subcommands that you submit to initiate and stop jobs. Once a job is started, GPSS does not automatically (re)start the job, and GPSS stops a job only when you have specified the --quit-at-eof
option to the gpsscli
subcommand.
You can configure GPSS to automatically stop and restart failed and running jobs via scheduling properties that you specify in the load configuration file.
You can register to be notified when a job is stopped for any reason (success, error, completed, user-initiated stop) via alert properties that you specify in the load configuration file. When a job stops, GPSS will invoke a command that you specify, after an optional period of time that you configure elapses.
You can configure GPSS to restart a failed job after a period of time that you specify; you can also configure the maximum number of times that GPSS retries the job. The load configuration file properties that govern failed job retry are located in the SCHEDULE:
block.
Refer to Auto-Restarting a Failed Job for additional information about retrying a failed job.
You can configure GPSS to automatically stop a running job after it has run for a period of time, or at a specific clock time after receiving an EOF. You can also configure a restart interval and the maximum number of times GPSS should restart a job that it stopped. The load configuration file properties that govern job scheduling in GPSS are also located in the SCHEDULE:
block.
The Greenplum Streaming Server cannot directly return success or an error to the client. You can obtain success and error information for a GPSS load operation from gpsscli
subcommand output, and from messages that GPSS writes to stdout
or writes to the server, progress (Kafka jobs only), and/or client log files.
You can also view data formatting-specific errors encountered during a load operation in the error log.
Error checking activity may include:
GPSS writes server and client log messages to files as described in Managing GPSS Log Files.
GPSS Version | Log File Content |
---|---|
1.4.0 and older | <date>:<time> <proc>:<user>:<host>:<proc_pid>-[<severity>]:-<message> |
1.4.1 - 1.9.x | timestamp,pid,level,message (header row, CSV format) |
1.10.0 and newer | timestamp,pid,level,message (client log file header row, CSV format)timestamp,job_id,pid,level,message (server log file header row, CSV format) |
Example message in a gpss
log file:
20230427 15:17:22.95110,-,31424,info,gpss listening on :5000
GPSS writes at most the first 8 characters of a job identifier, or writes -
when the message is not job-specific.
Example message in a gpsscli
log file:
20230427 16:28:46.39607,1305,info,"JobID: 593347a306a1f9439a127b982b2f891f,JobName: nightly_load"
To determine if GPSS loaded one or more batches of data to Greenplum Database successfully, first examine the status and progress of the job in question. The gpsscli status
and gpsscli progress
command output will identify if any known error conditions exist.
Also examine gpss
command output and logs, searching for messages that identify the number of rows inserted and rejected. For a Kafka or RabbitMQ data source, search for:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
Or, for load jobs originating from a file data source:
... -[INFO]:- ... inserted 5, rejected 1
When a Kafka, file, or S3 job fails, you may choose to perform a trial run of the load operation to help diagnose the cause of the failure. The gpsscli dryrun command reads the data from the source and prepares to load the data, but does not actually insert it into Greenplum Database. The command returns the results of this processing, as well as the SQL commands that GPSS would run to complete the job.
Sample command that specifies a Kafka load configuration file:
$ gpsscli dryrun --include-error-process kjobcfgv3.yaml
Sample command output:
jobid: 01ba08c0f7fc8e3a49e2ad1ee48ef899
jobname: kjobcfgv3
jobtype: KafkaJob
tracking table name: gpkafka_tbl_1_column_text_01ba08c0f7fc8e3a49e2ad1ee48ef899
progress log file name: progress_kjobcfgv3_01ba08c0_20220218.log
Extension version checking:
<SQL commands to check extension versions>
SQL of job:
<SQL commands to fullfil the load operation>
Because the --include-error-process
flag was specified for the Kafka job dry run, the output may include the following text:
Check format error:
error sql query: <query>
clean up error table: <query>
Check failed batch with expression error:
get failing batch query: <query>
If gpss
command or log output indicates that rows were rejected, the output will identify an SQL query that you can run to view the data formatting errors that GPSS encountered while inserting data into a Greenplum Database table.
GPSS uses the LOG ERRORS
feature of Greenplum Database external tables to detect and log data rows with formatting errors. The functions that you use to access and manage the error log, and the persistence of the error data, depend on the version of Greenplum Database that you are running GPSS against and the ReuseTables
setting in effect when you started the gpss
server.
If you are running GPSS against Greenplum Database versions 5.26+ or 6.6+ and you started the gpss
server with ReuseTables=false
:
LOG ERRORS PERSISTENTLY
when it creates external tables for a job.gp_read_persistent_error_log()
function to retrieve the error data.If you are running GPSS against older 5.x and 6.x versions of Greenplum, or you started the gpss
server with ReuseTables=true
:
gp_read_error_log()
function to retrieve the error data.gpss
server is started with ReuseTables=true
, GPSS does not drop an external table for a job. If ReuseTables=false
, GPSS drops an external table, if one exists, when a job is (re)started.)Refer to the Greenplum Database CREATE EXTERNAL TABLE documentation for more information about the external table error logs and error log management.
When you run the query to view the error log, you specify the name of the external table that GPSS used for the load operation. You identify the name of the external table by examining the gpss
command output and/or log file messages. For best results, use the (short) time interval identified in the gpss
output.
Kafka job external table names begin with gpkafkaloadext_
, file job external table names begin with gpfileloadext_
, and RabbitMQ job external table names begin with gprabbitmqloadext_
.
The following example query displays the number of errors returned in a Kafka load job:
SELECT count(*) FROM gp_read_error_log('"public"."gpkafkaloadext_ae0eac9f8c94a487f30f749175c3afbf"')
WHERE cmdtime > '2018-08-10 18:44:23.814651+00';
Warning Do not directly
SELECT
from an external table that GPSS creates for your job. Any data that you read in this manner will not be loaded into the Greenplum Database table.
A job may fail for temporary reasons. You can configure GPSS to automatically restart an errored job. GPSS automatic job restart is deactivated by default. When the YAML-format load configuration file submitted for a job includes the non-default SCHEDULE:
block MAX_RETRIES
and RETRY_INTERVAL
configuration settings, GPSS will attempt to restart the job if the job enters the Error state after it starts.
GPSS stops trying to restart the job when the configured retry limit is reached, or if the job is removed or the job configuration is updated during retry.
Jobs that you start via the gpsscli start
, gpsscli load
, and gpkafka load
commands are eligible for automatic job restart on error. If you provide the --quit-at-eof
flag or one of the --force-reset-*xxx*
flags when you run the command and the Kafka or RabbitMQ job load configuration file specifies failed job retry settings, GPSS ignores the flag on any retry attempts that it initiates.
GPSS catches data formatting errors during loading, and you can view these errors with gp_read_error_log()
as described in Reading the Error Log.
There may be cases where your data is formatted correctly, but GPSS encounters an error when it evaluates a mapping or a filter expression. If the evaluation fails, GPSS cannot log and propagate the error back to the user.
For example, if you specify the following mapping expression in your load configuration file:
EXPRESSION: (jdata->>'id')::int
and the content of jdata->>'id'
is a string that includes non-integer characters, the expression will fail when Greenplum Database evaluates it.
The load configuration property COMMIT: SAVE_FAILING_BATCH
(versions 2 and 3 load configuration file formats only) governs whether or not GPSS saves a batch of data into a backup table before it writes the data to Greenplum Database. Saving the data in this manner aids loading recovery when GPSS encounters errors during the evaluation of expressions.
By default, SAVE_FAILING_BATCH
is false
, and GPSS immediately terminates a load job when it encounters an expression error.
When you set SAVE_FAILING_BATCH
to true
, GPSS writes all data in the batch to a backup table named gpssbackup_<jobhash>
. GPSS writes both good and bad data to the backup table.
A backup table has the following columns:
Column Name | Description |
---|---|
data | The data associated with the row that GPSS attempted to load into Greenplum Database. |
gpss_save_timestamp | The time that GPSS inserted the row into the backup table. |
gpss_expression_error | The error that GPSS encountered when it ran the expression specified in the column mapping. |
Sample backup table content for a failed load operation follows:
test=# SELECT * from gpssbackup_e0c5991570303703450bbac2ee8816bc;
-[ RECORD 1 ]---------+-----------------------------------------------------------------------------------------------------------
data | {"device": "agkNtzFnHIVASYNvo", "humidity": 91.3, "temperature": 9, "time": "2019-09-24T15:33:57.054175"}
gpss_save_timestamp | 2022-07-04 07:51:17.798469+00
gpss_expression_error | division by zero
-[ RECORD 2 ]---------+-----------------------------------------------------------------------------------------------------------
data | {"device": "LcZQGnVXhORIKxWY", "humidity": 46.289, "temperature": 9, "time": "2019-09-24T15:33:57.054561"}
gpss_save_timestamp | 2022-07-04 07:51:17.798469+00
gpss_expression_error | division by zero
GPSS continues to process Kafka messages even after it encounters an expression error. When GPSS encounters one or more expression errors in a batch, none of the good data in the batch is written to Greenplum. You can set the RECOVER_FAILING_BATCH
(Beta) configuration property to instruct GPSS to automatically reload the good data in the batch, and retain only the error data in the backup table. GPSS displays additional information about the recovery process when you set this option.
NoteUsing a backup table in this manner to hedge against expression errors may impact performance, especially when the data that you are loading has not been cleaned.
GPSS creates a unique external table to load data for a specific job directly into Greenplum Database segments. By default, GPSS reuses this external table each time you restart the job. If the structure of either the source data or the destination Greenplum Database table is altered, GPSS may not be able to reuse the external table it initially created for the job.
You can configure GPSS to create a new external table for all new and restarted jobs submitted to a gpss
service instance by setting the ReuseTables
configuration property to false
in the gpss.json
file.