This example describes how to create and run a simple Java Spark application that uses the Connector to access a Kerberos-secured Greenplum Database cluster. The example also identifies configuration and access requirements for both Greenplum Database and Spark.

In this example, you:

  • Access the Greenplum Database cluster to obtain Kerberos configuration files, create a test table, and assign permissions.
  • Learn the Spark configuration settings required to run against a Kerberized Greenplum Database.
  • Examine Scala and Java versions of a Spark application that uses the Connector to read from a table in a Kerberos-secured Greenplum Database cluster and write summary information to a different table in the same cluster.
  • Build and run the Java Spark application.

Assumptions

This example assumes the following:

  • Greenplum Database is secured with MIT Kerberos.
  • The Kerberos principal matches the Greenplum Database user name. For example, the Kerberos principal for the Greenplum user named bill is bill@<REALM>.
  • A Greenplum user accesses the database using a keytab file. The keytab contains the principal with which to access the Kerberos-secured Greenplum Database cluster.

Prerequisites

Before you begin this example, ensure the following:

  • The Greenplum Database cluster is Kerberos-secured.

  • You have access to the Kerberos krb5.conf configuration file from the Greenplum Database cluster.

  • You can identify a non-SUPERUSER Greenplum Database user and have access to this user's keytab file.

    Note: This example uses the Greenplum Database user name bill and the keytab file name bill-keytab.
  • You have Greenplum Database SUPERUSER privileges.

Setting up and Configuring Greenplum Database

For this example, you will create a Greenplum Database table. You will also assign a Greenplum Database user the privileges required to access the table and to use the Connector to read from and write to the table.

Perform the following steps:

  1. Log in to Greenplum Database as the gpadmin user and set up the Greenplum environment:

    $ ssh gpadmin@gpmaster
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  2. Identify or create a Greenplum database. This example uses a database named testdb. Ensure that the pg_hba.conf file is configured to allow bill to access the testdb database.
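
    A pg_hba.conf entry that permits Kerberos (gss) authentication for bill might look like the following sketch; the client address range and realm are placeholders for values from your environment, not values taken from this example:

    host  testdb  bill  0.0.0.0/0  gss  include_realm=0  krb_realm=<REALM>

    After editing pg_hba.conf, reload the Greenplum Database configuration (for example, with gpstop -u).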

  3. Start the psql subsystem, connecting to the testdb database:

    gpadmin@gpmaster$ psql -d testdb
    
    testdb=#
    

    You are connected to testdb as the gpadmin SUPERUSER user.

  4. Create a test table named gpk_test_table, and insert some data into it:

    CREATE TABLE gpk_test_table (
      id int,
      name text,
      order_num bigint,
      total decimal(15,2),
      discount decimal(15,2),
      shipped bool,
      shipdate date)
    DISTRIBUTED BY (id);
    
    INSERT INTO gpk_test_table VALUES
        (1, 'buyer1', 12345, 100.23, 0.30, true, '2022-05-13'),
        (2, 'buyer2', 54321, 431.59, 0.50, false, '2021-02-01'),
        (3, 'buyer3', 20000, 22.07, 0.20, false, '2021-12-21'),
        (4, 'buyer4', 42351, 89.76, 0.10, true, '2022-3-08'),
        (5, 'buyer5', 00001, 8.23, 0.80, true, '2021-12-29');
    
  5. Assign the Greenplum user bill the privileges required to read from the table, and the privileges required to use the Connector to read from and write to tables, as described in Role Privileges. A sample privilege set follows.
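
    The following is a sketch of a typical privilege set for this example; the public schema is an assumption, and Role Privileges remains the authoritative list of privileges that the Connector requires:

    GRANT SELECT ON gpk_test_table TO bill;
    GRANT CREATE ON SCHEMA public TO bill;
    ALTER ROLE bill CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
    ALTER ROLE bill CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');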

Constructing the JAAS Configuration File

A Java Authentication and Authorization Service (JAAS) configuration file provides runtime authentication information for a JVM application. In addition to identifying the Greenplum Database user name, you must also specify the file system location of the user keytab in this file:

pgjdbc {
  com.sun.security.auth.module.Krb5LoginModule required
  refreshKrb5Config=true
  useKeyTab=true
  keyTab=<path-to-keytab-file>
  principal=<greenplum-username>
  doNotPrompt=true
  debug=true;
};

You can specify an absolute path for <path-to-keytab-file> when the file exists at the same path on every Spark Worker. Use a relative path (the bare file name) if you submit the keytab file with spark-submit --files, as described in the next section.

For this example, copy and paste the following text into a file named gscjaas.config:

pgjdbc {
  com.sun.security.auth.module.Krb5LoginModule required
  refreshKrb5Config=true
  useKeyTab=true
  keyTab=bill-keytab
  principal=bill
  doNotPrompt=true
  debug=true;
};
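
Before you continue, you can optionally confirm that the keytab and principal work together by obtaining a ticket with the MIT Kerberos client tools; this assumes the tools are installed on the host where the files reside and that your default realm is configured in krb5.conf:

$ kinit -kt bill-keytab bill
$ klist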

About Configuring Spark

The Spark Driver and Spark Executors must have access to the Kerberos krb5.conf configuration file, the Java Authentication and Authorization Service (JAAS) configuration file, and the Kerberos keytab file.

About Locating Files

You can locate the files on the host on which you run spark-submit and let Spark distribute them, or you can pre-load the files onto the Spark Workers instead of passing them to the Spark Executors.

When you provide the --files option to spark-submit at application launch, Spark makes the files available to the Spark Executors. For example:

$ spark-submit --files gscjaas.config,bill-keytab,/etc/krb5.conf ...

You specify the file system locations of the krb5.conf and JAAS configuration files via JVM options that you provide to spark-submit. You specify the file system location of the user keytab via a property that you include in the JAAS configuration file.

Note: The following list identifies the file names used in this example; in practice, you can choose your own file names.

  • java.security.krb5.conf (example file name: krb5.conf): the file system location of the krb5.conf file; you set this property via a JVM option.
  • java.security.auth.login.config (example file name: gscjaas.config): the file system location of the JAAS configuration file; you set this property via a JVM option.
  • keyTab (example file name: bill-keytab): the file system location of the Kerberos keytab file for the Greenplum Database user; you set this property in the JAAS configuration file.

About Specifying JVM Options

You specify the file system locations of the krb5.conf and JAAS configuration files via JVM options that you provide to spark-submit. The paths that you specify for the Executors are relative to their working directory when you distribute the files with spark-submit --files; the Driver can reference absolute paths.

For example, you might set the following environment variables:

DRIVER_JVM_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
  -Djava.security.auth.login.config=gscjaas.config"
# the executor JVM options use relative paths; the files are
# distributed to the Executors with spark-submit --files
EXECUTOR_JVM_OPTS="-Djava.security.krb5.conf=krb5.conf \
  -Djava.security.auth.login.config=gscjaas.config"

You can then reference them on the spark-submit command line as follows:

$ spark-submit ... \
    --conf spark.driver.extraJavaOptions="${DRIVER_JVM_OPTS}" \
    --conf spark.executor.extraJavaOptions="${EXECUTOR_JVM_OPTS}" \
    ...

Compiling the Application

This section provides both Scala and Java versions of the application code. Choose your language, copy the code into a source file, and then compile and package it into a simple JAR file that you can include in the spark-submit command. Sample build commands follow each code listing.

Example Scala Code

import org.apache.spark.sql.functions.{avg, count, sum}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

object SparkDemo extends App {
  val spark = SparkSession.builder()
    .appName("greenplum-spark-kerberos-demo")
    .getOrCreate()

  /*
   * Define variables for relevant connection properties
   */
  val host: String = "gpmaster"
  val port: Int = 5432
  val database: String = "testdb"
  val user: String = "bill"

  val jdbcUrl = s"jdbc:postgresql://$host:$port/$database"

  /*
   * Define a Dataset that points to data in a Greenplum table.
   *
   * Spark is lazy -- the data is not moved until it is needed!
   */
  val testData: Dataset[Row] = spark
    .read
    .format("greenplum")
    .option("url", jdbcUrl)
    .option("user", user)
    .option("dbtable", "gpk_test_table")
    .load

  /*
   * Count the number of rows in the Dataset.
   *
   * Should be the same as the number of rows in GPDB table. Data is moved
   * from GPDB to Spark here in order for Spark to count the rows. The data is
   * not persisted in Spark.
   */
  testData.show()

  val rowCount: Long = testData.count()
  println(s"\nThere are $rowCount rows in the table")

  /*
   * Define a query on the Dataset to perform some analysis.
   *
   * This creates a new Dataset that references the original DataFrame but
   * does not run the query yet.
   */
  val pricingSummaryReport: Dataset[Row] = testData
    .where("shipdate <= date_add('2022-05-01', -30)")
    .groupBy("order_num", "shipped")
    .agg(
      sum("total").as("sum_totals"),
      avg("discount").as("avg_disc"),
      count("*").as("count_order"))
    .orderBy("order_num", "shipped")

  println("\nThe schema of the resulting DataSet is")
  pricingSummaryReport.printSchema()

  /*
   * Prints the Dataset to the console.
   *
   * Data is moved again from GPDB to Spark since the data is not persisted.
   */
  pricingSummaryReport.show()

  /*
   * Write the second Dataset back to GPDB as a new table.
   *
   * Data is moved again from GPDB to Spark since the data is not persisted.
   */
println("\nWriting pricing summary report to the Greenplum table 'pricing_summary'");
  pricingSummaryReport.write
    .format("greenplum")
    .option("url", jdbcUrl)
    .option("user", user)
    .option("dbtable", "pricing_summary")
    .mode(SaveMode.Overwrite)
    .save()

  spark.stop()
}
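
If you build the Scala version with sbt, a minimal build.sbt might look like the following sketch; the Scala patch version and Spark version shown are assumptions that you should match to your environment (the Connector JAR used later in this example targets Scala 2.12):

// build.sbt -- minimal sketch; the version numbers are assumptions
name := "gpk-demo"
scalaVersion := "2.12.15"
// Spark is provided by the cluster at run time
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided"

Running sbt package then produces a JAR under target/scala-2.12/ that you can rename to gpk_demo.jar for the run script in the next section.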

Example Java Code

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.sum;

public class SparkJavaDemo {
    public static void main(String[] args) {

        /*
         * Spin up the Spark session
         */
        SparkSession spark = SparkSession
                .builder()
                .appName("java-gp-spark-demo")
                .getOrCreate();

        /*
         * Define variables for relevant connection properties
         */
        final String host = "gpmaster";
        final Integer port = 5432;
        final String database = "testdb";
        final String user = "bill";

        final String jdbcUrl = String.format(
          "jdbc:postgresql://%s:%d/%s", host, port, database);

        /*
         * Define a Dataset that points to data in a Greenplum table.
         *
         * Spark is lazy -- the data is not moved until it is needed!
         */
        Dataset<Row> test_data = spark.read()
                .format("greenplum")
                .option("url", jdbcUrl)
                .option("user", user)
                .option("dbtable", "gpk_test_table")
                .load();

        /*
         * Count the number of rows in the Dataset.
         *
         * Should be the same as the number of rows in GPDB table. Data is
         * moved from GPDB to Spark here in order for Spark to count the rows.
         * The data is not persisted in Spark.
         */
        test_data.show();

        long rowCount = test_data.count();
        System.out.printf("\nThere are %d rows in the table", rowCount);

        /*
         * Define a query on the Dataset to perform some analysis.
         *
         * This creates a new Dataset that references the original DataFrame but
         * does not run the query yet.
         */
        Dataset<Row> pricingSummaryReport = test_data
                .where("shipdate <= date_add('2022-05-01', -30)")
                .groupBy("shipped")
                .agg(
                        sum("total").as("sum_totals"),
                        avg("discount").as("avg_disc"),
                        count("*").as("count_order"))
                .orderBy("shipped");

        System.out.println("\nThe schema of the resulting DataSet is");
        pricingSummaryReport.printSchema();

        /*
         * Prints the Dataset to the console.
         *
         * Data is moved again from GPDB to Spark since the data is not persisted.
         */
        pricingSummaryReport.show();

        /*
         * Write the second Dataset back to GPDB as a new table.
         *
         * Data is moved again from GPDB to Spark since the data is not persisted.
         */
        System.out.println(
          "\nWriting pricing summary report to the Greenplum table 'pricing_summary'");

        pricingSummaryReport.write()
                .format("greenplum")
                .option("url", jdbcUrl)
                .option("user", user)
                .option("dbtable", "pricing_summary")
                .mode(SaveMode.Overwrite)
                .save();

        spark.stop();
    }
}
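
If you build the Java version with the JDK tools alone, you might compile and package it as follows; this sketch assumes that SPARK_HOME points to your Spark installation and that its bundled jars satisfy the compile-time dependencies:

$ javac -cp "${SPARK_HOME}/jars/*" SparkJavaDemo.java
$ jar cf gpk_demo.jar SparkJavaDemo*.class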

Running the Application

Run the Spark application.

For example, suppose you compiled the sample code into a JAR file named gpk_demo.jar, the bill-keytab keytab file and the gscjaas.config JAAS configuration file are located in the current working directory, and the krb5.conf file resides in /etc/. You might then create and run a script that includes the following commands:

#!/usr/bin/env bash

export JAVA_HOME="<your-java-home>"
export SPARK_HOME="<your-spark-home>"

SPARK_MASTER_URL="spark://<your-spark-master-host>:7077"
# driver JVM options
DRIVER_JVM_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
    -Djava.security.auth.login.config=gscjaas.config"
EXECUTOR_JVM_OPTS="-Djava.security.krb5.conf=krb5.conf \
    -Djava.security.auth.login.config=gscjaas.config"

${SPARK_HOME}/bin/spark-submit \
    --master "${SPARK_MASTER_URL}" \
    --files gscjaas.config,bill-keytab,/etc/krb5.conf \
    --conf spark.driver.extraJavaOptions="${DRIVER_JVM_OPTS}" \
    --conf spark.executor.extraJavaOptions="${EXECUTOR_JVM_OPTS}" \
    --jars greenplum-connector-apache-spark-scala_2.12-2.1.2.jar \
    gpk_demo.jar

Replace <your-java-home>, <your-spark-home>, and <your-spark-master-host> with the values specific to your Spark environment.

The application writes messages to stdout, including the number of rows that it read from the gpk_test_table table and the data set schema. You may also choose to examine the Greenplum Database pricing_summary table.
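
For example, you might inspect the new pricing_summary table with psql:

gpadmin@gpmaster$ psql -d testdb -c 'SELECT * FROM pricing_summary;'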
