The Connector uses the Greenplum Database gpfdist parallel file server to transfer data between Spark and Greenplum Database. When you run a Spark application that uses the Connector, the Connector starts a gpfdist server process on each Spark worker node.

By default, the Connector starts the gpfdist server process using the IP address of the Spark worker node and allows the operating system to select a random port for the server.

The Connector's default gpfdist server addressing behavior may not meet your needs if the hosts in your Spark cluster are configured with multiple network interfaces, or when the Spark cluster is deployed in Kubernetes.

Configuring the gpfdist Server Address for Multi-Homed Systems

Note

Use these DataFrame configuration options when Spark is running on-premises or in the cloud. The Connector does not support setting these options when Spark is deployed in Kubernetes.

The Connector exposes the following DataFrame options to set the gpfdist server address:

Option Name | Description
server.port | The gpfdist server port number. You can specify any combination of one or more of the following: a single port number, a comma-separated list of port numbers, or a range of port numbers.
server.useHostname | Use the Spark worker host name instead of its IP address.
server.hostEnv | The name of an environment variable whose value identifies the host name or IP address of the Spark worker on which to start the gpfdist server.
server.nic | The network interface on which to start the gpfdist server.

You set these options in your code before you read or write the DataFrame as described in Specifying the gpfdist Server Address Options.
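
For example, a minimal sketch of a read that sets the port and host-name options, assuming the Connector's greenplum data source format; the connection option values and table name are placeholders for your environment:

// Placeholder connection options, for illustration only.
val gscReadOptions = Map(
  "url" -> "jdbc:postgresql://gpmaster.example.com:5432/testdb",
  "user" -> "gpadmin",
  "password" -> "changeme",
  "dbschema" -> "public",
  "dbtable" -> "otp_c",
  // gpfdist server address options
  "server.port" -> "12900-12910",   // a range of port numbers
  "server.useHostname" -> "true"    // advertise the worker host name, not its IP address
)

val gpdf = spark.read.format("greenplum")
  .options(gscReadOptions)
  .load()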

Per-Spark-Worker Configuration Using Environment Variables

The Connector allows you to set each of the port, host, and interface via an environment variable name of your choosing. When you set an option via an environment variable, you can configure a different value for each Spark worker node. Setting the per-DataFrame Spark worker gpfdist address options in your Spark application is described in detail in Specifying the gpfdist Server Address Options.

If you choose to specify an option via an environment variable, set the environment variable on each Spark worker node before you run the start-slave.sh command on that node. For example, if you set server.port to the environment variable named GSC_EXTERNAL_PORT, and server.nic to the environment variable named GSC_NIF, you would start the Spark worker as follows:

user@spark-worker$ GSC_EXTERNAL_PORT="12900" GSC_NIF="eth1" start-slave.sh

You may also choose to set an environment variable in your spark-env.sh file. For information about the Spark spark-env.sh file, refer to the Environment Variables section of the Spark Configuration documentation.
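
In your Spark application, you then pass the environment variable names, not their values, as the option values. A minimal sketch that matches the worker startup example above:

// Following the example above, each option value names an environment variable
// set on the Spark worker; the Connector resolves it to that worker's own value.
val perWorkerAddressOptions = Map(
  "server.port" -> "GSC_EXTERNAL_PORT",  // resolves to 12900 on this worker
  "server.nic"  -> "GSC_NIF"             // resolves to eth1 on this worker
)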

Configuring the Connector When Spark is Deployed in Kubernetes (Beta)

In the default configuration, the Spark executor location that the Connector specifies in the external table LOCATION clause is not routable from the Greenplum Database segments when Spark is deployed in Kubernetes; you must explicitly configure the Connector.

An Ingress manages access to services in a Kubernetes cluster, exposing routes from outside the cluster to services running in the cluster, and uses rules to control the routes. The Connector requires that you create an Ingress object ahead of time; it does not create the Ingress for you.

The Connector GpfdistIngressListener service adds routes (paths) to the existing Ingress, and removes these routes when the application ends. You configure the behavior of the GpfdistIngressListener via the Spark application properties described in the table below.

Note

Use these application properties to configure the Connector when Spark is deployed in Kubernetes. The Connector does not support specifying these properties for other types of Spark deployments.

Property Name | Default Value | Description
spark.extraListeners | (Required) | The class name of the Connector Ingress listener, org.greenplum.GpfdistIngressListener.
spark.greenplum.k8s.ingress.name | (Required) | The name of an existing Ingress object to which the Connector adds rules.
spark.greenplum.k8s.ingress.remove-paths | true | Specifies whether the Connector removes the paths that it added to the Ingress when the application ends.
spark.greenplum.k8s.ingress.use-loadbalancer-ip | false | Specifies whether the Connector updates spark.greenplum.gpfdist.host to the IP address of the load balancer associated with the Ingress. Note: If spark.greenplum.gpfdist.host is unset, this option must be set to true.
spark.greenplum.k8s.ingress.update-location-port | false | Specifies whether the Connector automatically sets spark.greenplum.gpfdist.location-port to 80 or 443 (depending on the TLS configuration of the Ingress). Note: If spark.greenplum.gpfdist.location-port is unset and Spark is deployed in Kubernetes, this option must be set to true.
spark.greenplum.gpfdist.host | unset | The host to specify in the external table LOCATION clause. When the Ingress is configured to terminate TLS and the Connector is configured to use gpfdists:// (gpfdist.is-ssl is set to true), the Connector uses the value of spark.greenplum.gpfdist.host to identify the Ingress rule to which it adds routes.
spark.greenplum.gpfdist.listen-port | 0 | The port on which the embedded Jetty HTTP server listens. A setting of 0 instructs the server to listen on a random available port; when Spark is deployed in Kubernetes, you must set the listen port to a positive value.
spark.greenplum.gpfdist.location-port | unset | The port to specify in the external table LOCATION clause. If this is not set (the default), the Connector uses the port on which the embedded Jetty HTTP server is listening. Because an Ingress does not expose arbitrary ports, if you do set this property, you must set it to either 80 or 443.
spark.greenplum.gpfdist.is-ssl | false | Determines whether the Connector specifies gpfdist:// (false) or gpfdists:// (true) for the Greenplum Database external table protocol. Note: If this option is set to true, you must also provide a value for spark.greenplum.gpfdist.host.

You must set the required properties, and any optional properties that you need, when you submit the Spark application. You can use one of the following methods to set the properties:

  1. Editing $SPARK_HOME/conf/spark-defaults.conf, OR
  2. Passing --conf <key>=<value> to spark-submit, OR
  3. Adding key-value pairs to a properties file and passing --properties-file <path-to-file> to spark-submit.
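
For example, to set required properties on the spark-submit command line (option 2), you might run a command similar to the following; the Ingress name is a placeholder:

$ spark-submit --conf spark.extraListeners=org.greenplum.GpfdistIngressListener \
    --conf spark.greenplum.k8s.ingress.name=my-gsc-ingress \
    ...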

Example entries in a properties file (option 3) named gsc_k8s_props.conf located in the current directory:

spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name  my-gsc-ingress
spark.greenplum.gpfdist.host  192.168.49.2
spark.greenplum.gpfdist.listen-port  6155
spark.greenplum.gpfdist.location-port  80

Example spark-submit command:

$ spark-submit --properties-file ./gsc_k8s_props.conf --jars greenplum-spark_2.12-2.2.0.jar ...

Example Configuration (non-TLS)

An Ingress named example-ingress is defined with the following configuration:

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: example-ingress
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
    nginx.ingress.kubernetes.io/force-ssl-redirect: "false"
spec:
  ingressClassName: nginx
  # Because k8s does not allow creating empty ingresses, we provide this dummy
  # rule. Additional rules will be added when new projects are created.
  # issue: https://github.com/kubernetes/kubernetes/issues/82203
  rules:
  - http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: defaultbackend
            port:
              number: 8080

To use the Connector with this Ingress configuration, submit your Spark application with the following property settings:

spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name example-ingress
spark.greenplum.gpfdist.host 172.16.0.2
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.gpfdist.location-port 80
spark.greenplum.gpfdist.is-ssl false

Alternatively, you can submit the non-SSL Spark application with these property settings.

Note: Because spark.greenplum.gpfdist.host is not set, spark.greenplum.k8s.ingress.use-loadbalancer-ip must be set to true. Because spark.greenplum.gpfdist.location-port is not set, spark.greenplum.k8s.ingress.update-location-port must be set to true.

spark.extraListeners=org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name=example-ingress
spark.greenplum.k8s.ingress.use-loadbalancer-ip=true
spark.greenplum.k8s.ingress.update-location-port=true
spark.greenplum.gpfdist.listen-port=6155
spark.greenplum.gpfdist.is-ssl=false

About the Connector and Kubernetes TLS Configuration

When the Ingress is configured to use TLS, the secure connection is between the Greenplum Database segments (gpfdists) and the load balancer of the Ingress, located at the edge of the Kubernetes cluster.

Before you create a TLS-secured Ingress, you must:

  1. Create a client certificate and key for the Greenplum Database gpfdists protocol. Refer to gpfdists TLS Configuration for the configuration information.

  2. Create a certificate and key for the Ingress.

    Both the gpfdists certificate and key and the Ingress certificate and key must be trusted by the same authority.

  3. Create a TLS secret in Kubernetes based on the Ingress certificate and key. Refer to the TLS secrets Kubernetes documentation for more information.

    Example command to generate a Kubernetes TLS secret named gsc-k8s-secret-tls:

    $ kubectl --namespace "<name>" create secret tls "gsc-k8s-secret-tls" --key "<path_to>/<key>" --cert "<path_to>/<cert>"
    

Example Configuration (TLS)

A TLS-terminated Ingress named tls-example-ingress is defined with the following configuration:

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: tls-example-ingress
spec:
  ingressClassName: nginx
  # Because k8s does not allow creating empty ingresses, we provide this dummy
  # rule. Additional rules will be added when new projects are created.
  # issue: https://github.com/kubernetes/kubernetes/issues/82203
  rules:
  - host: https-example.foo.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: defaultbackend
            port:
              number: 8080
  tls:
  - hosts:
    - https-example.foo.com
    secretName: gsc-k8s-secret-tls

To use the Connector with this Ingress configuration, submit your Spark application with the following property settings.

Note: You must provide a value for spark.greenplum.gpfdist.host.

spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name tls-example-ingress
spark.greenplum.gpfdist.host https-example.foo.com
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.gpfdist.location-port 443
spark.greenplum.gpfdist.is-ssl true

Alternatively, you can submit the Spark application with these property settings.

Note: You must provide a value for spark.greenplum.gpfdist.host. Because spark.greenplum.gpfdist.location-port is not set, spark.greenplum.k8s.ingress.update-location-port must be set to true.

spark.extraListeners org.greenplum.GpfdistIngressListener
spark.greenplum.k8s.ingress.name tls-example-ingress
spark.greenplum.gpfdist.host https-example.foo.com
spark.greenplum.gpfdist.listen-port 6155
spark.greenplum.k8s.ingress.update-location-port true
spark.greenplum.gpfdist.is-ssl true

About the Required Kubernetes Privileges

The role that runs a Spark application using the Connector must be assigned this minimum Kubernetes object privilege set:

Kubernetes Object (resource) | Minimum Permission Set (verb)
Pod (pods) | create, get, watch, list, delete
Service (services) | create, list, delete
Ingress (in apiGroup networking.k8s.io) (ingresses) | create, get, patch
ConfigMap (configmaps) | create, list, delete
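
For reference, a minimal sketch of a Role manifest that grants this permission set; the Role name and namespace are placeholders, and your cluster may organize its RBAC differently:

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: gsc-spark-role    # placeholder name
  namespace: spark        # placeholder namespace
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["create", "get", "watch", "list", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "list", "delete"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses"]
  verbs: ["create", "get", "patch"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["create", "list", "delete"]

Bind the Role to the service account or user that runs the Spark application with a corresponding RoleBinding.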

Configuring the gpfdist Server Timeout

Greenplum Database uses the gpfdist protocol to communicate with the gpfdist server on each Spark worker node. The Connector exposes the server.timeout DataFrame option to specify the "no activity" timeout for gpfdist server connections from Greenplum. The Connector defines activity as one or more bytes read from, or written to, the connection. The gpfdist server on the Spark node throws a timeout exception when the server.timeout period elapses without activity from Greenplum.

The default server.timeout is 300,000 milliseconds (5 minutes).

You set the server.timeout option in your code as described in Specifying the gpfdist Server Timeout.
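
For example, a minimal sketch that raises the timeout to 10 minutes, reusing the placeholder connection options from the earlier read sketch:

val gpdf = spark.read.format("greenplum")
  .options(gscReadOptions)               // placeholder connection options as before
  .option("server.timeout", "600000")    // 10 minutes, specified in milliseconds
  .load()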
