You provide Greenplum Database connection options and the read/write options required by the via generic key-value String pairs.

Connector Options

The Connector supports the options identified in the table below. An option is required unless otherwise specified.

Option Key Value Description Operation
url The JDBC connection string URL; see Constructing the Greenplum Database JDBC URL. Read, Write
dbschema The name of the Greenplum Database schema in which dbtable resides. This option also identifies the name of the schema in which the Connector creates temporary external tables. Optional, the default schema is the schema named public. Read, Write
dbtable The name of the Greenplum Database table. When reading from Greenplum Database, this table or view must reside in the Greenplum Database schema identified in the dbschema option value. When writing from Spark to Greenplum Database, the Connector creates the table in dbschema if the table does not already exist. Read, Write
driver The fully qualified class path of the custom JDBC driver. Optional, specify only when using a custom JDBC driver. Read, Write
user The Greenplum Database user/role name. Read, Write
password The Greenplum Database password for the user. You can omit the password if Greenplum Database is configured to not require a password for the specified user, or if you use kerberos authentication and provide the required authentication properties in the JDBC connection string URL. Optional. Read, Write
partitionColumn The name of the Greenplum Database table column to use for Spark partitioning. This column must be one of the Greenplum Database data types integer, bigint, serial, or bigserial. Optional, the default partition column is the internal Greenplum Database table column named gp_segment_id. Read
partitions The number of Spark partitions. Optional, and valid to specify only when the partitionColumn is not gp_segment_id. The default value is the number of primary segments in the Greenplum Database cluster. Read
truncate The table overwrite mode. Governs the Connector's table creation actions when you specify SaveMode.Overwrite on a write operation and the target Greenplum Database table exists. The default value is false; the Connector drops and then re-creates the target table before it writes any data. When true, the Connector truncates the target table before writing any data. Optional. Write
distributedBy The distribution column(s) of the Greenplum table. Governs the table creation action of the Connector when the target Greenplum Database table does not exist, or when you specify SaveMode.Overwrite on a write operation and truncate is false. The Connector (re)creates the table with random distribution by default. When you provide one or more distributedBy columns, the Connector (re)creates the table with a DISTRIBUTED BY clause that specifies these column names. Optional. Write
iteratorOptimization The write memory optimization mode. Optional. The default value is true; the Connector uses an Iterator to optimize memory. When false, the Connector materializes the data set in memory during the write operation. Write
server.port The gpfdist server process port number, or the environment variable name that identifies such, on the Spark worker node. Optional, by default the Connector selects and uses a random port. Read, Write
server.useHostname Use the Spark worker node host name to specify the gpfdist server address. Optional, the default value is false. Read, Write
server.hostEnv The name of the environment variable whose value identifies the Spark worker node hostname or IP address on which to start the gpfdist server process. Optional. Read, Write
server.nic The name of the Spark worker node network interface on which to start the gpfdist server, or the environment variable name that identifies such. Optional. Read, Write
server.timeout The amount of time, in milliseconds, after which no activity (bytes read or written) on the gpfdist connection triggers a timeout. Optional. The default timeout is 300,000 (5 minutes). Read, Write
pool.maxSize The maximum number of connections in the connection pool. Optional, the default value is 64. Read, Write
pool.timeoutMs The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional, the default value is 10,000 (10 seconds). Read, Write
pool.minIdle The minimum number of idle connections maintained in the connection pool. Optional, the default value is 0. Read, Write

You can specify options to the Connector individually or in an options Map. The class methods of interest for setting Connector options are:

.option(key: String, value: String)

for specifying an individual option, and:

.options(options: Map[String, String])

for specifying an options map.

To specify an option individually, provide <option_key> and <value> strings to the DataFrameReader.option() method. For example, to specify the user option:

.option("user", "gpdb_role_name")

To construct a scala.collection.Map comprising more than one option, you provide the <option_key> and <value> strings for each option. For example, to create the options for a read operation:

val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table1",
      "partitionColumn" -> "id"
)

To provide an options map to the Connector, specify it in the DataFrameReader.options() or DataFrameWriter.options() methods. For example, to provide the gscReadOptionMap map created above:

.options(gscReadOptionMap)

Specifying Partition Options

Partition options apply only when you read data from Greenplum Database into Spark.

A Spark DataFrame is a fault-tolerant collection of elements partitioned across the Spark cluster nodes. Spark operates on these elements in parallel.

