You will perform the following tasks when you use the Greenplum Streaming Server (GPSS) to load data into a Greenplum Database table:

  1. Ensure that you meet the prerequisites, and that you have configured and started the Greenplum Streaming Server.
  2. Identify the source and the format of the data and construct the load configuration file (optional).
  3. Create the target Greenplum Database table.
  4. Assign Greenplum Database role permissions to the table, if required, as described in Configuring Greenplum Database Role Privileges.
  5. Run the GPSS client.
  6. Verify the load operation as described in Checking for Load Errors.

Constructing the Load Configuration File

Note: The Greenplum Streaming Server requires a load configuration file when you use the gpsscli or gpkafka client utilities to load data into Greenplum Database. A load configuration file is not required if you are using the Tanzu Greenplum Connector for Informatica, the Tanzu Greenplum Connector for Apache NiFi, or a custom GPSS client application.

You configure a load operation from a data source to Greenplum Database via a YAML-formatted configuration file as described in gpsscli.yaml. This configuration file includes properties that identify the data source and format, information about the Greenplum Database connection and target table, and error and commit thresholds for the operation.

GPSS supports some pre-defined data formats, including Avro, binary, CSV, and JSON. GPSS also supports custom data formats. Refer to Understanding Custom Formatters for information on developing and using a custom formatter with GPSS.

GPSS supports version 1 (deprecated), version 2, and version 3 (Beta) load configuration file formats. Versions 1 and 2 use a similar YAML structure. Version 3 introduces a new YAML structure, organization, and keywords.

Refer to Constructing the gpkafka.yaml Configuration File for the YAML file format for a Kafka data source, Constructing the filesource.yaml Configuration File for the YAML file format for a file data source, or Constructing the rabbitmq.yaml Configuration File for a RabbitMQ data source.

Creating the Target Greenplum Table

You are required to pre-create the target Greenplum table before you initiate a data load operation to Greenplum Database from a GPSS client. You must be able to identify both the schema name and table name of the target table.

Note: The column data types that you specify for the target Greenplum Database table are informed by the data formats supported by the GPSS client.

Configuring Greenplum Database Role Privileges

If you load data to Greenplum Database from a GPSS client using a non-admin Greenplum user/role, the Greenplum administrator must assign the role certain privileges:

  • The role must have USAGE and CREATE privileges on any non-public database schema where:

    • The role writes data to a table in the schema, or
    • gpss creates external tables.

      For example:
    =# GRANT USAGE, CREATE ON SCHEMA <schema_name> TO <role_name>;
    
  • If the role writing to Greenplum Database is not a database or table owner, the role must have SELECT and INSERT privileges on each Greenplum Database table to which the role will write data:

    =# GRANT SELECT, INSERT ON <schema_name>.<table_name> TO <role_name>;
    
  • The role must have permission to create readable external tables using the Greenplum Database gpfdist protocol:

    =# ALTER ROLE <role_name> CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
    

Refer to the Greenplum Database Managing Roles and Privileges documentation for further information on assigning privileges to Greenplum Database users.
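
As a convenience, the three statements listed above can be generated for a given role with a small shell helper. This is a minimal sketch, not part of GPSS: the function name print_gpss_grants and the example schema, table, and role names are hypothetical, and the helper does not quote identifiers, so it assumes simple lowercase names.

```shell
# Print the GRANT/ALTER statements described above for one role.
# Arguments (hypothetical example values): schema, table, role.
print_gpss_grants() {
    local schema="$1" table="$2" role="$3"
    cat <<EOF
GRANT USAGE, CREATE ON SCHEMA ${schema} TO ${role};
GRANT SELECT, INSERT ON ${schema}.${table} TO ${role};
ALTER ROLE ${role} CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
EOF
}
```

A Greenplum administrator could review the output and then pipe it to psql, for example: print_gpss_grants sales order_sync loader | psql -d testdb.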

Warning: Do not directly SELECT from an external table that GPSS creates for your job. Any data that you read in this manner will not be loaded into the Greenplum Database table.

Running the Client

You run a GPSS client to use the Greenplum Streaming Server to load data into Greenplum Database. Installation, configuration, and run procedures for a GPSS client are client-specific. For example, refer to the Tanzu Greenplum Connector for Informatica Documentation for information about this procedure for the Tanzu Greenplum Connector for Informatica. Similarly, the Tanzu Greenplum Connector for Apache NiFi Documentation provides the installation, configuration, and run procedures for the Connector for Apache NiFi.

You can also use the gpsscli client command line utility to load data into Greenplum Database.

Using the gpsscli Client Utility

The Greenplum Streaming Server (GPSS) includes the gpsscli client command utility. gpsscli provides subcommands to manage Greenplum Streaming Server load jobs and to view job status and progress:

Subcommand   Description
convert      Convert a version 1 or 2 load configuration file to version 3 (Beta) format
dryrun       Perform a trial load without writing to Greenplum Database
help         Display command help
list         List jobs and their status
load         Run one or more single-command loads
progress     Show job progress
remove       Remove one or more jobs
start        Start one or more jobs
status       Show job status
stop         Stop one or more jobs
submit       Submit one or more jobs
wait         Wait for a job to stop

All subcommands include options that allow you to specify the host and/or port number of the GPSS instance that you want to service the request (--config or --gpss-host and --gpss-port). You can also specify the directory to which GPSS writes gpsscli client log files (--log-dir).

Note: The Greenplum Streaming Server includes a client command utility named gpkafka. gpkafka is a wrapper around the gpss and gpsscli utilities that provides data load capabilities to Greenplum from a Kafka data source. Refer to Loading Kafka Data into Greenplum for information about loading data from Kafka into Greenplum Database.

A typical command workflow when using gpsscli to load data into Greenplum Database follows:

  1. Submit a Greenplum Streaming Server job.
  2. Start the Greenplum Streaming Server job.
  3. (Optional) Check the status or progress of the Greenplum Streaming Server job.
  4. (Optional) Wait for a Greenplum Streaming Server job to complete.
  5. Stop the Greenplum Streaming Server job.
  6. Remove the Greenplum Streaming Server job.
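
The workflow above might be scripted roughly as follows. This is a sketch under stated assumptions: the function name run_gpss_job is hypothetical, the GPSS instance is reachable with the default host and port (no --gpss-host or --gpss-port options are passed), and the job is one that eventually completes, so that gpsscli wait returns.

```shell
# Sketch: drive one GPSS job through the typical gpsscli workflow.
# $1 = job name, $2 = load configuration file (both supplied by the caller).
run_gpss_job() {
    local job="$1" cfg="$2"
    gpsscli submit --name "$job" "$cfg" || return $?   # 1. submit the job
    gpsscli start "$job"                || return $?   # 2. start the job
    gpsscli status "$job"                              # 3. (optional) check status
    gpsscli wait "$job"                                # 4. (optional) wait for completion
    gpsscli stop "$job"                                # 5. stop the job
    gpsscli remove "$job"                              # 6. remove the job
}
```

Example invocation: run_gpss_job order_upload loadcfg.yaml. The function returns early only when submit or start fails; a production script would likely also inspect the return codes of the later steps.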

Alternatively, you can run a single-command load operation that submits a GPSS job on your behalf, starts the job, displays job progress, and may stop the GPSS job. See Running a Single-Command Load.

About the gpsscli Return Codes

All gpsscli subcommands return zero (0) on success and non-zero on failure. The specific return code values and failure descriptions are identified in the table below.

Return Code   Description
0             Success
1             Internal error
2             RPC error
3             Job error; the status of one or more jobs that the subcommand operated on is Error
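
A script that calls gpsscli can translate these return codes into readable messages. The following is a minimal sketch; the function name describe_gpsscli_rc is hypothetical, and the fallback branch for unlisted codes is an assumption rather than documented behavior.

```shell
# Map a gpsscli return code to the description in the table above.
describe_gpsscli_rc() {
    case "$1" in
        0) echo "Success" ;;
        1) echo "Internal error" ;;
        2) echo "RPC error" ;;
        3) echo "Job error" ;;
        *) echo "Unknown return code: $1" ;;   # not listed in the table
    esac
}
```

Example usage: gpsscli status order_upload; describe_gpsscli_rc $?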

About GPSS Job Identification

You identify a job by a name that you specify or a unique identifier that GPSS assigns. Job names must be unique. Use the name to manage the job throughout its lifecycle.

GPSS uses a data source-specific combination of properties specified in the load configuration file to internally identify a job. For example, when it loads from a Kafka data source, GPSS uses the Kafka topic name, and the target Greenplum database, schema, and table names for internal job identification. GPSS creates internal and external tables for each job that are keyed off of these properties; these tables keep track of the progress of the load operation. GPSS considers any load configuration file submitted with the same value for these job-identifying properties to be the same internal job.

About External Table Naming and Lifecycle

GPSS creates a unique external table to load data for a specific job directly into Greenplum Database segments. Kafka job external table names begin with the prefix gpkafkaloadext_, file job external table names begin with gpfileloadext_, and RabbitMQ job external table names begin with gprabbitmqloadext_. By default, GPSS reuses this external table each time you restart the job.

The complete name and the lifecycle of a GPSS external table depend on the ReuseTables setting in the gpss.json server configuration file as described below.

ReuseTables=true

When ReuseTables is true (the default setting), GPSS reuses and does not drop an external table for a job.

GPSS creates the external table using the following naming format: gp<datasource>loadext_<hash>.

GPSS calculates the <hash> based on values of server configuration properties and load/job configuration properties that would change the external table definition. These properties include:

  • Gpfdist host and port number

  • Target Greenplum Database schema name

  • Target Greenplum Database table name

  • Target Greenplum Database table definition

  • Gpfdist use of encrypted communications

  • Source data format

  • Source data types

  • Formatter options

  • Job key; GPSS generates the job key as follows for the different data sources:

    • For file and s3 jobs, GPSS generates the job key from the database name, metadata schema name, target table name, and source URL.
    • For Kafka jobs, GPSS generates the job key from the output schema and table names, or the target schema names and table names plus source topic when the load job targets multiple outputs.
    • For RabbitMQ jobs, GPSS generates the job key from the database name, output schema and table names, the RabbitMQ stream or queue name, and the RabbitMQ virtual host.
  • Error Limit

ReuseTables=false

When ReuseTables is false, GPSS drops an external table, if one exists, when a job is (re)started.

Note: Repeated drop/create of external tables may cause bloating in the pg_attribute and pg_class system catalog tables; be sure to VACUUM these tables frequently.

GPSS creates the external table using the job name instead of a hash. The default job name is the base name of the YAML load configuration file. You set a custom job name when you specify the --name <jobname> option to the gpsscli submit command.

The external table naming format when ReuseTables is false follows: gp<datasource>loadext_<jobname>.
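
To predict the external table name for a job when ReuseTables is false, the naming format can be applied in a small helper. This is an illustrative sketch only: the function name gpss_ext_table_name is hypothetical, it assumes the configuration file uses the .yaml extension, and it assumes GPSS uses the job name verbatim without further transformation.

```shell
# Build gp<datasource>loadext_<jobname> for the ReuseTables=false case.
# $1 = data source (for example kafka, file, or rabbitmq)
# $2 = load configuration file path
# $3 = optional job name, as passed to --name
gpss_ext_table_name() {
    local datasource="$1" cfg="$2" name="${3:-}"
    if [ -z "$name" ]; then
        # Default job name: base name of the YAML file (assumes a .yaml suffix).
        name="$(basename "$cfg" .yaml)"
    fi
    printf 'gp%sloadext_%s\n' "$datasource" "$name"
}
```

For example, gpss_ext_table_name kafka loadcfg.yaml order_upload prints gpkafkaloadext_order_upload.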

Submitting a Job

To register a data load operation to Greenplum Database, you submit a job to the Greenplum Streaming Server using the gpsscli submit subcommand. When you submit a job, you provide a YAML-formatted configuration file that defines the properties of the load operation. Load properties include Greenplum-specific options, as well as properties that are specific to the data source. See gpsscli.yaml.

You identify a GPSS job by a name that you provide via the --name option. If you do not specify a job name, GPSS assigns and returns the base name of the load configuration file as the job identifier. You use this name or identifier to identify the job throughout its lifecycle.

The following example submits a GPSS job named order_upload whose load properties are defined in the configuration file named loadcfg.yaml:

$ gpsscli submit --name order_upload loadcfg.yaml

A newly-submitted GPSS job is in the Submitted state.

Starting a Job

To start a GPSS job, you run the gpsscli start subcommand. When you start a job, GPSS initiates the data load operation from the client. It sets up the connection to Greenplum Database and creates the external tables that it uses to load data directly into Greenplum segments.

The following example starts the GPSS job named order_upload:

$ gpsscli start order_upload

A job that starts successfully enters the Running state.

The default behavior of gpsscli start is to return immediately. When you specify the --quit-at-eof option, the command reads data until it receives an EOF, and then stops the job. In this scenario, the job transitions to the Success or Error state when the command exits.

Checking Job Status, Progress, History

GPSS provides several commands to check the status of running jobs:

  • The gpsscli list subcommand lists running (or all) jobs and their status:

    $ gpsscli list --all
    JobName        JobID                               GPHost     GPPort    DataBase    Schema     Table            Topic           Status  
    order_upload   d577cf37890b5b6bf4e713a9586e86c9    sys1       5432      testdb      public     order_sync       orders          JOB_RUNNING
    file_jobf3     cfb985bcdc8884352b4b8853f5d06bbe    sys1       5432      testdb      public     from_csvfile     file:///tmp/data.csv   JOB_STOPPED
    
  • The gpsscli status subcommand displays the status of a specific job:

    $ gpsscli status order_upload
    ...,101565,info,Job order_upload, status JOB_RUNNING, errmsg [], time 2020-08-04T16:44:03.376216533Z
    

    Use the gpsscli status command to determine the status, and the success or failure, of the operation. If the job status is Error, examine the command output and log file messages for additional information. See Checking for Load Errors.

  • The gpsscli progress subcommand displays the progress of a running Kafka job. The command waits, and displays the job commit history and transfer speed at runtime. gpsscli progress returns when the job stops.

    Note: GPSS currently supports job progress tracking only for Kafka data sources.

    $ gpsscli progress order_upload
    StartTime                     EndTime                        MsgNum   MsgSize   InsertedRecords   RejectedRecords   Speed
    2019-10-15T21:56:49.950134Z   2019-10-15T21:56:49.964751Z    1000     78134     1000              0                 735.13KB/sec
    2019-10-15T21:56:49.976231Z   2019-10-15T21:56:49.984311Z    1000     77392     1000              0                 701.58KB/sec
    2019-10-15T21:56:49.993607Z   2019-10-15T21:56:50.003602Z    1000     77194     1000              0                 723.32KB/sec
    

    By default, gpsscli progress displays job progress by batch. To display job progress by partition, specify the --partition option to the subcommand:

    $ gpsscli progress order_upload --partition
    PartitionID   StartTime                     EndTime                        BeginOffset   EndOffset   MsgSize   Speed
    0             2019-10-15T21:56:54.80469Z    2019-10-15T21:56:54.830441Z    242000        243000      81033     652.29KB/sec
    0             2019-10-15T21:56:54.846354Z   2019-10-15T21:56:54.880517Z    243000        244000      81021     675.12KB/sec
    0             2019-10-15T21:56:54.893097Z   2019-10-15T21:56:54.904745Z    244000        245000      80504     673.67KB/sec
    

    GPSS also keeps track of the progress of each Kafka load job in a separate CSV-format log file. The job progress log files, named progress_<jobname>_<jobid>_<date>.log, reside in the GPSS server log directory. Refer to the Kafka data source Checking the Progress of a Load Operation topic for more information.
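
When scripting around these commands, you may want just the status value for one job. The following sketch extracts it from gpsscli list --all output. The function name gpss_job_status is hypothetical, and the approach assumes the output layout shown in the example above, with the job name in the first column and the status in the last.

```shell
# Print the Status column for one job from `gpsscli list --all` output.
# $1 = job name; prints nothing if the job is not listed.
gpss_job_status() {
    local job="$1"
    gpsscli list --all | awk -v job="$job" '$1 == job { print $NF }'
}
```

Example usage: gpss_job_status order_upload. For the sample listing above, this would print JOB_RUNNING.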

Waiting for a Job to Complete

You can use the gpsscli wait subcommand to wait for a running job to complete. A job is complete when there is no more data to read, or when an error is returned. Such jobs transition from the Running state to the Success or Error state.

$ gpsscli wait order_upload

gpsscli wait exits when the job completes.

Stopping a Job

Use the gpsscli stop subcommand to stop a specific job. When you stop a job, GPSS writes any unwritten batched data to the Greenplum Database table and stops actively reading new data from the data source.

$ gpsscli stop order_upload

A job that you stop enters the Stopped state.

Removing a Job

The gpsscli remove subcommand removes a GPSS job. When you remove a job, GPSS unregisters the job from its job list and releases all job-related resources.

$ gpsscli remove order_upload

Running a Single-Command Load

The gpsscli load subcommand initiates a data load operation. When you run gpsscli load, GPSS submits, starts, and displays the progress (Kafka jobs only) of a job on your behalf.

By default, gpsscli load loads all available data and then waits indefinitely for new messages to load. In the case of user interrupt or exit, the GPSS job remains in the Running state. You must explicitly stop the job with gpsscli stop when running in this mode.

When you provide the --quit-at-eof option to the command, the utility exits after it reads all published data, writes the data to Greenplum Database, and stops the job. The GPSS job is in the Success or Error state when the command returns.

Similar to the gpsscli submit command, gpsscli load takes as input an optional name and a YAML-format configuration file that defines the load properties:

$ gpsscli load --quit-at-eof loadcfg.yaml

Because the above command does not specify the --name option, GPSS assigns and returns the job identifier loadcfg when you run it.
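
Because gpsscli load --quit-at-eof returns only after the job reaches the Success or Error state, a script can branch on its return code. The following is a minimal sketch; the function name load_until_eof and the message text are hypothetical.

```shell
# Sketch: run a single-command load that exits at EOF, then report the outcome.
# $1 = load configuration file; the job name defaults to the file's base name.
load_until_eof() {
    local cfg="$1" rc
    if gpsscli load --quit-at-eof "$cfg"; then
        echo "load succeeded: $cfg"
    else
        rc=$?   # non-zero gpsscli return code
        echo "load failed with return code $rc: $cfg" >&2
        return "$rc"
    fi
}
```

Example usage: load_until_eof loadcfg.yaml || echo "check the gpss and gpsscli logs"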

About GPSS Job Initiation and Scheduling

In the default configuration, GPSS relies on the gpsscli subcommands that you submit to initiate and stop jobs. Once a job is started, GPSS does not automatically (re)start the job, and GPSS stops a job only when you have specified the --quit-at-eof option to the gpsscli subcommand.

You can configure GPSS to automatically stop and restart failed and running jobs via properties that you specify in the load configuration file.

About Retrying a Failed Job

You can configure GPSS to restart a failed job after a period of time that you specify; you can also configure the maximum number of times that GPSS retries the job. The load configuration file properties that govern failed job retry are located in the SCHEDULE: block.

Refer to Auto-Restarting a Failed Job for additional information about retrying a failed job.

About Job Scheduling

You can configure GPSS to automatically stop a running job after it has run for a period of time, or at a specific clock time after receiving an EOF. You can also configure a restart interval and the maximum number of times GPSS should restart a job that it stopped. The load configuration file properties that govern job scheduling in GPSS are also located in the SCHEDULE: block.

Checking for Load Errors

The Greenplum Streaming Server cannot directly return success or an error to the client. You can obtain success and error information for a GPSS load operation from gpsscli subcommand output, and from messages that GPSS writes to stdout or writes to the server, progress (Kafka jobs only), and/or client log files.

You can also view data formatting-specific errors encountered during a load operation in the error log.

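Error checking activities may include:

  • Examining GPSS log files
  • Determining batch load status
  • Diagnosing an error with a trial load
  • Reading the error log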

Examining GPSS Log Files

GPSS writes server and client log messages to files as described in Managing GPSS Log Files.

GPSS version 1.4.0 and earlier wrote server and client log messages in the following format:

<date>:<time> <proc>:<user>:<host>:<proc_pid>-[<severity>]:-<message>

Starting in version 1.4.1, GPSS writes server and client log messages to a CSV-format file with the following header row:

timestamp,pid,level,message

Example message in a gpss log file:

20200804 09:42:41.16105,101417,info,using config file: /home/gpadmin/gpss.json

Example message in a gpsscli log file:

20200804 13:05:00.26568,117489,info,"stop job: fakejobname failed, code: Unknown, message: StopJob rpc failed: Job can not be found"
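
Because the CSV-format log places the severity in the third field, a short awk filter can select messages by level. This is a sketch: the function name gpss_log_by_level is hypothetical, and only the info level appears in the examples above, so other level names (such as error) are assumptions.

```shell
# Print only the CSV log lines (timestamp,pid,level,message) whose level
# field matches $1. Reads log lines from standard input.
gpss_log_by_level() {
    local level="$1"
    # The first three fields contain no commas, so a plain comma split is
    # enough to isolate the level even when the message itself is quoted.
    awk -F',' -v lvl="$level" '$3 == lvl'
}
```

Example usage, where the log file path is a placeholder: gpss_log_by_level info < <gpss_log_file>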

Determining Batch Load Status

To determine if GPSS loaded one or more batches of data to Greenplum Database successfully, first examine the status and progress of the job in question. The gpsscli status and gpsscli progress command output will identify if any known error conditions exist.

Also examine gpss command output and logs, searching for messages that identify the number of rows inserted and rejected. For a Kafka or RabbitMQ data source, search for:

... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows

Or, for load jobs originating from a file data source:

... -[INFO]:- ... inserted 5, rejected 1
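
To total the rejected rows across many such messages, you might filter the gpss output or log with grep and awk. The sketch below assumes the message wording shown above ("Rejected N rows" or "rejected N"); the function name gpss_sum_rejected is hypothetical.

```shell
# Sum the rejected-row counts found in gpss output read from standard input.
# Matches both "Rejected 0 rows" and "inserted 5, rejected 1" style messages.
gpss_sum_rejected() {
    grep -Eio 'rejected [0-9]+' | awk '{ total += $2 } END { print total + 0 }'
}
```

Example usage, where the log file path is a placeholder: gpss_sum_rejected < <gpss_log_file>. A non-zero total suggests that the error log is worth reading.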

Diagnosing an Error with a Trial Load

When a Kafka, file, or S3 job fails, you may choose to perform a trial run of the load operation to help diagnose the cause of the failure. The gpsscli dryrun command reads the data from the source and prepares to load the data, but does not actually insert it into Greenplum Database. The command returns the results of this processing, as well as the SQL commands that GPSS would run to complete the job.

Sample command that specifies a Kafka load configuration file:

$ gpsscli dryrun --include-error-process kjobcfgv3.yaml

Sample command output:

jobid: 01ba08c0f7fc8e3a49e2ad1ee48ef899
jobname: kjobcfgv3
jobtype: KafkaJob
tracking table name: gpkafka_tbl_1_column_text_01ba08c0f7fc8e3a49e2ad1ee48ef899
progress log file name: progress_kjobcfgv3_01ba08c0_20220218.log

Extension version checking:
    <SQL commands to check extension versions>

SQL of job:
    <SQL commands to fulfill the load operation>

Because the --include-error-process flag was specified for the Kafka job dry run, the output may include the following text:

Check format error:
    error sql query: <query>
    clean up error table: <query>

Check failed batch with expression error:
    get failing batch query: <query>

Reading the Error Log

If gpss command or log output indicates that rows were rejected, the output will identify an SQL query that you can run to view the data formatting errors that GPSS encountered while inserting data into a Greenplum Database table.

GPSS uses the LOG ERRORS feature of Greenplum Database external tables to detect and log data rows with formatting errors. The functions that you use to access and manage the error log, and the persistence of the error data, depend on the version of Greenplum Database that you are running GPSS against and the ReuseTables setting in effect when you started the gpss server.

If you are running GPSS against Greenplum Database versions 5.26+ or 6.6+ and you started the gpss server with ReuseTables=false:

  • GPSS automatically specifies LOG ERRORS PERSISTENTLY when it creates external tables for a job.
  • You use the gp_read_persistent_error_log() function to retrieve the error data.
  • The error data persists in the error log until you explicitly remove it.

If you are running GPSS against older 5.x and 6.x versions of Greenplum, or you started the gpss server with ReuseTables=true:

  • You use the gp_read_error_log() function to retrieve the error data.
  • The error data is accessible from the error log until GPSS drops the external table. (If the gpss server is started with ReuseTables=true, GPSS does not drop an external table for a job. If ReuseTables=false, GPSS drops an external table, if one exists, when a job is (re)started.)

Refer to the Greenplum Database CREATE EXTERNAL TABLE documentation for more information about the external table error logs and error log management.

When you run the query to view the error log, you specify the name of the external table that GPSS used for the load operation. You identify the name of the external table by examining the gpss command output and/or log file messages. For best results, use the (short) time interval identified in the gpss output.

Kafka job external table names begin with gpkafkaloadext_, file job external table names begin with gpfileloadext_, and RabbitMQ job external table names begin with gprabbitmqloadext_.

The following example query displays the number of errors returned in a Kafka load job:

SELECT count(*) FROM gp_read_error_log('"public"."gpkafkaloadext_ae0eac9f8c94a487f30f749175c3afbf"')
    WHERE cmdtime > '2018-08-10 18:44:23.814651+00';
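
If you run this check often, you could generate the query from the external table name and start time reported in the gpss output. This is a sketch only: the function name gpss_error_count_query is hypothetical, it targets gp_read_error_log() rather than the persistent variant, and it performs no escaping of its arguments.

```shell
# Print an error-count query for one GPSS external table.
# $1 = quoted, schema-qualified external table name, as shown in gpss output
# $2 = lower bound for cmdtime
gpss_error_count_query() {
    local ext_table="$1" since="$2"
    printf "SELECT count(*) FROM gp_read_error_log('%s')\n    WHERE cmdtime > '%s';\n" \
        "$ext_table" "$since"
}
```

Example usage: gpss_error_count_query '"public"."gpkafkaloadext_ae0eac9f8c94a487f30f749175c3afbf"' '2018-08-10 18:44:23.814651+00' | psql -d testdb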

Warning: Do not directly SELECT from an external table that GPSS creates for your job. Any data that you read in this manner will not be loaded into the Greenplum Database table.

Auto-Restarting a Failed Job

A job may fail for temporary reasons. You can configure GPSS to automatically restart an errored job. GPSS automatic job restart is disabled by default. When the YAML-format load configuration file submitted for a job includes the non-default SCHEDULE: block MAX_RETRIES and RETRY_INTERVAL configuration settings, GPSS will attempt to restart the job if the job enters the Error state after it starts.

GPSS stops trying to restart the job when the configured retry limit is reached, or if the job is removed or the job configuration is updated during retry.

Jobs that you start via the gpsscli start, gpsscli load, and gpkafka load commands are eligible for automatic job restart on error. If you provide the --quit-at-eof flag or one of the --force-reset-<xxx> flags when you run the command and the Kafka or RabbitMQ job load configuration file specifies failed job retry settings, GPSS ignores the flag on any retry attempts that it initiates.

Redirecting Data to a Backup Table when GPSS Encounters Expression Evaluation Errors

GPSS catches data formatting errors during loading, and you can view these errors with gp_read_error_log() as described in Reading the Error Log.

There may be cases where your data is formatted correctly, but GPSS encounters an error when it evaluates a mapping or a filter expression. If the evaluation fails, GPSS cannot log and propagate the error back to the user.

For example, if you specify the following mapping expression in your load configuration file:

EXPRESSION: (jdata->>'id')::int

and the content of jdata->>'id' is a string that includes non-integer characters, the expression will fail when Greenplum Database evaluates it.

The load configuration property COMMIT: SAVE_FAILING_BATCH (versions 2 and 3 load configuration file formats only) governs whether or not GPSS saves a batch of data into a backup table before it writes the data to Greenplum Database. Saving the data in this manner aids loading recovery when GPSS encounters errors during the evaluation of expressions.

By default, SAVE_FAILING_BATCH is false, and GPSS immediately terminates a load job when it encounters an expression error.

When you set SAVE_FAILING_BATCH to true, GPSS writes all data in the batch to a backup table named gpssbackup_<jobhash>. GPSS writes both good and bad data to the backup table.

A backup table has the following columns:

Column Name             Description
data                    The data associated with the row that GPSS attempted to load into Greenplum Database.
gpss_save_timestamp     The time that GPSS inserted the row into the backup table.
gpss_expression_error   The error that GPSS encountered when it ran the expression specified in the column mapping.

Sample backup table content for a failed load operation follows:

test=# SELECT * from gpssbackup_e0c5991570303703450bbac2ee8816bc; 

-[ RECORD 1 ]---------+----------------------------------------------------------------------------------------------------------- 
data                  | {"device": "agkNtzFnHIVASYNvo", "humidity": 91.3, "temperature": 9, "time": "2019-09-24T15:33:57.054175"} 
gpss_save_timestamp   | 2022-07-04 07:51:17.798469+00 
gpss_expression_error | division by zero 

-[ RECORD 2 ]---------+----------------------------------------------------------------------------------------------------------- 
data                  | {"device": "LcZQGnVXhORIKxWY", "humidity": 46.289, "temperature": 9, "time": "2019-09-24T15:33:57.054561"} 
gpss_save_timestamp   | 2022-07-04 07:51:17.798469+00 
gpss_expression_error | division by zero

GPSS continues to process Kafka messages even after it encounters an expression error. When GPSS encounters one or more expression errors in a batch, none of the good data in the batch is written to Greenplum. You can set the RECOVER_FAILING_BATCH (Beta) configuration property to instruct GPSS to automatically reload the good data in the batch, and retain only the error data in the backup table. GPSS displays additional information about the recovery process when you set this option.

Note: Using a backup table in this manner to hedge against expression errors may impact performance, especially when the data that you are loading has not been cleaned.

Preventing External Table Reuse

GPSS creates a unique external table to load data for a specific job directly into Greenplum Database segments. By default, GPSS reuses this external table each time you restart the job. If the structure of either the source data or the destination Greenplum Database table is altered, GPSS may not be able to reuse the external table it initially created for the job.

You can configure GPSS to create a new external table for all new and restarted jobs submitted to a gpss service instance by setting the ReuseTables configuration property to false in the gpss.json file.
