The PXF HDFS connector supports SequenceFile format binary data. This section describes how to use PXF to read and write HDFS SequenceFile data, including how to create, insert, and query data in external tables that reference files in the HDFS data store.

PXF supports reading or writing SequenceFile files compressed with the default, bzip2, and gzip codecs.

Prerequisites

Ensure that you have met the PXF Hadoop Prerequisites before you attempt to read data from or write data to HDFS.

Creating the External Table

The PXF HDFS connector hdfs:SequenceFile profile supports reading and writing HDFS data in SequenceFile binary format. When you insert records into a writable external table, the block(s) of data that you insert are written to one or more files in the directory that you specified.

Note: External tables that you create with a writable profile can only be used for INSERT operations. If you want to query the data that you inserted, you must create a separate readable external table that references the HDFS directory.

Use the following syntax to create a Greenplum Database external table that references an HDFS directory: 

CREATE [WRITABLE] EXTERNAL TABLE <table_name> 
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<path-to-hdfs-dir>
    ?PROFILE=hdfs:SequenceFile[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (<formatting-properties>)
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

The specific keywords and values used in the Greenplum Database CREATE EXTERNAL TABLE command are described in the table below.

Keyword Value
<path-to-hdfs-dir> The path to the directory in the HDFS data store. When the <server_name> configuration includes a pxf.fs.basePath property setting, PXF considers <path-to-hdfs-dir> to be relative to the base path specified. Otherwise, PXF considers it to be an absolute path. <path-to-hdfs-dir> must not specify a relative path nor include the dollar sign ($) character.
PROFILE The PROFILE keyword must specify hdfs:SequenceFile.
SERVER=<server_name> The named server configuration that PXF uses to access the data. PXF uses the default server if not specified.
<custom-option> The supported <custom-option>s are described below.
FORMAT Use FORMAT 'CUSTOM' with (FORMATTER='pxfwritable_export') (write) or (FORMATTER='pxfwritable_import') (read).
DISTRIBUTED BY If you want to load data from an existing Greenplum Database table into the writable external table, consider specifying the same distribution policy or <column_name> on both tables. Doing so will avoid extra motion of data between segments on the load operation.

SequenceFile format data can optionally employ record or block compression and a specific compression codec.

When you use the hdfs:SequenceFile profile to read or write SequenceFile format data, you must provide the name of the Java class to use for serializing/deserializing the binary data. This class must provide read and write methods for each data type referenced in the data schema.
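
The read/write contract described above can be sketched as follows. This is a minimal illustration, not a PXF or Hadoop class: SchemaRoundTripSketch and its field names are hypothetical, and it uses only java.io so that it runs without hadoop-common on the classpath. The bytes it produces are therefore not the Hadoop Writable wire format. The point it shows is that the read method must consume the fields in exactly the order and with exactly the types that the write method emitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a serialization/deserialization class.
// Assumption: plain java.io encoding, NOT the Hadoop Text/IntWritable format.
class SchemaRoundTripSketch {
    String location = "";
    String month = "";
    int numberOfOrders;
    float totalSales;

    // Serialize every schema field in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(location);
        out.writeUTF(month);
        out.writeInt(numberOfOrders);
        out.writeFloat(totalSales);
    }

    // Deserialize the fields in the same order and with the same types.
    void readFields(DataInput in) throws IOException {
        location = in.readUTF();
        month = in.readUTF();
        numberOfOrders = in.readInt();
        totalSales = in.readFloat();
    }

    // Serialize a record to a byte array and read it back into a new instance.
    static SchemaRoundTripSketch roundTrip(SchemaRoundTripSketch src) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            src.write(new DataOutputStream(buf));
            SchemaRoundTripSketch copy = new SchemaRoundTripSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SchemaRoundTripSketch rec = new SchemaRoundTripSketch();
        rec.location = "Frankfurt";
        rec.month = "Mar";
        rec.numberOfOrders = 777;
        rec.totalSales = 3956.98f;
        SchemaRoundTripSketch copy = roundTrip(rec);
        System.out.println(copy.location + " | " + copy.month + " | "
                + copy.numberOfOrders + " | " + copy.totalSales);
    }
}
```

If readFields read the int before the second string, the round trip would fail or return corrupted values, which is why the two methods are normally written and reviewed as a pair.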

You specify the compression type and codec, and the Java serialization/deserialization class, via custom options to the CREATE EXTERNAL TABLE LOCATION clause. The hdfs:SequenceFile profile supports the following custom options:

Option Value Description
COMPRESSION_CODEC The compression codec alias. Supported compression codecs include: default, bzip2, gzip, and uncompressed. If this option is not provided, Greenplum Database performs no data compression.
COMPRESSION_TYPE The compression type to employ; supported values are RECORD (the default) or BLOCK.
DATA_SCHEMA The name of the writer serialization/deserialization class. The jar file in which this class resides must be in the PXF classpath. This option is required for the hdfs:SequenceFile profile and has no default value. (Note: The equivalent option named DATA-SCHEMA is deprecated and may be removed in a future release.)
IGNORE_MISSING_PATH A Boolean value that specifies the action to take when <path-to-hdfs-dir> is missing or invalid. The default value is false; PXF returns an error in this situation. When the value is true, PXF ignores missing path errors and returns an empty fragment.
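
To make the way these options combine in the LOCATION clause concrete, the following sketch assembles a pxf:// URI from an option map. SequenceFileLocationSketch and buildLocation are hypothetical helpers written for illustration; they are not part of PXF. The checks encode only what the tables above state (DATA_SCHEMA is required, COMPRESSION_TYPE is RECORD or BLOCK, the path must not contain a dollar sign). Treating the COMPRESSION_TYPE values as case-sensitive is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of PXF: builds a LOCATION URI for the
// hdfs:SequenceFile profile from the custom options described above.
class SequenceFileLocationSketch {

    static String buildLocation(String hdfsDir, Map<String, String> options) {
        // The path must not include the dollar sign ($) character.
        if (hdfsDir.contains("$")) {
            throw new IllegalArgumentException("path must not contain '$'");
        }
        // DATA_SCHEMA is required and has no default value.
        if (!options.containsKey("DATA_SCHEMA")) {
            throw new IllegalArgumentException("DATA_SCHEMA is required");
        }
        // Assumption: COMPRESSION_TYPE, when present, is exactly RECORD or BLOCK.
        String type = options.get("COMPRESSION_TYPE");
        if (type != null && !type.equals("RECORD") && !type.equals("BLOCK")) {
            throw new IllegalArgumentException("COMPRESSION_TYPE must be RECORD or BLOCK");
        }
        StringBuilder uri = new StringBuilder("pxf://")
                .append(hdfsDir)
                .append("?PROFILE=hdfs:SequenceFile");
        // Append each custom option as &<name>=<value>, in insertion order.
        for (Map.Entry<String, String> opt : options.entrySet()) {
            uri.append('&').append(opt.getKey()).append('=').append(opt.getValue());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("DATA_SCHEMA",
                "com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable");
        options.put("COMPRESSION_TYPE", "BLOCK");
        options.put("COMPRESSION_CODEC", "bzip2");
        System.out.println(buildLocation("data/pxf_examples/pxf_seqfile", options));
    }
}
```

A LinkedHashMap keeps the options in the order they were added, so the generated string is deterministic and easy to compare against a hand-written LOCATION clause.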

Reading and Writing Binary Data

Use the HDFS connector hdfs:SequenceFile profile when you want to read SequenceFile format data from, or write it to, HDFS. Files of this type consist of binary key/value pairs. SequenceFile format is a common data transfer format between MapReduce jobs.

Example: Writing Binary Data to HDFS

In this example, you create a Java class named PxfExample_CustomWritable that will serialize/deserialize the fields in the sample schema used in previous examples. You will then use this class to access a writable external table that you create with the hdfs:SequenceFile profile and that uses the default PXF server.

Perform the following procedure to create the Java class and writable table.

  1. Prepare to create the sample Java class:

    $ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
    $ cd pxfex/com/example/pxf/hdfs/writable/dataschema
    $ vi PxfExample_CustomWritable.java
    
  2. Copy and paste the following text into the PxfExample_CustomWritable.java file:

    package com.example.pxf.hdfs.writable.dataschema;
    
    import org.apache.hadoop.io.*;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.lang.reflect.Field;
    
    /**
     * PxfExample_CustomWritable class - used to serialize and deserialize data with
     * text, int, and float data types
     */
    public class PxfExample_CustomWritable implements Writable {
    
        public String st1, st2;
        public int int1;
        public float ft;
    
        public PxfExample_CustomWritable() {
            st1 = new String("");
            st2 = new String("");
            int1 = 0;
            ft = 0.f;
        }
    
        public PxfExample_CustomWritable(int i1, int i2, int i3) {
    
            st1 = new String("short_string___" + i1);
            st2 = new String("short_string___" + i1);
            int1 = i2;
            ft = i1 * 10.f * 2.3f;
    
        }
    
        String GetSt1() {
            return st1;
        }
    
        String GetSt2() {
            return st2;
        }
    
        int GetInt1() {
            return int1;
        }
    
        float GetFt() {
            return ft;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
    
            Text txt = new Text();
            txt.set(st1);
            txt.write(out);
            txt.set(st2);
            txt.write(out);
    
            IntWritable intw = new IntWritable();
            intw.set(int1);
            intw.write(out);
    
            FloatWritable fw = new FloatWritable();
            fw.set(ft);
            fw.write(out);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
    
            Text txt = new Text();
            txt.readFields(in);
            st1 = txt.toString();
            txt.readFields(in);
            st2 = txt.toString();
    
            IntWritable intw = new IntWritable();
            intw.readFields(in);
            int1 = intw.get();
    
            FloatWritable fw = new FloatWritable();
            fw.readFields(in);
            ft = fw.get();
        }
    
        public void printFieldTypes() {
            Class<?> myClass = this.getClass();
            Field[] fields = myClass.getDeclaredFields();
    
            for (int i = 0; i < fields.length; i++) {
                System.out.println(fields[i].getType().getName());
            }
        }
    }
    
  3. Compile and create a Java class JAR file for PxfExample_CustomWritable. Provide a classpath that includes the hadoop-common.jar file for your Hadoop distribution. For example, if you installed the Hortonworks Data Platform Hadoop client:

    $ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar  PxfExample_CustomWritable.java
    $ cd ../../../../../../
    $ jar cf pxfex-customwritable.jar com
    $ cp pxfex-customwritable.jar /tmp/
    

    (Your Hadoop library classpath may differ.)

  4. Copy the pxfex-customwritable.jar file to the Greenplum Database coordinator host. For example:

    $ scp pxfex-customwritable.jar gpadmin@coordinator:/home/gpadmin
    
  5. Log in to your Greenplum Database coordinator host:

    $ ssh gpadmin@<coordinator>
    
  6. Copy the pxfex-customwritable.jar JAR file to the user runtime library directory, and note the location. For example, if PXF_BASE=/usr/local/pxf-gp6:

    gpadmin@coordinator$ cp /home/gpadmin/pxfex-customwritable.jar /usr/local/pxf-gp6/lib/pxfex-customwritable.jar
    
  7. Synchronize the PXF configuration to the Greenplum Database cluster:

    gpadmin@coordinator$ pxf cluster sync
    
  8. Restart PXF on each Greenplum Database host as described in Restarting PXF.

  9. Use the PXF hdfs:SequenceFile profile to create a Greenplum Database writable external table. Identify the serialization/deserialization Java class you created above in the DATA_SCHEMA <custom-option>. Use BLOCK mode compression with bzip2 when you create the writable table.

    postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA_SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=bzip2')
              FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
    

    Notice that the 'CUSTOM' FORMAT <formatting-properties> specifies the built-in pxfwritable_export formatter.

  10. Write a few records to the pxf_seqfile HDFS directory by inserting directly into the pxf_tbl_seqfile table. For example:

    postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
    
  11. Recall that Greenplum Database does not support directly querying a writable external table. To read the data in pxf_seqfile, create a readable Greenplum Database external table that references this HDFS directory:

    postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA_SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
              FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
    

    You must specify the DATA_SCHEMA <custom-option> when you read HDFS data via the hdfs:SequenceFile profile. You need not provide compression-related options.

  12. Query the readable external table read_pxf_tbl_seqfile:

    postgres=# SELECT * FROM read_pxf_tbl_seqfile ORDER BY total_sales;
    
     location  | month | number_of_orders | total_sales 
    -----------+-------+------------------+-------------
     Frankfurt | Mar   |              777 |     3956.98
     Cleveland | Oct   |             3812 |     96645.4
    (2 rows)
    

Reading the Record Key

When a Greenplum Database external table references SequenceFile or another data format that stores rows in a key-value format, you can access the key values in Greenplum queries by using the recordkey keyword as a field name.

The field type of recordkey must correspond to the key type, much as the other fields must match the HDFS data. 

You can define recordkey to be any of the following Hadoop types:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

If no record key is defined for a row, Greenplum Database returns the ID of the segment that processed the row.

Example: Using Record Keys

Create a readable external table to access the record keys of the data that you wrote through the writable table pxf_tbl_seqfile in Example: Writing Binary Data to HDFS. Define the recordkey in this example to be of type int8.

postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
                LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA_SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
          FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
postgres=# SELECT * FROM read_pxf_tbl_seqfile_recordkey;
 recordkey |  location   | month | number_of_orders | total_sales 
-----------+-------------+-------+------------------+-------------
         2 | Frankfurt   | Mar   |              777 |     3956.98
         1 | Cleveland   | Oct   |             3812 |     96645.4
(2 rows)

You did not define a record key when you inserted the rows into the writable table, so the recordkey identifies the segment on which the row data was processed.