Greenplum Database distributes its table data across segments running on segment hosts.

The Connector provides two options to configure the mapping between Spark partitions and Greenplum Database segment data, partitionColumn and partitions.

partitionColumn

The partitionColumn option that you specify must have the integer, bigint, serial, or bigserial Greenplum Database data type. The partitionColumn you identify must match the case of the Greenplum table column, but need not be the column specified with a DISTRIBUTED BY (<column>) clause when you created the Greenplum Database table.

If you do not provide a partitionColumn, the Connector uses the internal Greenplum Database table column named gp_segment_id by default. If you choose to explicitly set partitionColumn to gp_segment_id, specify this column name in lowercase.

When partitionColumn is gp_segment_id, you cannot set the partitions options.

partitions

By default, the Connector creates one Spark partition per Greenplum Database segment. You can set the partitions option to specify a different number of Spark partitions when partitionColumn does not specify gp_segment_id.

Specifying the Connector Server Options

The Connector uses the Greenplum Database gpfdist protocol to transfer data between Spark and Greenplum Database.

Use one or more of the following options to specify the address and port number, or the no activity connection timeout, for the gpfdist server running on a Spark worker node:

Refer to Configuring the Connector Server Address for additional information about these options.

server.port

You can specify one, a comma-separated list, and/or a range of port numbers, or an environment variable name for the server.port option. To set a port number, identify the specific port number.

For example, to set server.port to a single port number as a single option:

.option("server.port", "12900")

To set server.port to include a range of port numbers within a Map of options:

"server.port" -> "8999-9001, 9004"

When you specify more than one port number, the Connector traverses the list or range in order, and uses the first available port number.

If you choose to specify the port number with an environment variable, prefix the environment variable name with env.. For example, to identify the environment variable named GSC_EXTERNAL_PORT as the server.port option value:

"server.port" -> "env.GSC_EXTERNAL_PORT",

Note: Setting server.port to env.GPFDIST_PORT results in the same behavior as that of Connector version 1.2.0.

server.useHostname

server.useHostname is a boolean option that identifies whether to use the Spark worker host name or the IP address to identify the gpfdist server on that node. When set to true, the Connector uses the Spark worker host name as the address of the gpfdist server.

The default server.useHostname value is false. When false and neither server.hostEnv nor server.nic are set (the default), the Connector uses the Spark worker IP address to address the gpfdist server.

To set server.useHostname as a single option:

"server.useHostname" -> "true"

server.hostEnv

server.hostEnv specifies the name of the environment variable whose value identifies the Spark worker node host name or IP address on which to start the gpfdist server. This environment variable will typically identify the same information as the Spark SPARK_LOCAL_IP environment variable.

The Connector examines the server.hostEnv option only when server.useHostname is set to false.

To set server.hostEnv as a single option:

"server.hostEnv" -> "env.MY_LOCAL_SPARK_IP"

server.nic

server.nic identifies the name of the Spark worker node network interface on which to start the gpfdist server, or the environment variable name that identifies such.

The Connector examines this option only when server.useHostname is false and no server.hostEnv is specified.

To directly set server.nic within a Map of options:

"server.nic" -> "eth1"

server.timeout

server.timeout identifies the maximum amount of time in milliseconds that a gpfdist service connection may go without activity (bytes read or written) before the connection is timed out.

The timeout is reset when one or more bytes are read from or written to the connection.

The default server.timeout is 300,000 (5 minutes), and the maximum timeout that you can specify is 2 hours (7,200,000 milliseconds). Set server.timeout to 0 to disable the no activity connection timeout.

Specifying Connection Pool Options

The Connector provides connection pool configuration options. These options are named with the pool. prefix:

Option Key Value Description
pool.maxSize The maximum number of connections in the connection pool. Optional, the default value is 64.
pool.timeoutMs The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional, the default value is 10,000 (10 seconds).
pool.minIdle The minimum number of idle connections maintained in the connection pool. Optional, the default value is 0.

To set each connection pool option as a single option:

.option("pool.maxSize", "50")
.option("pool.minIdle", "5")
.option("pool.timeoutMs", "7500")

The first DataFrame that you create with a specific connection string URL, username, and password combination defines the configuration of that connection pool. The Connector ignores connection pool options specified on subsequent DataFrames created with the same URL/username/password combination.

Refer to JDBC Connection Pooling for additional information about connection pool configuration options.

check-circle-line exclamation-circle-line close-line
Scroll to top icon