The Connector uses the Greenplum Database gpfdist parallel file server to transfer data between Spark and Greenplum Database. When you run a Spark application that uses the Connector, the Connector starts a gpfdist server process on each Spark worker node.

By default, the Connector starts the gpfdist server process using the IP address of the Spark worker node and allows the operating system to select a random port for the server.

The Connector's default gpfdist server addressing behavior may not meet your needs if the hosts in your Spark cluster are configured with multiple network interfaces, or when the Spark cluster is deployed in Kubernetes.
Note: Use these DataFrame configuration options when Spark is running on-premises or in the cloud. The Connector does not support setting these options when Spark is deployed in Kubernetes.
The Connector exposes the following DataFrame options to set the gpfdist server address:

Option Name | Description |
---|---|
server.port | The gpfdist server port number. You can specify any combination of one or more: single port number, comma-separated list of port numbers, or range of port numbers. |
server.useHostname | Use the Spark worker host name instead of the IP address. |
server.hostEnv | The name of an environment variable whose value identifies the hostname or IP address of the Spark worker on which to start the gpfdist server. |
server.nic | The network interface on which to start the gpfdist server. |
You set these options in your code before you read or write the DataFrame; Specifying the gpfdist Server Address Options describes this in detail.

The Connector allows you to set each of the port, host, and interface options via an environment variable name of your choosing. When you set an option via an environment variable, you can configure a different value for each Spark worker node.
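For example, the following minimal sketch writes a DataFrame while pinning the gpfdist server to a fixed port range and advertising the worker host name. It assumes a SparkSession named spark is in scope (as in spark-shell); the connection option values (url, user, password, dbtable) are placeholders for illustration, and only the server.* options from the table above are the point of the example.

// Minimal sketch (Scala): write a DataFrame to Greenplum Database while
// constraining the gpfdist server to ports 12900-12910 and advertising the
// Spark worker host name instead of its IP address. The connection option
// values are placeholders for your environment.
val df = spark.range(10).toDF("id")          // any DataFrame to write
df.write
  .format("greenplum")
  .option("url", "jdbc:postgresql://gpmaster.example.com:5432/testdb")
  .option("user", "gpadmin")
  .option("password", "changeme")
  .option("dbtable", "spark_sink")
  .option("server.port", "12900-12910")      // single port, list, or range
  .option("server.useHostname", "true")      // advertise host name, not IP address
  .save()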
If you choose to specify an option via an environment variable, set the environment variable on each Spark worker node before you run the start-slave.sh command on that node. For example, if you set server.port to the environment variable named GSC_EXTERNAL_PORT, and server.nic to the environment variable named GSC_NIF, you would start the Spark worker as follows:
user@spark-worker$ GSC_EXTERNAL_PORT="12900" GSC_NIF="eth1" start-slave.sh
You may also choose to set an environment variable in your spark-env.sh file. For information about the Spark spark-env.sh file, refer to the Environment Variables section of the Spark Configuration documentation.
In the default configuration, the Spark executor location that the Connector specifies in the external table LOCATION clause is not routable from the Greenplum Database segments when Spark is deployed in Kubernetes; you must explicitly configure the Connector.
An Ingress manages access to services in a Kubernetes cluster, exposing routes from outside the cluster to services running in the cluster, and uses rules to control the routes. The Connector requires that you create the Ingress object ahead of time; it does not create the Ingress for you.
The Connector GpfdistIngressListener service adds routes (paths) to the existing Ingress, and removes these routes when the application ends. You configure the behavior of the GpfdistIngressListener via the Spark application properties described in the table below.
Note: Use these application properties to configure the Connector when Spark is deployed in Kubernetes. The Connector does not support specifying these properties for other types of Spark deployments.
Property Name | Default Value | Description |
---|---|---|
spark.extraListeners | | (Required) The class name of the Connector Ingress listener, org.greenplum.GpfdistIngressListener. |
spark.greenplum.k8s.ingress.name | | (Required) The name of an existing Ingress object to which the Connector adds rules. |
spark.greenplum.k8s.ingress.remove-paths | true | Specifies whether the Connector removes the added paths from the Ingress at the end of the application. |
spark.greenplum.k8s.ingress.use-loadbalancer-ip | false | Specifies whether the Connector updates spark.greenplum.gpfdist.host to the IP address of the load balancer associated with the Ingress. Note: If spark.greenplum.gpfdist.host is unset, this option must be set to true. |
spark.greenplum.k8s.ingress.update-location-port | false | Specifies whether the Connector automatically sets spark.greenplum.gpfdist.location-port to 80 or 443 (depending on the TLS configuration of the Ingress). Note: If spark.greenplum.gpfdist.location-port is unset and Spark is deployed in Kubernetes, this option must be set to true. |
spark.greenplum.gpfdist.host | | The host to specify in the external table LOCATION clause. When the Ingress is configured to terminate TLS and the Connector is configured to use gpfdists:// (spark.greenplum.gpfdist.is-ssl is set to true), the Connector uses the value of spark.greenplum.gpfdist.host to identify the Ingress rule to which it adds routes. |
spark.greenplum.gpfdist.listen-port | 0 | The port on which the embedded Jetty HTTP server listens. A setting of 0 instructs the server to listen on a random available port. When Spark is deployed in Kubernetes, you must set the listen port to a positive value. |
spark.greenplum.gpfdist.location-port | unset | The port to specify in the external table LOCATION clause. If this is not set (the default), the Connector uses the port on which the embedded Jetty HTTP server is listening. Because an Ingress does not expose arbitrary ports, if you do choose to set this, you must set it to either 80 or 443. |
spark.greenplum.gpfdist.is-ssl | false | Determines whether the Connector specifies gpfdist:// (false) or gpfdists:// (true) for the Greenplum Database external table protocol. Note: If this option is set to true, you must also provide a value for spark.greenplum.gpfdist.host. |
You must set the required and desired properties when you submit the Spark application. You may use one of the following methods to set the properties:

- Set them in $SPARK_HOME/conf/spark-defaults.conf, OR
- Pass --conf <key>=<value> to spark-submit, OR
- Pass --properties-file <path-to-file> to spark-submit.

Example entries in a property file (option 3) named gsc_k8s_props.conf located in the current directory:
spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name my-gsc-ingress
spark.greenplum.gpfdist.host 192.168.49.2
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.gpfdist.location-port 80
Example spark-submit command:
$ spark-submit --properties-file ./gsc_k8s_props.conf --jars greenplum-spark_2.12-2.2.0.jar ...
An Ingress named example-ingress is defined with the following configuration:
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: example-ingress
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
    nginx.ingress.kubernetes.io/force-ssl-redirect: "false"
spec:
  ingressClassName: nginx
  # Because k8s does not allow creating empty ingresses, we provide this dummy
  # rule. Additional rules will be added when new projects are created.
  # issue: https://github.com/kubernetes/kubernetes/issues/82203
  rules:
  - http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: defaultbackend
            port:
              number: 8080
To use the Connector with this Ingress configuration, submit your Spark application with the following property settings:
spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name example-ingress
spark.greenplum.gpfdist.host 172.16.0.2
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.gpfdist.location-port 80
spark.greenplum.gpfdist.is-ssl false
Alternatively, you can submit the non-SSL Spark application with these property settings. Note:

- Because spark.greenplum.gpfdist.host is not set, spark.greenplum.k8s.ingress.use-loadbalancer-ip must be set to true.
- Because spark.greenplum.gpfdist.location-port is not set, spark.greenplum.k8s.ingress.update-location-port must be set to true.
spark.extraListeners=org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name=example-ingress
spark.greenplum.k8s.ingress.use-loadbalancer-ip=true
spark.greenplum.k8s.ingress.update-location-port=true
spark.greenplum.gpfdist.listen-port=6155
spark.greenplum.gpfdist.is-ssl=false
When the Ingress is configured to use TLS, the secure connection is between the Greenplum Database segments (gpfdists) and the Ingress' load balancer located at the edge of the Kubernetes cluster.
Before you create a TLS-secured Ingress, you must:

- Create a client certificate and key for Greenplum Database gpfdists TLS. Refer to gpfdists TLS Configuration for the configuration information.
- Create a certificate and key for the Ingress. Both the gpfdists certificate and key and the Ingress certificate and key must be trusted by the same authority.
- Create a TLS secret in Kubernetes based on the Ingress certificate and key. Refer to the TLS secrets Kubernetes documentation for more information.
Example command to generate a Kubernetes TLS secret named gsc-k8s-secret-tls:
$ kubectl --namespace "<name>" create secret tls "gsc-k8s-secret-tls" --key "<path_to>/<key>" --cert "<path_to>/<cert>"
A TLS-terminated Ingress named tls-example-ingress is defined with the following configuration:
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: tls-example-ingress
spec:
  ingressClassName: nginx
  # Because k8s does not allow creating empty ingresses, we provide this dummy
  # rule. Additional rules will be added when new projects are created.
  # issue: https://github.com/kubernetes/kubernetes/issues/82203
  rules:
  - host: https-example.foo.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: defaultbackend
            port:
              number: 8080
  tls:
  - hosts:
    - https-example.foo.com
    secretName: gsc-k8s-secret-tls
To use the Connector with this Ingress configuration, submit your Spark application with the following property settings.
Note: You must provide a value for spark.greenplum.gpfdist.host.
spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name tls-example-ingress
spark.greenplum.gpfdist.host https-example.foo.com
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.gpfdist.location-port 443
spark.greenplum.gpfdist.is-ssl true
Alternatively, you can submit the Spark application with these property settings. Note:

- You must provide a value for spark.greenplum.gpfdist.host.
- Because spark.greenplum.gpfdist.location-port is not set, spark.greenplum.k8s.ingress.update-location-port must be set to true.
spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name tls-example-ingress
spark.greenplum.gpfdist.host https-example.foo.com
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.k8s.ingress.update-location-port true
spark.greenplum.gpfdist.is-ssl true
The role that runs a Spark application using the Connector must be assigned this minimum Kubernetes object privilege set:
Kubernetes Object (resource) | Minimum Permission Set (verb) |
---|---|
Pod (pods) | create, get, watch, list, delete |
Service (services) | create, list, delete |
Ingress (ingresses, located in apiGroup networking.k8s.io) | create, get, patch |
ConfigMap (configmaps) | create, list, delete |
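The following Role manifest is one way to grant this privilege set. It is a sketch only; the role name and namespace (gsc-spark-role, spark) are example values that you must adapt to your environment, and you must also bind the Role to the service account that runs your Spark application with a corresponding RoleBinding.

# Sketch of a namespaced Role that grants the minimum privilege set above.
# The name and namespace are example values only.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: gsc-spark-role
  namespace: spark
rules:
- apiGroups: [""]                    # core API group
  resources: ["pods"]
  verbs: ["create", "get", "watch", "list", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "list", "delete"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses"]
  verbs: ["create", "get", "patch"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["create", "list", "delete"]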
Greenplum Database uses the gpfdist protocol to communicate with the gpfdist server on each Spark worker node. The Connector exposes the server.timeout DataFrame option to specify the "no activity" timeout for gpfdist server connections from Greenplum. The Connector defines activity as one or more bytes read from, or written to, the connection. The gpfdist server on the Spark node throws a timeout exception when the server.timeout period elapses without activity from Greenplum.
The default server.timeout is 300,000 milliseconds (5 minutes).

You set the server.timeout option in your code as described in Specifying the gpfdist Server Timeout.
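For example, the following minimal sketch raises the timeout to 10 minutes on a read. It assumes a SparkSession named spark is in scope; the connection option values are placeholders for your environment.

// Minimal sketch (Scala): read from Greenplum Database with a 10-minute
// (600,000 millisecond) gpfdist "no activity" timeout. The connection option
// values are placeholders for your environment.
val gpdf = spark.read
  .format("greenplum")
  .option("url", "jdbc:postgresql://gpmaster.example.com:5432/testdb")
  .option("user", "gpadmin")
  .option("password", "changeme")
  .option("dbtable", "spark_source")
  .option("server.timeout", "600000")      // milliseconds; the default is 300000
  .load()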