This example describes how to create and run a simple Java Spark application that uses the Connector to access a Kerberos-secured Greenplum Database cluster. The example also identifies configuration and access requirements for both Greenplum Database and Spark.
In this example, you create a test table in Greenplum Database, assign a Greenplum user the privileges required to use the Connector, and then build and run a Spark application that reads from and writes to the table as that user.
This example assumes the following: the Greenplum Database user name is bill, and the Kerberos principal name for this user is bill@<REALM>.
Before you begin this example, ensure the following:
The Greenplum Database cluster is Kerberos-secured.
You have access to the Kerberos krb5.conf configuration file from the Greenplum Database cluster.
You can identify a non-SUPERUSER Greenplum Database user and have access to this user's keytab file. This example uses the Greenplum Database user name bill and the keytab file name bill-keytab; a quick keytab verification sketch follows this list.
You have Greenplum Database SUPERUSER privileges.
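Optionally, you can confirm that the keytab and principal are usable before you continue. The following is a minimal sketch, assuming the bill-keytab file and bill@<REALM> principal identified above:
$ kinit -kt bill-keytab bill@<REALM>
$ klist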
For this example, you will create a Greenplum Database table. You will also assign a Greenplum Database user the privileges required to access the table and to use the Connector to read from and write to the table.
Perform the following steps:
Log in to Greenplum Database as the gpadmin user and set up the Greenplum environment:
$ ssh gpadmin@gpmaster
gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Identify or create a Greenplum database. This example uses a database named testdb. Ensure that the pg_hba.conf file is configured to allow bill to access the testdb database.
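For example, an entry of the following form could permit bill to connect to testdb using Kerberos (GSSAPI) authentication. This is a sketch only; the authentication method, address range, and options depend on how your cluster is secured:
host     testdb     bill     0.0.0.0/0     gss include_realm=0 krb_realm=<REALM>
After editing pg_hba.conf, reload the Greenplum Database configuration (for example, with gpstop -u).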
Start the psql subsystem, connecting to the testdb database:
gpadmin@gpmaster$ psql -d testdb
testdb=#
You are connected to testdb as the gpadmin SUPERUSER user.
Create a test table named gpk_test_table, and insert some data into it:
CREATE TABLE gpk_test_table (
id int,
name text,
order_num bigint,
total decimal(15,2),
discount decimal(15,2),
shipped bool,
shipdate date)
DISTRIBUTED BY (id);
INSERT INTO gpk_test_table VALUES
(1, 'buyer1', 12345, 100.23, 0.30, true, '2022-05-13'),
(2, 'buyer2', 54321, 431.59, 0.50, false, '2021-02-01'),
(3, 'buyer3', 20000, 22.07, 0.20, false, '2021-12-21'),
(4, 'buyer4', 42351, 89.76, 0.10, true, '2022-3-08'),
(5, 'buyer5', 00001, 8.23, 0.80, true, '2021-12-29');
Assign Greenplum user bill the privileges required to read from the table and the privileges required to use the Connector to read from and write to tables as described in Role Privileges.
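Role Privileges is the authoritative reference for these privileges. The following psql commands are a sketch of the kinds of statements involved, assuming that gpk_test_table resides in the public schema:
testdb=# GRANT SELECT ON gpk_test_table TO bill;
testdb=# GRANT USAGE, CREATE ON SCHEMA public TO bill;
testdb=# ALTER ROLE bill CREATEEXTTABLE (type='readable', protocol='gpfdist');
testdb=# ALTER ROLE bill CREATEEXTTABLE (type='writable', protocol='gpfdist');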
A Java Authentication and Authorization Service (JAAS) configuration file provides runtime authentication information for a JVM application. In addition to identifying the Greenplum Database user name, you must also specify the file system location of the user keytab in this file:
pgjdbc {
com.sun.security.auth.module.Krb5LoginModule required
refreshKrb5Config=true
useKeyTab=true
keyTab=<path-to-keytab-file>
principal=<greenplum-username>
doNotPrompt=true
debug=true;
};
You can specify an absolute path for <path-to-keytab-file> when the file exists on every Spark Worker at the exact same path. Use a relative path if you are submitting the keytab file using spark-submit --files (described in the next section).
For this example, copy/paste the following text into a file named gscjaas.config:
pgjdbc {
com.sun.security.auth.module.Krb5LoginModule required
refreshKrb5Config=true
useKeyTab=true
keyTab=bill-keytab
principal=bill
doNotPrompt=true
debug=true;
};
The Spark Driver and Spark Executors must have access to the Kerberos krb5.conf configuration file, the Java Authentication and Authorization Service (JAAS) configuration file, and the Kerberos keytab file.
You can locate the files on the host on which you run spark-submit, or you can pre-load the files to the Spark Workers instead of passing the files to the Spark Executor.
When you provide the --files option to spark-submit at application launch, Spark makes the files available to the Spark Executors. For example:
$ spark-submit --files gscjaas.config,bill-keytab,/etc/krb5.conf ...
You specify the file system locations of the krb5.conf and JAAS configuration files via JVM options that you provide to spark-submit. You specify the file system location of the user keytab via a property that you include in the JAAS configuration file.
Property Name | File Name (Example) | Value Location and Description |
---|---|---|
java.security.krb5.conf | krb5.conf | The file system location of the krb5.conf file; you set this property via a JVM option. |
java.security.auth.login.config | gscjaas.config | The file system location of the JAAS configuration file; you set this property via a JVM option. |
keyTab | bill-keytab | The file system location of the Kerberos keytab file for the Greenplum Database user; you set this property in the JAAS configuration file. |
For the Spark Executors, you specify relative file system locations for the krb5.conf and JAAS configuration files in the JVM options that you provide to spark-submit; the --files option places these files in each Executor's working directory.
For example, you might set the following environment variables:
DRIVER_JVM_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
-Djava.security.auth.login.config=gscjaas.config"
# executor JVM options use relative paths; the files are passed
# to the Executors using spark-submit --files
EXECUTOR_JVM_OPTS="-Djava.security.krb5.conf=krb5.conf \
-Djava.security.auth.login.config=gscjaas.config"
And reference them in the spark-submit command line as follows:
$ spark-submit ... \
--conf spark.driver.extraJavaOptions="${DRIVER_JVM_OPTS}" \
--conf spark.executor.extraJavaOptions="${EXECUTOR_JVM_OPTS}" \
...
This section provides both Scala and Java versions of the application code; the Scala listing appears first, followed by the Java listing. Choose your language, copy the code into a text file, and then compile and package it into a simple JAR file that you can include in the spark-submit command. A packaging sketch follows the code listings.
import org.apache.spark.sql.functions.{avg, count, sum}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
object SparkDemo extends App {
val spark = SparkSession.builder()
.appName("greenplum-spark-kerberos-demo")
.getOrCreate()
/*
* Define variables for relevant connection properties
*/
val host: String = "gpmaster"
val port: Int = 5432
val database: String = "testdb"
val user: String = "bill"
val jdbcUrl = s"jdbc:postgresql://$host:$port/$database"
/*
* Define a Dataset that points to data in a Greenplum table.
*
* Spark is lazy -- the data is not moved until it is needed!
*/
val testData: Dataset[Row] = spark
.read
.format("greenplum")
.option("url", jdbcUrl)
.option("user", user)
.option("dbtable", "gpk_test_table")
.load
/*
* Count the number of rows in the Dataset.
*
* Should be the same as the number of rows in GPDB table. Data is moved
* from GPDB to Spark here in order for Spark to count the rows. The data is
* not persisted in Spark.
*/
testData.show();
val rowCount: Long = testData.count();
println(s"\nThere are $rowCount");
/*
* Define a query on the Dataset to perform some analysis.
*
* This creates a new Dataset that references the original DataFrame but
* does not run the query yet.
*/
val pricingSummaryReport: Dataset[Row] =
testData.where("shipdate <= date_add('2022-05-01', -30)")
.groupBy("order_num", "shipped")
.agg(
sum("total").as("sum_totals"),
avg("discount").as("avg_disc"),
count("*").as("count_order"))
.orderBy("order_num", "shipped")
println("\nThe schema of the resulting DataSet is")
pricingSummaryReport.printSchema()
/*
* Prints the Dataset to the console.
*
* Data is moved again from GPDB to Spark since the data is not persisted.
*/
pricingSummaryReport.show()
/*
* Write the second Dataset back to GPDB as a new table.
*
* Data is moved again from GPDB to Spark since the data is not persisted.
*/
println("\nWriting pricing summary report to the Greenplum table 'pricing_summary'");
pricingSummaryReport.write
.format("greenplum")
.option("url", jdbcUrl)
.option("user", user)
.option("dbtable", "pricing_summary")
.mode(SaveMode.Overwrite)
.save()
spark.stop()
}
The Java version of the application follows:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.sum;
public class SparkJavaDemo {
public static void main(String[] args) {
/* Spin up the spark session
*/
SparkSession spark = SparkSession
.builder()
.appName("java-gp-spark-demo")
.getOrCreate();
/*
* Define variables for relevant connection properties
*/
final String host = "gpmaster";
final Integer port = 5432;
final String database = "testdb";
final String user = "bill";
final String jdbcUrl = String.format(
"jdbc:postgresql://%s:%d/%s", host, port, database);
/*
* Define a Dataset that points to data in a Greenplum table.
*
* Spark is lazy -- the data is not moved until it is needed!
*/
Dataset<Row> test_data = spark.read()
.format("greenplum")
.option("url", jdbcUrl)
.option("user", user)
.option("dbtable", "gpk_test_table")
.load();
/*
* Count the number of rows in the Dataset.
*
* Should be the same as the number of rows in GPDB table. Data is
* moved from GPDB to Spark here in order for Spark to count the rows.
* The data is not persisted in Spark.
*/
test_data.show();
long rowCount = test_data.count();
System.out.printf("\nThere are %d rows in the table", rowCount);
/*
* Define a query on the Dataset to perform some analysis.
*
* This creates a new Dataset that references the original DataFrame but
* does not run the query yet.
*/
Dataset<Row> pricingSummaryReport = test_data
.where("shipdate <= date_add('2022-05-01', -30)")
.groupBy("shipped")
.agg(
sum("total").as("sum_totals"),
avg("discount").as("avg_disc"),
count("*").as("count_order"))
.orderBy("shipped");
System.out.println("\nThe schema of the resulting DataSet is");
pricingSummaryReport.printSchema();
/*
* Prints the Dataset to the console.
*
* Data is moved again from GPDB to Spark since the data is not persisted.
*/
pricingSummaryReport.show();
/*
* Write the second Dataset back to GPDB as a new table.
*
* Data is moved again from GPDB to Spark since the data is not persisted.
*/
System.out.println(
"\nWriting pricing summary report to the Greenplum table 'pricing_summary'");
pricingSummaryReport.write()
.format("greenplum")
.option("url", jdbcUrl)
.option("user", user)
.option("dbtable", "pricing_summary")
.mode(SaveMode.Overwrite)
.save();
spark.stop();
}
}
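For example, a minimal way to compile and package the Java version from the command line is sketched below. It assumes that a JDK is on your PATH and that the Spark jars are available locally under $SPARK_HOME/jars; for the Scala version, you would typically use scalac or a build tool such as sbt or Maven instead:
$ javac -cp "$SPARK_HOME/jars/*" SparkJavaDemo.java
$ jar cf gpk_demo.jar SparkJavaDemo*.class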
Run the Spark application. For example, if you compiled the sample code into a JAR file named gpk_demo.jar, the bill-keytab keytab and gscjaas.config JAAS configuration files are located in the current working directory, and the krb5.conf file resides in /etc/, you might create and run a script that includes the following commands:
#!/usr/bin/env bash
export JAVA_HOME="<your-java-home>"
export SPARK_HOME="<your-spark-home>"
SPARK_MASTER_URL="spark://<your-spark-master-host>:7077"
# driver JVM options
DRIVER_JVM_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
-Djava.security.auth.login.config=gscjaas.config"
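# executor JVM options use relative paths; the files are shipped to the
# Executors via spark-submit --files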
EXECUTOR_JVM_OPTS="-Djava.security.krb5.conf=krb5.conf \
-Djava.security.auth.login.config=gscjaas.config"
${SPARK_HOME}/bin/spark-submit \
--master "${SPARK_MASTER_URL}" \
--files gscjaas.config,bill-keytab,/etc/krb5.conf \
--conf spark.driver.extraJavaOptions="${DRIVER_JVM_OPTS}" \
--conf spark.executor.extraJavaOptions="${EXECUTOR_JVM_OPTS}" \
--jars greenplum-connector-apache-spark-scala_2.12-2.1.2.jar \
gpk_demo.jar
Replace <your-java-home>, <your-spark-home>, and <your-spark-master-host> with the values specific to your Spark environment.
The application writes messages to stdout, including the number of rows that it read from the gpk_test_table table and the data set schema. You may also choose to examine the Greenplum Database pricing_summary table.
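For example, you can view the new table from psql (a minimal check):
gpadmin@gpmaster$ psql -d testdb
testdb=# SELECT * FROM pricing_summary;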