Note: The VMware Tanzu Greenplum streaming server Kafka data source is also known as the Greenplum-Kafka Integration.
Apache Kafka is a fault-tolerant, low-latency, distributed publish-subscribe message system. The Tanzu Greenplum streaming server supports loading Kafka data from the Apache and Confluent Kafka distributions. Refer to the Apache Kafka Documentation for more information about Apache Kafka.
A Kafka message may include a key and a value, and may consist of a single line or multiple lines. Kafka stores streams of messages (or records) in categories called topics. A Kafka producer publishes records to partitions in one or more topics. A Kafka consumer subscribes to a topic and receives records in the order that they were sent within a given Kafka partition. Kafka does not guarantee the order of data originating from different Kafka partitions.
You can use the gpsscli or gpkafka load utilities to load Kafka data into VMware Tanzu Greenplum.
Note: gpkafka load is a wrapper around the Tanzu Greenplum streaming server gpss and gpsscli commands. VMware recommends that you migrate to using these utilities directly.
Both the gpss server and the gpkafka load utility are Kafka consumers. They ingest streaming data from a single Kafka topic, using Tanzu Greenplum readable external tables to transform and insert or update the data into a target Greenplum table. You identify the Kafka source, data format, Greenplum connection options, and target table definition in a YAML-formatted load configuration file that you provide to the utility. If the utility is interrupted or exits, a subsequent data load operation that specifies the same Kafka topic and target Tanzu Greenplum table names resumes from the last recorded offset.
The Tanzu Greenplum streaming server requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES (version 2) or rdkafka_prop (version 3) block to your gpkafka.yaml load configuration file:
PROPERTIES:
  api.version.request: false
  broker.version.fallback: 0.8.2.1
You will perform the following tasks when you use the Tanzu Greenplum streaming server to load Kafka data into a Tanzu Greenplum table. (gpkafka log files are named gpkafka_*date*.log.)

Before using the gpsscli or gpkafka utilities to load Kafka data to Tanzu Greenplum, ensure that you satisfy the prerequisites for the load operation.
The Tanzu Greenplum streaming server supports Kafka message key and value data in the following formats:
| Format | Description |
|---|---|
| avro | Avro-format data. GPSS supports single-object encoded Avro data and Avro data that uses the Confluent Schema Registry. In all cases, GPSS reads Avro data from Kafka only as a single JSON-type column. |
| binary | Binary-format data. GPSS reads binary data from Kafka only as a single bytea-type column. |
| csv | Comma-delimited text format data. |
| custom | Data of a custom format, parsed by a custom formatter function. |
| delimited | Text data separated by a configurable delimiter. The delimited data format supports a multi-byte delimiter. |
| json, jsonl (version 2 only) | JSON- or JSONB-format data. Specify the json format when the data is in JSON or JSONB format. GPSS can read JSON data as a single object or can read a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Tanzu Greenplum table. Note: GPSS supports JSONB-format data only when loading to Greenplum 6. Note: Specify FORMAT: jsonl in version 2 format load configuration files; specify json with is_jsonl: true in version 3 format load configuration files. |
To write Kafka message data into a Tanzu Greenplum table, you must identify the data format in the load configuration file.
Specify the avro format when your Kafka message data is a single-object encoded Avro file or you are using the Confluent Schema Registry to load Avro message key and/or value data. (If the schema registry is SSL-secured, refer to Accessing an SSL-Secured Schema Registry for configuration details.) GPSS reads Avro data from Kafka and loads it into a single JSON-type column. You must define a mapping if you want GPSS to write the data into specific columns in the target Tanzu Greenplum table.
Use the binary format when your Kafka message data is a stream of bytes. GPSS reads binary data from Kafka and loads it into a single bytea-type column.
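For illustration, a minimal Version 2 VALUE block for binary message data might look like the following sketch; the column name rawdata is a hypothetical choice, and the single bytea-type column mirrors the behavior described above:

VALUE:
  COLUMNS:
    - NAME: rawdata      # single bytea-type column that receives the message bytes
      TYPE: bytea
  FORMAT: binary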
Use the csv format when your Kafka message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
Data in csv format may appear in Kafka messages as follows:
"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"
The Tanzu Greenplum streaming server provides a custom data formatter plug-in framework for Kafka messages using user-defined functions. The type of Kafka message data processed by a custom formatter is formatter-specific. For example, a custom formatter may process compressed or complex data.
Refer to Custom Formatter for Kafka for an example custom formatter that loads Kafka data into Tanzu Greenplum.
The Tanzu Greenplum streaming server supports loading Kafka data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value of up to 32 bytes in length. You can also specify quote and escape characters, and an end-of-line prefix.
Note: The delimiter may not contain the quote or escape characters.
When you specify a quote character, quoted data may include characters such as \n; these do not require additional escaping when they are quoted.

When you do not specify a quote character:
- The escape character is optional.
- If you do not specify an escape character, GPSS treats the delimiter as the column separator, and treats any end-of-line prefix plus \n as the row separator.
- If you do specify an escape character, GPSS treats the end-of-line prefix plus \n as the row separator.

Sample data using a pipe ('|') delimiter character follows:
1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
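A Version 2 VALUE block for this pipe-delimited sample might look like the following sketch; the column names are assumptions, and you should confirm the DELIMITED_OPTION property names against the gpkafka-v2.yaml reference:

VALUE:
  COLUMNS:
    - NAME: cust_id
      TYPE: int
    - NAME: month
      TYPE: int
    - NAME: status
      TYPE: text
    - NAME: amount_paid
      TYPE: decimal(9,2)
  FORMAT: delimited
  DELIMITED_OPTION:
    DELIMITER: "|"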
Specify the json format when your Kafka message data is in JSON or JSONB format and you want GPSS to read JSON data from Kafka as a single object into a single column (per the JSON specification, newlines and white space are ignored). You must define a mapping if you want GPSS to write the data into specific columns in the target Tanzu Greenplum table.
Note: GPSS supports JSONB-format data only when loading to Greenplum 6.
Specify FORMAT: jsonl in version 2 format load configuration files, or specify json with is_jsonl: true in version 3 format load configuration files, when your Kafka message data is in JSON format with a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Tanzu Greenplum table.
Sample JSON message data:
{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
A Kafka message may contain a single line or multiple lines.
GPSS supports the following combinations of single and multiple line messages for the key and value data input components:
| Key | Value |
|---|---|
| single-line | none |
| none | single-line |
| single-line | single-line |
| multi-line | none |
| none | multi-line |
GPSS does not support multiple-line messages for both the key and value.
A custom data formatter for Kafka messages is a user-defined function. If you are using a custom formatter, you must create the formatter function and register it in each database in which you will use the function to write Kafka data to Greenplum tables.
You configure a data load operation from Kafka to Tanzu Greenplum via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Tanzu Greenplum connection and target table, as well as error and commit thresholds for the operation.
The Tanzu Greenplum streaming server supports three versions of the YAML configuration file: Version 1 (deprecated), Version 2, and Version 3. Versions 2 and 3 of the configuration file format support all features of Version 1, and introduce support for loading both the Kafka message key and value to Greenplum, as well as for loading metadata.
Refer to the gpkafka.yaml reference page for Version 1 configuration file contents and syntax. Refer to the gpkafka-v2.yaml reference page for the Version 2 configuration file format and the configuration parameters that this version supports. gpkafka-v3.yaml describes the Version 3 format. You may find a quick start guide and sample YAML configuration files in the $GPHOME/docs/cli_help/gpss directory.
The contents of a sample Version 2 YAML configuration file named loadcfg2.yaml follow:
DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
         PARTITIONS: (1, 2...4, 7)
      VALUE:
         COLUMNS:
            - NAME: c1
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
            - NAME: key
              TYPE: json
         FORMAT: avro
         AVRO_OPTION:
            SCHEMA_REGISTRY_ADDR: http://localhost:8081
      FILTER: (c1->>'month')::int = 11
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (c1->>'cust_id')::int
         - NAME: newcust
           EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
         - NAME: expenses
           EXPRESSION: (c1->>'expenses')::decimal
         - NAME: tax_due
           EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000
You identify the Tanzu Greenplum connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.
The VERSION parameter identifies the version of the GPSS YAML configuration file. The default version is Version 1; you must explicitly specify version 2 or version v3 to use the Version 2 or Version 3 format.
Specify the Kafka brokers and topic of interest using the SOURCE block. You must create the Kafka topic prior to loading data. By default, GPSS reads Kafka messages from all partitions. You may specify a single partition number, a comma-separated list, and/or a range of partition numbers to restrict the partitions from which GPSS reads messages. The PARTITIONS property is supported only for the version 2 and 3 load configuration file formats.
Note: You must configure different jobs that load from the same Kafka topic to the same Tanzu Greenplum table with non-overlapping PARTITIONS values.
When you provide a VALUE block, you must specify the COLUMNS and FORMAT parameters. The VALUE:COLUMNS block includes the name and type of each data element in the Kafka message. The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Tanzu Greenplum OUTPUT:TABLE. Specify NAME: __IGNORED__ to omit a Kafka message value data element from the load operation.

The VALUE:FORMAT keyword identifies the format of the Kafka message value. GPSS supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). GPSS also supports binary (binary), single object or single record per line JSON/JSONB (json or jsonl), custom (custom), and Avro (avro) format value data.
When you provide a META block, you must specify a single JSON-type COLUMNS entry and FORMAT: json. Metadata for Kafka includes the following properties:
- topic - text
- partition - int
- offset - bigint
- timestamp - bigint
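A minimal META block might look like the following sketch; the column name meta is a hypothetical choice, and a MAPPING expression such as (meta->>'offset')::bigint could then reference the individual properties:

META:
  COLUMNS:
    - NAME: meta        # single json-type column that receives the Kafka metadata
      TYPE: json
  FORMAT: json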
When you provide a KEY block, you must specify the COLUMNS and FORMAT parameters. The KEY:COLUMNS block includes the name and type of each element of the Kafka message key, and is subject to the same restrictions identified for VALUE:COLUMNS above. The KEY:FORMAT keyword identifies the format of the Kafka message key. GPSS supports avro, binary, csv, custom, delimited, json, and jsonl format key data.
The FILTER parameter identifies a filter to apply to the Kafka input messages before the data is loaded into Tanzu Greenplum. If the filter evaluates to true, GPSS loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more KEY or VALUE column names.
The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which GPSS exits the load operation. The default ERROR_LIMIT is zero; GPSS stops the load operation when it encounters the first error.
Note: You must specify only one of the OUTPUT or OUTPUTS blocks.
You identify the target Tanzu Greenplum schema name and table name via the KAFKA:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Tanzu Greenplum table before you attempt to load Kafka data.
The default load mode is to insert Kafka data into the Tanzu Greenplum table. GPSS also supports updating and merging Kafka message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.
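As a sketch only, an OUTPUT block that merges the loadcfg2.yaml example data might look like the following; the match, order, and update column choices and the conditions are illustrative assumptions, not a prescribed configuration:

OUTPUT:
  SCHEMA: payables
  TABLE: expenses2
  MODE: MERGE
  MATCH_COLUMNS:
    - customer_id
  ORDER_COLUMNS:
    - expenses
  UPDATE_COLUMNS:
    - expenses
    - tax_due
  UPDATE_CONDITION: expenses > 0
  DELETE_CONDITION: (c1->>'month')::int = 0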
You can override the default mapping of the INPUT VALUE:COLUMNS and KEY:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Tanzu Greenplum table and a Kafka message value or key data element. You can also map the META data columns, and map a Tanzu Greenplum table column to a value expression.
Note: When you specify a MAPPING block, ensure that you provide entries for all Kafka data elements of interest; GPSS does not automatically match column names when you provide a MAPPING.
Note: (version 2) You must specify only one of the OUTPUT or OUTPUTS blocks.
If you want to load from a single Kafka topic to multiple Tanzu Greenplum tables, you provide an OUTPUTS:TABLE (version 2) or targets:gpdb:tables:table (version 3) block for each table, and specify the properties that identify the data targeted to each.
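The general shape of a Version 2 OUTPUTS block might look like the following sketch; the table names and per-table properties shown are assumptions, so consult the gpkafka-v2.yaml reference for the exact properties that each table entry supports:

OUTPUTS:
  - TABLE: expenses2
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (c1->>'cust_id')::int
  - TABLE: expenses2_audit
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (c1->>'cust_id')::int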
MERGE mode is similar to an UPSERT operation; GPSS may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. GPSS deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.
GPSS stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.
GPSS uses the following algorithm for MERGE mode processing:
1. Stage the data in a temporary table, applying any MAPPINGS.
2. Apply the FILTER.
3. Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
4. Update rows using MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
5. Delete rows using MATCH_COLUMNS and the DELETE_CONDITION.

The KAFKA:METADATA:SCHEMA parameter specifies the name of the Tanzu Greenplum schema in which GPSS creates external and history tables.
GPSS commits Kafka data to the Tanzu Greenplum table at the row and/or time intervals that you specify in the KAFKA:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. If you do not specify these properties, GPSS commits data at the default MINIMAL_INTERVAL of 5000 ms.
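For example, a COMMIT block that sets both a row threshold and a time interval might look like the following sketch; the values shown are illustrative:

COMMIT:
  MAX_ROW: 1000             # row threshold for a commit
  MINIMAL_INTERVAL: 2000    # commit interval in milliseconds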
You can configure GPSS to run a task (a user-defined function or SQL commands) after GPSS reads a configurable number of batches from Kafka. Use the KAFKA:TASK: POST_BATCH_SQL and BATCH_INTERVAL configuration parameters to specify the task and the batch interval.
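A TASK block might look like the following sketch; the refresh_expense_summary() function is hypothetical, and the batch interval value is illustrative:

TASK:
  POST_BATCH_SQL: SELECT refresh_expense_summary()    # hypothetical UDF to run after the interval
  BATCH_INTERVAL: 10                                  # run the task after every 10 batches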
Specify a KAFKA:PROPERTIES block to set Kafka consumer configuration properties. GPSS sends the property names and values to Kafka when it instantiates a consumer for the load operation.
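A PROPERTIES block might look like the following sketch; group.id and session.timeout.ms are standard Kafka consumer properties, and the values shown are illustrative:

PROPERTIES:
  group.id: gpss_expenses_group
  session.timeout.ms: 30000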
You can specify any data format in the Version 2 configuration file KEY:FORMAT and VALUE:FORMAT parameters, with some restrictions. The Tanzu Greenplum streaming server supports the following KEY:FORMAT and VALUE:FORMAT combinations:
| KEY:FORMAT | VALUE:FORMAT | Description |
|---|---|---|
| any | none (VALUE block omitted) | GPSS loads only the Kafka message key data, subject to any MAPPING that you specify, to Tanzu Greenplum. |
| none (KEY block omitted) | any | Equivalent to configuration file Version 1. GPSS ignores the Kafka message key and loads only the Kafka message value data, subject to any MAPPING that you specify, to Tanzu Greenplum. |
| csv | any | Not permitted. |
| any | csv | Not permitted. |
| avro, binary, delimited, json, jsonl | avro, binary, delimited, json, jsonl | Any combination is permitted. GPSS loads both the Kafka message key and value data, subject to any MAPPING that you specify, to Tanzu Greenplum. |
When you specify FORMAT: json or FORMAT: jsonl, valid COLUMN:TYPEs for the data include json or jsonb. You can also specify the new GPSS gp_jsonb or gp_json column types.
- gp_jsonb is an enhanced JSONB type that adds support for \u escape sequences and unicode. For example, gp_jsonb can escape \uDD8B and \u0000 as text format, but jsonb treats these characters as illegal.
- gp_json is an enhanced JSON type that can tolerate certain illegal unicode sequences. For example, gp_json automatically escapes incorrect surrogate pairs and processes \u0000 as \\u0000. Note that unicode escape values cannot be used for code point values above 007F when the server encoding is not UTF8.

You can use the gp_jsonb and gp_json data types as follows:
- As the COLUMN:TYPE when the target Tanzu Greenplum table column type is json or jsonb.
- In a MAPPING when the target Tanzu Greenplum column is text or varchar. For example:
  EXPRESSION: (j->>'a')::text
- In a MAPPING when FORMAT: avro and the target Tanzu Greenplum column is json or jsonb. For example:
  EXPRESSION: j::gp_jsonb
  or
  EXPRESSION: j::gp_json
- In a MAPPING when FORMAT: avro and the target Tanzu Greenplum column is text or varchar. For example:
  EXPRESSION: (j::gp_jsonb->>'a')::text
  or
  EXPRESSION: (j::gp_json->>'a')::text
Note: The gp_jsonb and gp_json data types are defined in an extension named dataflow. You must run CREATE EXTENSION dataflow; in each database in which you choose to use these data types.
Preserving Ill-Formed JSON Escape Sequences
GPSS exposes a configuration parameter that you can use with the gp_jsonb and gp_json types. The name of this parameter is gpss.json_preserve_ill_formed_prefix. When set, GPSS does not return an error when it encounters an ill-formed JSON escape sequence with these types, but instead prepends it with the prefix that you specify.
For example, if gpss.json_preserve_ill_formed_prefix is set to the string "##" as follows:
SET gpss.json_preserve_ill_formed_prefix = "##";
and GPSS encounters an ill-formed JSON sequence such as the orphaned low surrogate \ude04X, GPSS writes the data as ##\ude04X instead.
You can define a MAPPING between the Kafka input data (VALUE:COLUMNS, KEY:COLUMNS, and META:COLUMNS) and the columns in the target Tanzu Greenplum table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column) and you want to assign individual components of the input field to specific columns in the target table.
You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.
If you choose to map more than one input column in an expression, you can create a user-defined function to parse and transform the input column and return the columns of interest.
For example, suppose a Kafka producer emits the following JSON messages to a topic:
{ "customer_id": 1313131, "some_intfield": 12 }
{ "customer_id": 77, "some_intfield": 7 }
{ "customer_id": 1234, "some_intfield": 56 }
You could define a user-defined function, udf_parse_json(), to parse the data as follows:
=> CREATE OR REPLACE FUNCTION udf_parse_json(value json)
RETURNS TABLE (x int, y text)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query
SELECT ((value->>'customer_id')::int), ((value->>'some_intfield')::text);
END $$;
This function returns the two fields in each JSON record, casting the fields to integer and text, respectively.
An example MAPPING for the topic data in a JSON-type KAFKA:INPUT:COLUMNS named jdata follows:
MAPPING:
  cust_id: (jdata->>'customer_id')
  field2: ((jdata->>'some_intfield') * .075)::decimal
  j1, j2: (udf_parse_json(jdata)).*
The Tanzu Greenplum table definition for this example scenario is:
=> CREATE TABLE t1map( cust_id int, field2 decimal(7,2), j1 int, j2 text );
When you specify AVRO_OPTION:BYTES_TO_BASE64, GPSS maps Avro bytes fields to base64-encoded strings. You can provide a MAPPING to decode these strings and write the data to a Greenplum bytea column.
For example, if the Avro schema is:
{
  "type": "record",
  "name": "bytes_test",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "string", "type": "string"},
    {"name": "bytes", "type": "bytes"},
    {
      "name": "inner_record",
      "type": {
        "type": "map",
        "values": {
          "type": "bytes",
          "name": "nested_bytes"
        }
      }
    }
  ]
}
And if your load configuration file includes these input property settings:
VALUE:
  COLUMNS:
    - NAME: c1
      TYPE: json
  FORMAT: avro
  AVRO_OPTION:
    SCHEMA_REGISTRY_ADDR: http://localhost:8081
    BYTES_TO_BASE64: true
You can define a MAPPING to decode the encoded strings as follows:
MAPPING:
  - NAME: id
    EXPRESSION: (c1->>'id')::int
  - NAME: bytes1
    EXPRESSION: (decode(c1->>'bytes', 'base64'))
  - NAME: bytes2
    EXPRESSION: (decode((c1->>'inner_record')::json->>'nested_bytes', 'base64'))
This mapping decodes the bytes1 and bytes2 fields to the Greenplum bytea data type. GPSS would expect to load these mapped fields to a Greenplum table with the following definition:
CREATE TABLE avbyte( id int, bytes1 bytea, bytes2 bytea);
You must pre-create the Greenplum table before you load Kafka data into Tanzu Greenplum. You use the KAFKA:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.
The target Greenplum table definition must include each column that GPSS will load into the table. The table definition may include additional columns; GPSS ignores these columns, and loads no data into them.
The name and data type that you specify for a column of the target Tanzu Greenplum table must match the name and data type of the related, non-ignored Kafka message element. If you have defined a column mapping, the name of the Tanzu Greenplum column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.
The CREATE TABLE command for the target Tanzu Greenplum table receiving the Kafka topic data defined in the loadcfg2.yaml file presented in the Constructing the gpkafka.yaml Configuration File section follows:
testdb=# CREATE TABLE payables.expenses2( customer_id int8, newcust bool,
expenses decimal(9,2), tax_due decimal(7,2) );
Note: gpkafka load is a wrapper around the Tanzu Greenplum streaming server (GPSS) gpss and gpsscli utilities. Starting in Tanzu Greenplum streaming server version 1.3.2, gpkafka load no longer launches a gpss server instance, but rather calls the backend server code directly.
When you run gpkafka load, the command submits, starts, and stops a GPSS job on your behalf.
VMware recommends that you migrate to using the GPSS utilities directly.
You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:
$ gpkafka load loadcfg2.yaml
The default mode of operation for gpkafka load is to read all pending messages and then to wait for, and then consume, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.
To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.
A subsequent gpkafka load operation that specifies the same Kafka topic and target Tanzu Greenplum table names resumes from the last recorded offset.
Refer to the gpkafka load reference page for additional information about this command.
Note: GPSS cannot detect the addition of a new Kafka partition while a load operation is in progress. You must stop and then restart the load operation to read Kafka messages published to the new partition.
The gpkafka load command uses the gpfdist or gpfdists protocol to load data into Greenplum. You can configure the protocol used for the load request by providing the --config gpfdistconfig.json option to the command, where gpfdistconfig.json identifies a GPSS configuration file that specifies the gpfdist configuration in a Gpfdist protocol block. Refer to Configuring the Tanzu Greenplum Streaming Server in the Tanzu Greenplum streaming server documentation for detailed information about the file format and the properties supported.
Note: gpkafka load reads the configuration specified in the Gpfdist protocol block of the gpfdistconfig.json file; it ignores the GPSS configuration specified in the ListenAddress block of the file.
Or, you may choose to provide gpfdist host or port configuration settings on the gpkafka load command line by specifying the --gpfdist-host hostaddr or --gpfdist-port portnum options to the command. Any options that you specify on the command line override settings provided in the gpfdistconfig.json file.
Kafka maintains a partitioned log for each topic, assigning each record/message within a partition a unique sequential id number. This id is referred to as an offset. Kafka retains, for each gpkafka load invocation specifying the same Kafka topic and Tanzu Greenplum table names, the last offset within the log consumed by the load operation. The Tanzu Greenplum streaming server also records this offset value. Refer to Understanding Kafka Message Offset Management for more detailed information about how GPSS manages message offsets.
Kafka persists a message for a configurable retention time period and/or log size, after which it purges messages from the log. Kafka topics or messages can also be purged on demand. This may result in an offset mismatch between Kafka and the Tanzu Greenplum streaming server.
gpkafka load returns an error if its recorded offset for the Kafka topic and Tanzu Greenplum table combination is behind that of the current earliest Kafka message offset for the topic, or when the earliest and latest offsets do not match.
When you receive one of these messages, you can choose to:
Resume the load operation from the earliest available message published to the topic by specifying the --force-reset-earliest option to gpkafka load:
$ gpkafka load --force-reset-earliest loadcfg2.yaml
Load only new messages published to the Kafka topic by specifying the --force-reset-latest option with the command:
$ gpkafka load --force-reset-latest loadcfg2.yaml
Load messages published since a specific timestamp (milliseconds since epoch) by specifying the --force-reset-timestamp option to gpkafka load. To determine the create time epoch timestamp for a Kafka message, run the Kafka console consumer on the topic specifying the --property print.timestamp=true option, and review the output. You can also use a converter such as EpochConverter to convert a human-readable date to epoch time.
$ gpkafka load --force-reset-timestamp 1571066212000 loadcfg2.yaml
Note: Specifying the --force-reset-<xxx> options when loading data may result in missing or duplicate messages. Use of these options outside of the offset mismatch scenario is discouraged.
Alternatively, you can provide the FALLBACK_OPTION (version 2) or fallback_option (version 3) property in the load configuration file to instruct GPSS to automatically read from the specified offset when it detects a mismatch.
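As a sketch, a Version 2 configuration might set this property as shown below; the placement within the SOURCE block and the earliest value are assumptions, so verify both against the gpkafka-v2.yaml reference:

KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
         FALLBACK_OPTION: earliest    # assumed placement and value; see the reference page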
Starting in version 1.4.1, GPSS keeps track of the progress of each Kafka load job in a separate CSV-format log file. The progress log file for a specific job is named progress_*jobname*_*jobid*_*date*.log and resides in the following log directory:
- If you use the gpkafka load command, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option to the command, or to the $HOME/gpAdminLogs directory.
- If you use the gpsscli commands, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option when you started the GPSS server instance, or to the $HOME/gpAdminLogs directory.

A progress log file includes information and statistics about the load time, data size, and speed. It also includes the number of rows written to the Greenplum table, the number of rows rejected by Greenplum, and the total number of rows operated on by GPSS (inserted rows plus rejected rows).
A progress log file includes the following header row:
timestamp,pid,batch_id,start_time,end_time,total_byte,speed,total_read_count,inserted_rows,rejected_rows,total_rows
Example Kafka progress log message:
20220704 10:17:00.52827,101417,1,2022-07-04 17:16:33.421+00,2022-07-04 17:17:00.497+00,79712,2.88KB,997,991,6,997
When GPSS reads Kafka data in the jsonl, delimited, or csv formats, a Kafka message may contain multiple rows. For these formats, the progress log total_read_count identifies the Kafka message number, while total_rows identifies the total number of inserted and rejected rows.