You provide Greenplum Database connection options and the read/write options required by the via generic key-value String pairs.

Connector Options

The Connector supports the following options when defining a Spark DataFrame. An option is required unless otherwise specified.

Option Key Value Description Operation
url The JDBC connection string URL; see Constructing the Greenplum Database JDBC URL. Read, Write
dbschema The name of the Greenplum Database schema in which dbtable resides. Optional, the default schema is the schema named public. Read, Write
dbtable The name of the Greenplum Database table. When reading from Greenplum Database, this table or view must reside in the Greenplum Database schema identified in the dbschema option value. When writing from Spark to Greenplum Database, the Connector creates the table in dbschema if the table does not already exist. Read, Write
driver The fully qualified class path of the custom JDBC driver. Optional, specify only when using a custom JDBC driver. Read, Write
user The Greenplum Database user/role name. Read, Write
password The Greenplum Database password for the user. You can omit the password if Greenplum Database is configured to not require a password for the specified user, or if you use kerberos authentication and provide the required authentication properties in the JDBC connection string URL. Optional. Read, Write
partitionColumn The name of the Greenplum Database table column to use for Spark partitioning. This column must be one of the Greenplum Database data types identified in the Numeric Types PostgreSQL documentation. Optional, the default partition column is the internal Greenplum Database table column named gp_segment_id. Read
partitions The number of Spark partitions. Optional, and valid to specify only when the partitionColumn is not gp_segment_id. The default value is the number of primary segments in the Greenplum Database cluster. Read
gpdb.matchDistributionPolicy The external table distribution policy matching mode. Governs how the Connector sets the distribution policy on the external tables that it creates to write to Spark. The default value is false, the Connector does not match the external table distribution policy with that of the source Greenplum table. Refer to About the External Table Distribution Policy and Data Motion for more information about this setting and the implications of turning it on. Read
truncate The table overwrite mode. Governs the Connector's table creation actions when you specify SaveMode.Overwrite on a write operation and the target Greenplum Database table exists. The default value is false; the Connector drops and then re-creates the target table before it writes any data. When true, the Connector truncates the target table before writing any data. Optional. Write
distributedBy The distribution column(s) of the Greenplum table. Governs the table creation action of the Connector when the target Greenplum Database table does not exist, or when you specify SaveMode.Overwrite on a write operation and truncate is false. The Connector (re)creates the table with random distribution by default. When you provide one or more distributedBy columns, the Connector (re)creates the table with a DISTRIBUTED BY clause that specifies these column names. Optional. Write
iteratorOptimization The write memory optimization mode. Optional. The default value is true; the Connector uses an Iterator to optimize memory. When false, the Connector materializes the data set in memory during the write operation. Write
server.port The gpfdist server process port number, or the environment variable name that identifies such, on the Spark worker node. Optional, by default the Connector selects and uses a random port. Read, Write
server.useHostname Use the Spark worker node host name to specify the gpfdist server address. Optional, the default value is false. Read, Write
server.hostEnv The name of the environment variable whose value identifies the Spark worker node hostname or IP address on which to start the gpfdist server process. Optional. Read, Write
server.nic The name of the Spark worker node network interface on which to start the gpfdist server, or the environment variable name that identifies such. Optional. Read, Write
server.timeout The amount of time, in milliseconds, after which no activity (bytes read or written) on the gpfdist connection triggers a timeout. Optional. The default timeout is 300,000 (5 minutes). Read, Write
pool.hikari.<hikari-property-name> A connection pool property identified in the HikariCP Configuration documentation to configure an aspect of the pool. Read, Write
pool.maxSize The maximum number of connections in the connection pool. Optional, the default value is 64. Read, Write
pool.timeoutMs The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional, the default value is 10,000 (10 seconds). Read, Write
pool.minIdle The minimum number of idle connections maintained in the connection pool. Optional, the default value is 0. Read, Write

You can specify options to the Connector individually or in an options Map. The class methods of interest for setting Connector options are:

.option(key: String, value: String)

for specifying an individual option, and:

.options(options: Map[String, String])

for specifying an options map.

To specify an option individually, provide <option_key> and <value> strings to the DataFrameReader.option() method. For example, to specify the user option:

.option("user", "gpdb_role_name")

To construct a scala.collection.Map comprising more than one option, you provide the <option_key> and <value> strings for each option. For example, to create the options for a read operation:

val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "bill",
      "password" -> "changeme",
      "dbschema" -> "myschema",
      "dbtable" -> "table1",
      "partitionColumn" -> "id"
)

To provide an options map to the Connector, specify it in the DataFrameReader.options() or DataFrameWriter.options() methods. For example, to provide the gscReadOptionMap map created above:

.options(gscReadOptionMap)

Specifying Partition Options

Partition options apply only when you read data from Greenplum Database into Spark.

A Spark DataFrame is a fault-tolerant collection of elements partitioned across the Spark cluster nodes. Spark operates on these elements in parallel.

Greenplum Database distributes its table data across segments running on segment hosts.

The Connector provides two options to configure the mapping between Spark partitions and Greenplum Database segment data, partitionColumn and partitions.

partitionColumn

The partitionColumn option that you specify must be a Numeric Data Type. The partitionColumn you identify must match the case of the Greenplum table column, but need not be the column specified with a DISTRIBUTED BY (<column>) clause when you created the Greenplum Database table.

If you do not provide a partitionColumn, the Connector uses the internal Greenplum Database table column named gp_segment_id by default. If you choose to explicitly set partitionColumn to gp_segment_id, specify this column name in lowercase.

When partitionColumn is gp_segment_id, you cannot set the partitions options.

