You provide Greenplum Database connection options and the read/write options required by the Connector via generic key-value String pairs.
The Connector supports the options identified in the table below. An option is required unless otherwise specified.
Option Key | Value Description | Operation |
---|---|---|
url | The JDBC connection string URL; see Constructing the Greenplum Database JDBC URL. | Read, Write |
dbschema | The name of the Greenplum Database schema in which dbtable resides. This option also identifies the name of the schema in which the Connector creates temporary external tables. Optional, the default schema is the schema named public. | Read, Write |
dbtable | The name of the Greenplum Database table. When reading from Greenplum Database, this table or view must reside in the Greenplum Database schema identified in the dbschema option value. When writing from Spark to Greenplum Database, the Connector creates the table in dbschema if the table does not already exist. | Read, Write |
driver | The fully qualified class path of the custom JDBC driver. Optional, specify only when using a custom JDBC driver. | Read, Write |
user | The Greenplum Database user/role name. | Read, Write |
password | The Greenplum Database password for the user. You can omit the password if Greenplum Database is configured to not require a password for the specified user, or if you use Kerberos authentication and provide the required authentication properties in the JDBC connection string URL. Optional. | Read, Write |
partitionColumn | The name of the Greenplum Database table column to use for Spark partitioning. This column must be one of the Greenplum Database data types integer, bigint, serial, or bigserial. Optional, the default partition column is the internal Greenplum Database table column named gp_segment_id. | Read |
partitions | The number of Spark partitions. Optional, and valid to specify only when partitionColumn is not gp_segment_id. The default value is the number of primary segments in the Greenplum Database cluster. | Read |
truncate | The table overwrite mode. Governs the Connector's table creation actions when you specify SaveMode.Overwrite on a write operation and the target Greenplum Database table exists. Optional, the default value is false; the Connector drops and then re-creates the target table before it writes any data. When true, the Connector truncates the target table before writing any data. | Write |
distributedBy | The distribution column(s) of the Greenplum Database table. Governs the table creation action of the Connector when the target Greenplum Database table does not exist, or when you specify SaveMode.Overwrite on a write operation and truncate is false. The Connector (re)creates the table with random distribution by default. When you provide one or more distributedBy columns, the Connector (re)creates the table with a DISTRIBUTED BY clause that specifies these column names. Optional. | Write |
iteratorOptimization | The write memory optimization mode. Optional, the default value is true; the Connector uses an Iterator to optimize memory. When false, the Connector materializes the data set in memory during the write operation. | Write |
server.port | The gpfdist server process port number, or the name of an environment variable that identifies the port number, on the Spark worker node. Optional, by default the Connector selects and uses a random port. | Read, Write |
server.useHostname | Whether to use the Spark worker node host name to specify the gpfdist server address. Optional, the default value is false. | Read, Write |
server.hostEnv | The name of the environment variable whose value identifies the Spark worker node host name or IP address on which to start the gpfdist server process. Optional. | Read, Write |
server.nic | The name of the Spark worker node network interface on which to start the gpfdist server, or the name of an environment variable that identifies the interface. Optional. | Read, Write |
server.timeout | The amount of time, in milliseconds, of no activity (bytes read or written) on the gpfdist connection after which the connection times out. Optional, the default timeout is 300,000 (5 minutes). | Read, Write |
pool.maxSize | The maximum number of connections in the connection pool. Optional, the default value is 64. | Read, Write |
pool.timeoutMs | The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional, the default value is 10,000 (10 seconds). | Read, Write |
pool.minIdle | The minimum number of idle connections maintained in the connection pool. Optional, the default value is 0. | Read, Write |
You can specify options to the Connector individually or in an options Map. The class methods of interest for setting Connector options are:
.option(key: String, value: String)
for specifying an individual option, and:
.options(options: Map[String, String])
for specifying an options map.
To specify an option individually, provide the <option_key> and <value> strings to the DataFrameReader.option() method. For example, to specify the user option:
.option("user", "gpdb_role_name")
To construct a scala.collection.Map comprising more than one option, you provide the <option_key> and <value> strings for each option. For example, to create the options for a read operation:
val gscReadOptionMap = Map(
"url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
"user" -> "bill",
"password" -> "changeme",
"dbschema" -> "myschema",
"dbtable" -> "table1",
"partitionColumn" -> "id"
)
To provide an options map to the Connector, specify it in the DataFrameReader.options() or DataFrameWriter.options() method. For example, to provide the gscReadOptionMap map created above:
.options(gscReadOptionMap)
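For instance, a hedged sketch of a complete read that uses the gscReadOptionMap defined above; the SparkSession named spark and the data source format name "greenplum" are assumptions here, not defined in this section:
// Load the Greenplum Database table described by gscReadOptionMap into a Spark DataFrame
// (assumes an existing SparkSession named `spark` and the "greenplum" data source name)
val gpdf = spark.read
  .format("greenplum")
  .options(gscReadOptionMap)
  .load()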
Partition options apply only when you read data from Greenplum Database into Spark.
A Spark DataFrame is a fault-tolerant collection of elements partitioned across the Spark cluster nodes. Spark operates on these elements in parallel.
Greenplum Database distributes its table data across segments running on segment hosts.
The Connector provides two options to configure the mapping between Spark partitions and Greenplum Database segment data: partitionColumn and partitions.
The partitionColumn option that you specify must have the integer, bigint, serial, or bigserial Greenplum Database data type. The partitionColumn you identify must match the case of the Greenplum Database table column, but need not be the column specified with a DISTRIBUTED BY (<column>) clause when you created the Greenplum Database table.
If you do not provide a partitionColumn, the Connector uses the internal Greenplum Database table column named gp_segment_id by default. If you choose to explicitly set partitionColumn to gp_segment_id, specify this column name in lowercase.
When partitionColumn is gp_segment_id, you cannot set the partitions option.
By default, the Connector creates one Spark partition per Greenplum Database segment. You can set the partitions option to specify a different number of Spark partitions when partitionColumn does not specify gp_segment_id.
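As an illustration, a hedged sketch of a read options map that reuses the values shown earlier and partitions the read on the integer column id across 8 Spark partitions; the column name and partition count are assumptions chosen only for this example:
// Read options that partition on an integer column instead of gp_segment_id
val partitionedReadOptions = Map(
  "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
  "user" -> "bill",
  "password" -> "changeme",
  "dbschema" -> "myschema",
  "dbtable" -> "table1",
  "partitionColumn" -> "id", // must be integer, bigint, serial, or bigserial
  "partitions" -> "8"        // valid only because partitionColumn is not gp_segment_id
)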
The Connector uses the Greenplum Database gpfdist protocol to transfer data between Spark and Greenplum Database.
Use one or more of the server.port, server.useHostname, server.hostEnv, server.nic, and server.timeout options to specify the address and port number, or the no activity connection timeout, for the gpfdist server running on a Spark worker node.
Refer to Configuring the Connector Server Address for additional information about these options.
You can specify a single port number, a comma-separated list and/or range of port numbers, or the name of an environment variable for the server.port option. For example, to set server.port to a single port number as a single option:
.option("server.port", "12900")
To set server.port to a range of port numbers within a Map of options:
"server.port" -> "8999-9001, 9004"
When you specify more than one port number, the Connector traverses the list or range in order, and uses the first available port number.
If you choose to specify the port number with an environment variable, prefix the environment variable name with "env.". For example, to identify the environment variable named GSC_EXTERNAL_PORT as the server.port option value:
"server.port" -> "env.GSC_EXTERNAL_PORT",
Note: Setting server.port to env.GPFDIST_PORT results in the same behavior as that of Connector version 1.2.0.
server.useHostname is a boolean option that identifies whether to use the Spark worker host name or the IP address to identify the gpfdist server on that node. When set to true, the Connector uses the Spark worker host name as the address of the gpfdist server.
The default server.useHostname value is false. When false and neither server.hostEnv nor server.nic is set (the default), the Connector uses the Spark worker IP address to address the gpfdist server.
To set server.useHostname within a Map of options:
"server.useHostname" -> "true"
server.hostEnv specifies the name of the environment variable whose value identifies the Spark worker node host name or IP address on which to start the gpfdist server. This environment variable will typically identify the same information as the Spark SPARK_LOCAL_IP environment variable.
The Connector examines the server.hostEnv option only when server.useHostname is set to false.
To set server.hostEnv within a Map of options:
"server.hostEnv" -> "env.MY_LOCAL_SPARK_IP"
server.nic identifies the name of the Spark worker node network interface on which to start the gpfdist server, or the environment variable name that identifies the interface.
The Connector examines this option only when server.useHostname is false and no server.hostEnv is specified.
To directly set server.nic within a Map of options:
"server.nic" -> "eth1"
server.timeout identifies the maximum amount of time in milliseconds that a gpfdist service connection may go without activity (bytes read or written) before the connection is timed out.
The timeout is reset when one or more bytes are read from or written to the connection.
The default server.timeout is 300,000 (5 minutes), and the maximum timeout that you can specify is 2 hours (7,200,000 milliseconds). Set server.timeout to 0 to disable the no activity connection timeout.
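For example, to set server.timeout to an illustrative value of 10 minutes (600,000 milliseconds) as a single option:
.option("server.timeout", "600000")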
The Connector provides connection pool configuration options. These options are named with the pool. prefix:
Option Key | Value Description |
---|---|
pool.maxSize | The maximum number of connections in the connection pool. Optional, the default value is 64. |
pool.timeoutMs | The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional, the default value is 10,000 (10 seconds). |
pool.minIdle | The minimum number of idle connections maintained in the connection pool. Optional, the default value is 0. |
To set each connection pool option as a single option:
.option("pool.maxSize", "50")
.option("pool.minIdle", "5")
.option("pool.timeoutMs", "7500")
The first DataFrame that you create with a specific connection string URL, username, and password combination defines the configuration of that connection pool. The Connector ignores connection pool options specified on subsequent DataFrames created with the same URL/username/password combination.
Refer to JDBC Connection Pooling for additional information about connection pool configuration options.
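Putting several of the write-related options together, a hedged sketch of a write operation might look like the following; the DataFrame named dataToWrite, the data source format name "greenplum", and the distribution column are assumptions for illustration only:
// Write options: (re)create the target table distributed by the "id" column
val gscWriteOptionMap = Map(
  "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
  "user" -> "bill",
  "password" -> "changeme",
  "dbschema" -> "myschema",
  "dbtable" -> "table1",
  "distributedBy" -> "id"
)

// Overwrite the target table; with the default truncate=false, the Connector
// drops and re-creates the table with a DISTRIBUTED BY (id) clause
dataToWrite.write
  .format("greenplum")
  .options(gscWriteOptionMap)
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .save()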