The Greenplum Streaming Server (GPSS) is a gRPC server. GPSS uses gRPC protocol buffers (protobuf
) to define the GPSS client interfaces and their message interchange format. With protocol buffers, the structure of the data (messages) and the operations supported (services) are defined in a .proto
file, an ordinary text file. Refer to the Protocol Buffers Language Guide for detailed information about this data serialization framework.
The GPSS Batch Data API .proto
file defines the methods that clients can invoke to obtain metadata information from, and write data to, Greenplum Database. For example, a GPSS client that you develop can submit a request to list the tables that reside in a specific Greenplum schema, or to insert data into a specific Greenplum table.
The GPSS Batch Data API service definition follows. Copy/paste the contents to a file named gpss.proto
, and note the file system location.
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
package api;
option java_multiple_files = true;
// Connect service Request message
message ConnectRequest {
string Host = 1; // Host address of Greenplum coordinator; must be accessible from gpss server system
int32 Port = 2; // Greenplum coordinator port
string Username = 3; // User or role name that gpss uses to access Greenplum
string Password = 4; // User password
string DB = 5; // Database name
bool UseSSL = 6; // Use SSL or not; ignored, use the gpss config file to config SSL
int32 SessionTimeout = 7; // Release the session after idle for specified number of seconds
}
// Connect service Response message
message Session {
string ID = 1; // Id of client connection to gpss
}
// Operation mode
enum Operation {
Insert = 0; // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
Merge = 1; // Insert and Update
Update = 2; // Update the value of "UpdateColumns" if "MatchColumns" match
Read = 3; // Not supported
}
// Required parameters of the Insert operation
message InsertOption {
repeated string InsertColumns = 1; // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
bool TruncateTable = 2; // Truncate table before loading?
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Update operation
message UpdateOption {
repeated string MatchColumns = 1; // Names of the target table columns to compare when determining to update or not
repeated string UpdateColumns = 2; // Names of the target table columns to update if MatchColumns match
string Condition = 3; // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
repeated string InsertColumns = 1;
repeated string MatchColumns = 2;
repeated string UpdateColumns = 3;
string Condition = 4;
int64 ErrorLimitCount = 5;
int32 ErrorLimitPercentage = 6;
}
// Open service Request message
message OpenRequest {
Session Session = 1; // Session ID returned by Connect
string SchemaName = 2; // Name of the Greenplum Database schema
string TableName = 3; // Name of the Greenplum Database table
string PreSQL = 4; // SQL to execute before gpss loads the data
string PostSQL = 5; // SQL to execute after gpss loads the data
int32 Timeout = 6; // Time to wait before aborting the operation (seconds); not supported
string Encoding = 7; // Encoding of text data; not supported
string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table
FormatAvro avro = 9;
FormatBinary binary = 10;
FormatCSV csv = 11;
FormatDelimited delimited = 12;
FormatJSON json = 13;
FormatCustom custom = 14;
string proto = 15;
}
oneof Option { // Identify the type of write operation to perform
InsertOption InsertOption = 100;
UpdateOption UpdateOption = 101;
MergeOption MergeOption = 102;
}
}
message DBValue {
oneof DBType {
int32 Int32Value = 1;
int64 Int64Value = 2;
float Float32Value = 5;
double Float64Value = 6;
string StringValue = 7; // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
bytes BytesValue = 8;
google.protobuf.Timestamp TimeStampValue = 10; // Time without timezone
google.protobuf.NullValue NullValue = 11;
string OtherValue = 12;
}
}
message Row {
repeated DBValue Columns = 1;
}
message RowData {
bytes Data = 1; // A single protobuf-encoded Row
}
// Write service Request message
message WriteRequest {
Session Session = 1;
repeated RowData Rows = 2; // The data to load into the target table
}
// Close service Response message
message TransferStats { // Status of the data load operation
int64 SuccessCount = 1; // Number of rows successfully loaded
int64 ErrorCount = 2; // Number of error lines if Errorlimit is not reached
repeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}
// Close service Request message
message CloseRequest {
Session session = 1;
int32 MaxErrorRows = 2; // -1: returns all, 0: nothing, >0: max rows
bool Abort = 3;
}
// ListSchema service request message
message ListSchemaRequest {
Session Session = 1;
}
message Schema {
string Name = 1;
string Owner = 2;
}
// ListSchema service response message
message Schemas {
repeated Schema Schemas = 1;
}
// ListTable service request message
message ListTableRequest {
Session Session = 1;
string Schema = 2; // 'public' is the default if no Schema is provided
}
// DescribeTable service request message
message DescribeTableRequest {
Session Session = 1;
string SchemaName = 2;
string TableName = 3;
}
enum RelationType {
Table = 0;
View = 1;
Index = 2;
Sequence = 3;
Special = 4;
Other = 255;
}
message TableInfo {
string Name = 1;
RelationType Type = 2;
}
// ListTable service response message
message Tables {
repeated TableInfo Tables = 1;
}
// DescribeTable service response message
message Columns {
repeated ColumnInfo Columns = 1;
}
message ColumnInfo {
string Name = 1; // Column name
string DatabaseType = 2; // Greenplum data type
bool HasLength = 3; // Contains length information?
int64 Length = 4; // Length if HasLength is true
bool HasPrecisionScale = 5; // Contains precision or scale information?
int64 Precision = 6;
int64 Scale = 7;
bool HasNullable = 8; // Contains Nullable constraint?
bool Nullable = 9;
}
service Gpss {
// Establish a connection to Greenplum Database; returns a Session object
rpc Connect(ConnectRequest) returns (Session) {}
// Disconnect, freeing all resources allocated for a session
rpc Disconnect(Session) returns (google.protobuf.Empty) {}
// Prepare and open a table for write
rpc Open(OpenRequest) returns(google.protobuf.Empty) {}
// Write data to table
rpc Write(WriteRequest) returns(google.protobuf.Empty) {}
// Close a write operation
rpc Close(CloseRequest) returns(TransferStats) {}
// List all available schemas in a database
rpc ListSchema(ListSchemaRequest) returns (Schemas) {}
// List all tables and views in a schema
rpc ListTable(ListTableRequest) returns (Tables) {}
// Decribe table metadata(column name and column type)
rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}
// The format of the source data.
// If there is an intermediate column inside Format,
// then the source data will be transformed to the intermediate column.
// If there is no source_column_name in Format,
// then the column name will be the Target table column name,
// and the source column data type will be matched with Target column type.
message SourceDataFormat {
oneof unit {
FormatAvro avro = 1;
FormatBinary binary = 2;
FormatCSV csv = 3;
FormatDelimited delimited = 4;
FormatJSON json = 5;
FormatCustom custom = 6;
string protobuf = 7;
}
}
message FormatAvro {
string source_column_name = 1; // The source column name
string schema_url = 2; // If specified, gpss requests the avro schema from url
bool bytes_to_base64 = 3; // When true and schema_url is specified, gpss converts bytes field in avro message to base64-encoded string
bool ignore_deserialize_error = 4; // When true, gpss ignores avro deserialize errors, and puts data into log error
string schema_path_on_gpdb = 5; // Used for standalone avro schema; if exists, gpss retrieves the avro schema from the path on every node in the greenplum cluster
string schema_ca_on_gpdb = 6; // The path to the specified CA certificate file for gpss verifying the peer; the CA file must exist at that path on every greenplum segment
string schema_cert_on_gpdb = 7; // The path to the specified client certificate file for gpss connecting to HTTPS schema registry; required if the registry's client authentication is enabled
string schema_key_on_gpdb = 8; // The path to the specified private key file for gpss connecting to HTTPS schema registry; required if the registry's client authentication is enabled
string schema_min_tls_version = 9; // The minimum transport layer security (TLS) version that gpss requests on the registry connection; the default value is 1.0, and gpss supports minimum TLS versions of 1.0, 1.1, 1.2, and 1.3
}
message FormatBinary {
string source_column_name = 1; // The source column name
}
message FormatCSV {
repeated IntermediateColumn columns = 1; // Source column, move to format.Column c1:bin, c2:json ...
string delimiter = 2;
string quote = 3;
string null = 4;
string escape = 5;
string force_not_null = 6;
string newline = 7;
bool fill_missing_fields = 8;
bool header = 9;
}
message FormatDelimited {
repeated IntermediateColumn columns = 1; // The source column names
string delimiter = 2;
}
message FormatJSON {
IntermediateColumn column = 1; // The source column name
}
message FormatCustom {
repeated IntermediateColumn columns = 1;
string name = 2;
repeated string options = 3;
}
// IntermediateColumn is an intermediate result after parsing SourceDataFormat,
// IntermediateColumn looks like a virtual table column. It
// will be used to filter or convert types.
// The Source Data is parsed to a table column style data.
// source column: name and type, the type must be valid.
// ex: convert a string "123" to 123 integer.
// Caution: the FormatJSON is not a decomposed format, json is treated as an integral type.
message IntermediateColumn {
string name = 1;
string type = 2; // Greenplum Database basic data types are supported
}
The GPSS Data API service definition includes messages that represent rows and columns of supported Greenplum Database data types.
Because Greenplum Database supports more data types than protobuf
, the GPSS Data API provides a mapping between the types as follows:
gRPC Type | Greenplum Type |
---|---|
Int32Value | integer, serial |
Int64Value | bigint, bigserial |
Float32Value | real |
Float64Value | double |
StringValue | text (any kind of data) |
BytesValue | bytea |
TimeStampValue | time, timestamp (without time zone) |
In the simplest case, all Greenplum data types can be mapped to a string.