partitions

By default, the Connector creates one Spark partition per Greenplum Database segment. You can set the partitions option to specify a different number of Spark partitions when partitionColumn does not specify gp_segment_id.

About the External Table Distribution Policy and Data Motion

The Connector uses external tables to transfer data from Greenplum Database to Spark. In the ideal scenario, all Greenplum Database segments are (roughly) equally involved in the data transfer, and there is little to no motion on the Greenplum side before the Connector moves data from Greenplum to Spark.

In the default case, the Connector creates the external tables with random distribution. While this ensures that all Greenplum segments are (roughly) equally involved in the data transfer, it requires data motion on the Greenplum side before the Connector sends the data.

When you instruct the Connector to set the external table distribution policy to match that of the Greenplum Database source table (gpdb.matchDistributionPolicy is set to true), the Connector creates the external tables specifying the distribution columns and distribution policy of that of the source Greenplum table. This minimizes data motion prior to sending the data to Spark.

When gpdb.matchDistributionPolicy is true and the Spark resilient distributed dataset (RDD) is partitioned by the gp_segment_id (the default partitionColumn), the Connector creates Spark partitions with predicates similar to WHERE gp_segment_id = 0. While this scenario avoids a Greenplum redistribute motion, only one segment sends data to that Spark partition, and may result in slower running queries.

If the source Greenplum table has no distribution policy (for example, it is a view), the Connector creates the external tables with random distribution regardless of the gpdb.matchDistributionPolicy setting.

Specifying the gpfdist Server Address Options

Important

When Spark is deployed in Kubernetes, you must use application properties to specify the gpfdist server address. Do not use the per-DataFrame server.* options. Refer to Configuring the Connector When Spark is Deployed in Kubernetes (Beta) for more information.

The Connector uses the Greenplum Database gpfdist protocol to transfer data between Spark and Greenplum Database. Use one or more of the following DataFrame options to specify the address and port number for the gpfdist server running on a Spark worker node:

server.port

You can specify one, a comma-separated list, and/or a range of port numbers, or an environment variable name for the server.port option. To set a port number, identify the specific port number.

For example, to set server.port to a single port number as a single option:

.option("server.port", "12900")

To set server.port to include a range of port numbers within a Map of options:

"server.port" -> "8999-9001, 9004"

When you specify more than one port number, the Connector traverses the list or range in order, and uses the first available port number.

If you choose to specify the port number with an environment variable, prefix the environment variable name with env.. For example, to identify the environment variable named GSC_EXTERNAL_PORT as the server.port option value:

"server.port" -> "env.GSC_EXTERNAL_PORT",

Note: Setting server.port to env.GPFDIST_PORT results in the same behavior as that of Connector version 1.2.0.

server.useHostname

server.useHostname is a boolean option that identifies whether to use the Spark worker host name or the IP address to identify the gpfdist server on that node. When set to true, the Connector uses the Spark worker host name as the address of the gpfdist server.

The default server.useHostname value is false. When false and neither server.hostEnv nor server.nic are set (the default), the Connector uses the Spark worker IP address to address the gpfdist server.

To set server.useHostname as a single option:

"server.useHostname" -> "true"

server.hostEnv

server.hostEnv specifies the name of the environment variable whose value identifies the Spark worker node host name or IP address on which to start the gpfdist server. This environment variable will typically identify the same information as the Spark SPARK_LOCAL_IP environment variable.

The Connector examines the server.hostEnv option only when server.useHostname is set to false.

To set server.hostEnv as a single option:

"server.hostEnv" -> "env.MY_LOCAL_SPARK_IP"

server.nic

server.nic identifies the name of the Spark worker node network interface on which to start the gpfdist server, or the environment variable name that identifies such.

The Connector examines this option only when server.useHostname is false and no server.hostEnv is specified.

To directly set server.nic within a Map of options:

"server.nic" -> "eth1"

Specifying the gpfdist Server Timeout

server.timeout identifies the maximum amount of time in milliseconds that a gpfdist service connection may go without activity (bytes read or written) before the connection is timed out.

The timeout is reset when one or more bytes are read from or written to the connection.

The default server.timeout is 300,000 (5 minutes), and the maximum timeout that you can specify is 2 hours (7,200,000 milliseconds). Set server.timeout to 0 to deactivate the no activity connection timeout.

To directly set server.timeout within a Map of options:

"server.timeout" -> "600000"

Specifying Connection Pool Options

Starting in Connector version 2.1.2, you can directly configure Hikari connection pool options by including property names of the form pool.hikari.<hikari-property-name> when you construct the Connector options. <hikari-property-name> is case-sensitive. Refer to the HikariCP Configuration documentation for more information about the specific connection pool properties that are available for you to set.

The Connector still exposes connection pool configuration options named pool.<property>. The existing connection pool options and their Hikari equivalents follow:

Existing Connection Pool Option Name Hikari Option Name
pool.maxSize pool.hikari.maximumPoolSize
pool.timeoutMs pool.hikari.idleTimeout
pool.minIdle pool.hikari.minimumIdle

The Hikari value takes precedence when you specify both options.

The following examples individually set specific connection pool options:

.option("pool.maxSize", "50")
.option("pool.hikari.minimumIdle", "20")
.option("pool.hikari.autoCommit", "false")

The first DataFrame that you create with a specific connection string URL, username, and password combination defines the configuration of that connection pool. The Connector ignores connection pool options specified on subsequent DataFrames created with the same URL/username/password combination.

Refer to JDBC Connection Pooling for additional information about connection pool configuration options.

check-circle-line exclamation-circle-line close-line
Scroll to top icon