The Greenplum Streaming Server (GPSS) is a gRPC server. GPSS uses gRPC protocol buffers (protobuf) to define the GPSS client interfaces and their message interchange format. With protocol buffers, the structure of the data (messages) and the operations supported (services) are defined in a .proto file, an ordinary text file. Refer to the Protocol Buffers Language Guide for detailed information about this data serialization framework.

The GPSS Batch Data API .proto file defines the methods that clients can invoke to obtain metadata information from, and write data to, Greenplum Database. For example, a GPSS client that you develop can submit a request to list the tables that reside in a specific Greenplum schema, or to insert data into a specific Greenplum table.

The GPSS Batch Data API service definition follows. Copy/paste the contents to a file named gpss.proto, and note the file system location.

syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service Request message.
// Carries the Greenplum Database connection parameters that a client passes
// to the Connect rpc; Connect answers with a Session.
message ConnectRequest {
  string Host = 1;      // Host address of Greenplum coordinator; must be accessible from gpss server system
  int32 Port = 2;       // Greenplum coordinator port
  string Username = 3;  // User or role name that gpss uses to access Greenplum
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, configure SSL in the gpss config file instead
  int32 SessionTimeout = 7;      // Release the session after it has been idle for the specified number of seconds
}

// Connect service Response message.
// Identifies an established client session. The Open, Write, Close,
// ListSchema, ListTable and DescribeTable request messages each embed a
// Session, and the Disconnect rpc takes one directly.
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode.
// Insert is the zero value, so an unset Operation field reads as Insert.
// NOTE(review): nothing else visible in this definition references this enum;
// OpenRequest carries InsertOption/UpdateOption/MergeOption in its Option
// oneof to identify the write operation.
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Required parameters of the Insert operation.
// Supplied through the InsertOption member of OpenRequest.
// NOTE(review): field number 3 is not assigned in this message; do not reuse
// it without confirming that it never belonged to a removed field.
message InsertOption {
  repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;               // Truncate table before loading?
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Update operation.
// Supplied through the UpdateOption member of OpenRequest.
message UpdateOption {
  repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining whether to update
  repeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns match
  string Condition = 3;                 // Optional additional match condition; SQL syntax, used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Merge operation.
// Merge operation creates a session-level temp table in StagingSchema.
// Supplied through the MergeOption member of OpenRequest.
// NOTE(review): the fields below reuse the names of InsertOption and
// UpdateOption fields and presumably carry the same meaning - confirm.
// Their field numbers differ from the same-named fields in those messages.
message MergeOption {
  repeated string InsertColumns = 1;    // See InsertOption.InsertColumns (presumed)
  repeated string MatchColumns = 2;     // See UpdateOption.MatchColumns (presumed)
  repeated string UpdateColumns = 3;    // See UpdateOption.UpdateColumns (presumed)
  string Condition = 4;                 // See UpdateOption.Condition (presumed)
  int64 ErrorLimitCount = 5;            // See UpdateOption.ErrorLimitCount (presumed)
  int32 ErrorLimitPercentage = 6;       // See UpdateOption.ErrorLimitPercentage (presumed)
}

// Open service Request message.
// Names the target table of a write operation, the SQL to run around the
// load, the format of the source data, and the kind of write to perform.
// Passed to the Open rpc.
message OpenRequest {
  Session Session = 1;      // Session ID returned by Connect
  string SchemaName = 2;    // Name of the Greenplum Database schema
  string TableName = 3;     // Name of the Greenplum Database table
  string PreSQL = 4;        // SQL to execute before gpss loads the data
  string PostSQL = 5;       // SQL to execute after gpss loads the data
  int32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;      // Encoding of text data; not supported
  string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  // Format of the source data; at most one member is set.
  // NOTE(review): the published text kept this group's closing brace but lost
  // the line that opens it, which ended OpenRequest early and left
  // `oneof Option` outside any message (a protoc syntax error). The opener is
  // restored here. A oneof name is not encoded on the wire, so "Format" is a
  // reconstruction - confirm it against the gpss.proto shipped with your
  // GPSS release before relying on the generated accessor names.
  oneof Format {
    FormatAvro avro = 9;
    FormatBinary binary = 10;
    FormatCSV csv = 11;
    FormatDelimited delimited = 12;
    FormatJSON json = 13;
    FormatCustom custom = 14;
    string proto = 15;
  }

  oneof Option {            // Identify the type of write operation to perform
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

// A single column value; Row carries a repeated list of DBValue.
// DBType is a oneof, so at most one of the members below is set.
// NOTE(review): field numbers 3, 4 and 9 are not assigned in this message;
// do not reuse them without confirming they never belonged to removed fields.
message DBValue {
  oneof DBType {
    int32 Int32Value = 1;    // Greenplum integer, serial
    int64 Int64Value = 2;    // Greenplum bigint, bigserial
    float Float32Value = 5;  // Greenplum real
    double Float64Value = 6; // Greenplum double precision
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;    // Greenplum bytea
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;       // NullValue is defined in google/protobuf/struct.proto
    string OtherValue = 12;  // NOTE(review): purpose not documented here - confirm before use
  }
}

// One row of data, expressed as a list of column values.
// RowData.Data holds a Row in protobuf-encoded form.
message Row {
  repeated DBValue Columns = 1;  // Column values of the row
}

// Wrapper around the serialized bytes of one Row; WriteRequest carries a
// repeated list of RowData.
message RowData {
  bytes Data = 1;     // A single protobuf-encoded Row
}

// Write service Request message.
// Passed to the Write rpc, which returns google.protobuf.Empty.
message WriteRequest {
  Session Session = 1;           // Session that the rows belong to
  repeated RowData Rows = 2;     // The data to load into the target table
}

// Close service Response message.
// Returned by the Close rpc.
message TransferStats {          // Status of the data load operation
  int64 SuccessCount = 1;        // Number of rows successfully loaded
  int64 ErrorCount = 2;          // Number of error lines if Errorlimit is not reached
  // NOTE(review): ErrorRows is declared as repeated string, so it carries
  // text entries rather than a count; presumably the rows themselves, bounded
  // by CloseRequest.MaxErrorRows - confirm.
  repeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}

// Close service Request message.
// Passed to the Close rpc, which returns TransferStats.
// NOTE(review): the Session field is spelled "session" (lower case) here,
// unlike the "Session" field of the other request messages; generated
// accessors and JSON keys follow this spelling.
message CloseRequest {
  Session session = 1;           // Session whose write operation is being closed
  int32 MaxErrorRows = 2;        // -1: returns all, 0: nothing, >0: max rows
  bool Abort = 3;                // NOTE(review): not documented here; presumably aborts the write operation - confirm
}

// ListSchema service request message.
// Passed to the ListSchema rpc, which returns Schemas.
message ListSchemaRequest {
  Session Session = 1;  // Session to list schemas for
}

// One schema entry in a Schemas response.
message Schema {
  string Name = 1;   // Schema name
  string Owner = 2;  // Schema owner
}

// ListSchema service response message.
message Schemas {
  repeated Schema Schemas = 1;  // The schemas returned by ListSchema
}

// ListTable service request message.
// Passed to the ListTable rpc, which returns Tables.
message ListTableRequest {
  Session Session = 1;  // Session to list tables for
  string Schema = 2;    // 'public' is the default if no Schema is provided
}

// DescribeTable service request message.
// Passed to the DescribeTable rpc, which returns Columns.
message DescribeTableRequest {
  Session Session = 1;    // Session to describe the table for
  string SchemaName = 2;  // Name of the schema that holds the table
  string TableName = 3;   // Name of the table to describe
}

// Kind of relation reported in TableInfo.Type.
// Table is the zero value, so a TableInfo whose Type is unset reads as Table.
enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;  // Numbered apart from the sequential values above
}

// One relation entry in a Tables response.
message TableInfo {
  string Name = 1;        // Relation name
  RelationType Type = 2;  // Kind of relation (table, view, ...)
}

// ListTable service response message.
message Tables {
  repeated TableInfo Tables = 1;  // The relations returned by ListTable
}

// DescribeTable service response message.
message Columns {
  repeated ColumnInfo Columns = 1;  // One entry per described column
}

// Metadata of one table column, as returned in a Columns response.
// Each Has* flag states whether the value field(s) that follow it carry
// information; proto3 scalars cannot otherwise distinguish 0/false from unset.
message ColumnInfo {
  string Name = 1;            // Column name
  string DatabaseType = 2;    // Greenplum data type

  bool HasLength = 3;         // Contains length information?
  int64 Length = 4;           // Length if HasLength is true

  bool HasPrecisionScale = 5; // Contains precision or scale information?
  int64 Precision = 6;        // Presumably meaningful only if HasPrecisionScale is true
  int64 Scale = 7;            // Presumably meaningful only if HasPrecisionScale is true

  bool HasNullable = 8;       // Contains Nullable constraint?
  bool Nullable = 9;          // Presumably meaningful only if HasNullable is true
}

// GPSS Batch Data API service: session management (Connect, Disconnect),
// writing data (Open, Write, Close) and metadata queries (ListSchema,
// ListTable, DescribeTable). All rpcs are unary.
service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns(google.protobuf.Empty) {}

  // Write data to table
  rpc Write(WriteRequest) returns(google.protobuf.Empty) {}

  // Close a write operation; returns the TransferStats of the load
  rpc Close(CloseRequest) returns(TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Describe table metadata (column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}

// The format of the source data.
// If there is an intermediate column inside Format,
// then the source data will be transformed to the intermediate column.
// If there is no source_column_name in Format,
// then the column name will be the Target table column name,
// and the source column data type will be matched with Target column type.
//
// NOTE(review): no message or rpc visible in this definition references
// SourceDataFormat. OpenRequest declares fields of the same Format* types
// under field numbers 9-15 rather than embedding this message - confirm
// whether this message is still in use.
message SourceDataFormat {
  oneof unit {  // At most one format is set
    FormatAvro avro = 1;
    FormatBinary binary = 2;
    FormatCSV csv = 3;
    FormatDelimited delimited = 4;
    FormatJSON json = 5;
    FormatCustom custom = 6;
    string protobuf = 7;
  }
}

// Avro source data format options, including how gpss obtains the Avro
// schema (schema registry URL or a schema file on the Greenplum hosts) and
// the TLS material used when the registry is reached over HTTPS.
message FormatAvro {
  string source_column_name = 1; // The source column name
  string schema_url = 2; // If specified, gpss requests the avro schema from url
  bool   bytes_to_base64 = 3; // When true and schema_url is specified, gpss converts bytes field in avro message to base64-encoded string
  bool   ignore_deserialize_error = 4; // When true, gpss ignores avro deserialize errors, and puts data into log error
  string schema_path_on_gpdb = 5; // Used for standalone avro schema; if exists, gpss retrieves the avro schema from the path on every node in the greenplum cluster
  string schema_ca_on_gpdb = 6; // The path to the specified CA certificate file for gpss verifying the peer; the CA file must exist at that path on every greenplum segment
  string schema_cert_on_gpdb = 7; // The path to the specified client certificate file for gpss connecting to HTTPS schema registry; required if the registry's client authentication is enabled
  string schema_key_on_gpdb = 8; // The path to the specified private key file for gpss connecting to HTTPS schema registry; required if the registry's client authentication is enabled
  string schema_min_tls_version = 9; // The minimum transport layer security (TLS) version that gpss requests on the registry connection; the default value is 1.0, and gpss supports minimum TLS versions of 1.0, 1.1, 1.2, and 1.3
}

// Binary source data format options.
message FormatBinary {
  string source_column_name = 1; // The source column name
}

// CSV source data format options.
// NOTE(review): fields 2-9 are not documented here. Their names match
// Greenplum external-table CSV formatting options and presumably carry the
// same meaning - confirm. Note that force_not_null is a single string, not
// a repeated list.
message FormatCSV {
  repeated IntermediateColumn columns = 1; // Source column, move to format.Column c1:bin, c2:json ...
  string delimiter = 2;
  string quote = 3;
  string null = 4;
  string escape = 5;
  string force_not_null = 6;
  string newline = 7;
  bool fill_missing_fields = 8;
  bool header = 9;
}

// Delimited-text source data format options.
message FormatDelimited {
  repeated IntermediateColumn columns = 1; // The source columns (each an IntermediateColumn with name and type)
  string delimiter = 2;                    // NOTE(review): presumably the field separator - confirm
}

// JSON source data format options.
// Holds a single column rather than a list: per the IntermediateColumn notes,
// JSON is not a decomposed format and is treated as an integral type.
message FormatJSON {
  IntermediateColumn column = 1; // The source column (an IntermediateColumn with name and type)
}

// Custom source data format options.
// NOTE(review): the fields are not documented here; the notes below are
// inferred from names and types only - confirm.
message FormatCustom {
  repeated IntermediateColumn columns = 1;  // The source columns
  string name = 2;                          // Presumably the name of the custom format/formatter
  repeated string options = 3;              // Presumably options passed to the custom format
}

// IntermediateColumn is the intermediate result of parsing the source data
// format. It looks like a virtual table column and is used to filter or
// convert types.
// The source data is parsed into table-column-style data:
//   source column: name and type; the type must be valid.
//   ex: convert the string "123" to the integer 123.
// Caution: FormatJSON is not a decomposed format; json is treated as an
// integral type.
message IntermediateColumn {
  string name = 1; // Column name
  string type = 2; // Greenplum Database basic data types are supported
}

Data Type Mapping

The GPSS Batch Data API service definition includes messages that represent rows and columns of supported Greenplum Database data types.

Because Greenplum Database supports more data types than protobuf, the GPSS Batch Data API provides a mapping between the types as follows:

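| gRPC Type      | Greenplum Type                       |
|----------------|--------------------------------------|
| Int32Value     | integer, serial                      |
| Int64Value     | bigint, bigserial                    |
| Float32Value   | real                                 |
| Float64Value   | double precision                     |
| StringValue    | text (any kind of data)              |
| BytesValue     | bytea                                |
| TimeStampValue | time, timestamp (without time zone)  |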

In the simplest case, all Greenplum data types can be mapped to a string.

check-circle-line exclamation-circle-line close-line
Scroll to top icon