In this example, you use the VMware Greenplum Connector for Apache NiFi to load CSV-format data into Greenplum Database.
The CSV data represents department expense records, and includes department identifier (integer), month (integer), and expenses (decimal) fields. For example, a record for a department with identifier 123 that spent $456.78 in the month of September follows:
"123","09","456.78"
A record with the same department identifier and month identifies a new expense total for the month, replacing the previous amount.
You will use the Apache NiFi user interface to create a dataflow between the GetFile and PutGreenplumRecord processors. In this flow:

- The GetFile processor reads CSV files from the /tmp/gcan_data directory on the NiFi system and generates record-based FlowFiles.
- The PutGreenplumRecord processor writes the data that it receives to a Greenplum Database table named gcan_dept_expense located in the public schema of a database named testdb.

Before you start this procedure, ensure that you have met the prerequisites.
For simplicity, this example assumes that Apache NiFi, Greenplum Database, and the Greenplum Streaming Server are running on the same host.
Step 1: Prepare the Example Environment
Step 2: Add and Configure the GetFile Processor
Step 3: Configure a GreenplumGPSSAdapter Controller Service
Step 4: Identify the Input Data Source, Format, and Schema
Step 5: Configure a Record Reader Controller Service
Step 6: Add and Configure the PutGreenplumRecord Processor
Step 7: Connect and Start the Processors
Step 8: Create the Greenplum Database and Table
Step 9: Trigger the Flow and Check Results
Step 1: Prepare the Example Environment

In this step, you create sample data files.
Log in to your Apache NiFi client system.
$ ssh user@nifihost
user@nifihost$
Create a working directory. For example:
user@nifihost$ mkdir gcan_work
user@nifihost$ cd gcan_work
Prepare some sample data:
Write some data into a CSV file named sample1.csv:
user@nifihost$ echo '"dept_id","month","expenses"
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"' > sample1.csv
Write some data into a CSV file named sample2.csv:
user@nifihost$ echo '"dept_id","month","expenses"
"1313131","11","555.55"
"7979797","10","5555.55"
"2222222","12","22.22"' > sample2.csv
The data added to this file represents an expense for a new department (2222222), and new/updated expense values for two existing departments/months.
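As an aside (this is not part of the procedure), you can predict which of these rows the MERGE operation will update and which it will insert by comparing the (dept_id, month) keys of the two files. The key lists below are copied by hand from the sample data above:

```shell
# Illustration only: compare the (dept_id, month) keys of sample1.csv and
# sample2.csv to predict the Connector's MERGE behavior. comm(1) requires
# sorted input.
printf '%s\n' 1313131,10 1313131,11 1313131,12 3535353,10 3535353,11 \
              3535353,12 7979797,10 7979797,11 7979797,12 | sort > /tmp/keys1
printf '%s\n' 1313131,11 7979797,10 2222222,12 | sort > /tmp/keys2

echo "sample2 keys that UPDATE an existing row:"
comm -12 /tmp/keys1 /tmp/keys2    # keys present in both files

echo "sample2 keys that INSERT a new row:"
comm -13 /tmp/keys1 /tmp/keys2    # keys present only in sample2
```

This reports that keys 1313131,11 and 7979797,10 will update existing rows, and key 2222222,12 will insert a new row.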
Create an input directory and set the appropriate permissions:
user@nifihost$ mkdir /tmp/gcan_data
user@nifihost$ chmod a+rwx /tmp/gcan_data
You will copy the sample data files to the input directory later in this procedure.
Step 2: Add and Configure the GetFile Processor

Start the Apache NiFi user interface. For example, if your NiFi server is running on the local host on port number 9050, enter the following in a web browser window:
http://localhost:9050
Perform the following steps to add and configure a GetFile processor instance:
Click the Processors icon in the Apache NiFi components toolbar and drag it to the canvas.
This action opens the Add Processor dialog.
Search for the GetFile processor by typing in the Filter field.
Click Add.
This action adds a GetFile processor component to the canvas.
Right-click on the component and select Configure from the context menu.
This action displays the Configure Processor dialog.
Select the PROPERTIES tab.
Locate the Input Directory property and set its value to /tmp/gcan_data.

Step 3: Configure a GreenplumGPSSAdapter Controller Service

Perform the steps below to configure an instance of the GreenplumGPSSAdapter controller service named GreenplumGPSSAdapter-testdb:
Click on an empty area in the Apache NiFi canvas.
Click on the configure icon in the Operate Palette.
This action opens the NiFi Flow Configuration dialog.
Select the CONTROLLER SERVICES tab.
Click the + icon to add a new controller service.
This action opens the Add Controller Service dialog.
Type Greenplum in the Filter field, select the GreenplumGPSSAdapter entry, and click ADD.
This action adds a GreenplumGPSSAdapter row to the table of currently defined controller services, and selects this row.
Click on the configure icon in the last column of the table to configure the service.
This action opens the Configure Controller Service dialog.
Select the SETTINGS tab, locate the Name field, and set the name to GreenplumGPSSAdapter-testdb.
Select the PROPERTIES tab, locate the properties identified in the table below, and set each Value as specified:
Property Name | Value | Comments |
---|---|---|
Greenplum Streaming Server Host | localhost | Enter your host |
Greenplum Streaming Server Port | 5000 | Retain the default |
Greenplum Database Master Host | localhost | Enter your Greenplum master host |
Greenplum Database Master Port | 5432 | Retain the default |
Greenplum Database Name | testdb | |
Greenplum Database User Name | gpadmin | You can choose a different Greenplum user |
Greenplum Database User Password | changeme | Enter the password |
APPLY the Configure Controller Service changes.
Click the thunderbolt icon in the GreenplumGPSSAdapter-testdb row to enable the controller service.
The Enable Controller Service dialog displays.
Click X in the upper right-hand corner of the dialog to close the NiFi Flow Configuration window.
Step 4: Identify the Input Data Source, Format, and Schema

The source of the data is the GetFile processor, and the data format is CSV.
Because the CSV file includes a header row, you could choose to have Apache NiFi infer the schema. For this exercise, you will explicitly define and specify the schema.
As described above, the CSV data represents department expense records, and includes department identifier (integer), month (integer), and expenses (decimal) fields:
"123","09","456.78"
The schema that corresponds to records of this format follows:
{
  "name": "dept_expense_record",
  "namespace": "nifi_csv_example",
  "type": "record",
  "fields": [
    { "name": "dept_id", "type": ["int", "null"] },
    { "name": "month", "type": ["int", "null"] },
    { "name": "expenses", "type": {"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 2 } }
  ]
}
You will specify this schema when you configure a record reader controller service for a PutGreenplumRecord processor instance.

Step 5: Configure a Record Reader Controller Service

Perform the steps below to configure an instance of a CSV record reader controller service named CSVReader-dept-expenses:
Click on an empty area in the Apache NiFi canvas.
Click on the configure icon in the Operate Palette.
This action opens the NiFi Flow Configuration dialog.
Select the CONTROLLER SERVICES tab.
Click the + icon to add a new controller service.
This action opens the Add Controller Service dialog.
Type CSV in the Filter field, select the CSVReader entry, and click ADD.
This action adds a CSVReader row to the table of currently defined controller services, and selects this row.
Click on the configure icon in the last column of the table to configure the service.
This action opens the Configure Controller Service dialog.
Select the SETTINGS tab, locate the Name field, and set the name to CSVReader-dept-expenses.
Select the PROPERTIES tab, locate the properties identified in the table below, and set each Value as specified:
Property Name | Value | Comments |
---|---|---|
Schema Access Strategy | Use ‘Schema Text’ Property | The Schema Text property value will specify the schema definition |
Treat First Line as Header | true | The first line of the file is the header |
Locate the Schema Text property, and copy/paste the schema definition below into the Value field:
{
  "name": "dept_expense_record",
  "namespace": "nifi_csv_example",
  "type": "record",
  "fields": [
    { "name": "dept_id", "type": ["int", "null"] },
    { "name": "month", "type": ["int", "null"] },
    { "name": "expenses", "type": {"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 2 } }
  ]
}
Retain the default values for the other properties.
APPLY the Configure Controller Service changes.
Click the thunderbolt icon in the CSVReader-dept-expenses row to enable the controller service.
The Enable Controller Service dialog displays.
Click X in the upper right-hand corner of the dialog to close the NiFi Flow Configuration window.
Step 6: Add and Configure the PutGreenplumRecord Processor

Perform the following steps to add and configure a PutGreenplumRecord processor instance:
Click the Processors icon in the Apache NiFi components toolbar and drag it to the canvas.
This action opens the Add Processor dialog.
Search for the PutGreenplumRecord processor by typing in the Filter field.
Click Add.
This action adds a PutGreenplumRecord processor component to the canvas.
Right-click on the component and select Configure from the context menu.
This action displays the Configure Processor dialog.
Select the SETTINGS tab.
Automatically terminate all relationships by checking the failure, retry, and success checkboxes.
Select the PROPERTIES tab.
Locate the Record Reader property. Click in the Value field, then select CSVReader-dept-expenses from the drop-down menu, and click OK.
Locate the Greenplum Adapter property. Click in the Value field, select GreenplumGPSSAdapter-testdb from the drop-down menu, and click OK.
Locate the properties identified in the table below and set each Value as specified:
Property Name | Value | Comments |
---|---|---|
Schema Name | public | Retain the default |
Table Name | gcan_dept_expense | You will create this table in the next step |
Operation Type | MERGE | Merge can both insert and update a table row |
Match Columns | dept_id, month | A table row is uniquely identified by these column values |
Translate Field Names | true | Retain the default |
Unmatched Field Behavior | Ignore Unmatched Fields | Retain the default |
Unmatched Column Behavior | Warn on Unmatched Columns | Log a warning message |
Rollback On Failure | false | Retain the default |
Maximum Record Batch Size | 100 | |
APPLY the Configure Processor changes.
Step 7: Connect and Start the Processors

In this step, you create a connection between the GetFile and PutGreenplumRecord processors on the canvas, and then start the processors.
Hover over the GetFile component on the canvas. Click the arrow icon and drag it over to the PutGreenplumRecord component.
This action displays the Create Connection dialog.
No configuration is required; click ADD to create the connection.
A line/box that represents the connection is displayed on the NiFi canvas.
Right-click on the GetFile component and select Start from the context menu to start the processor.
The icon next to the processor name changes to a green sideways triangle.
Right-click on the PutGreenplumRecord component and select Start from the context menu to start the processor.
The icon next to the processor name changes to a green sideways triangle.
Step 8: Create the Greenplum Database and Table

In this step, you create the Greenplum database testdb if it does not yet exist, and create the target Greenplum table.
Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up your Greenplum environment. For example:
$ ssh gpadmin@gpmaster
gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Create a database named testdb if one does not already exist:
gpadmin@gpmaster$ createdb testdb
Start the psql subsystem:
gpadmin@gpmaster$ psql -d testdb
The Greenplum Streaming Server must be registered in the database to use the Connector. You can register the Greenplum Streaming Server as follows:
testdb=# CREATE EXTENSION IF NOT EXISTS gpss;
This command registers the extension only if it has not been previously registered.
Create the target Greenplum Database table named gcan_dept_expense:
testdb=# CREATE TABLE gcan_dept_expense( dept_id int8, month int8, expenses decimal(11,2) );
This table definition matches the input data schema that you specified for the record reader in Step 5.
Remain in the psql session; you will return to it in the next step.
Step 9: Trigger the Flow and Check Results

You will individually copy the sample data files to /tmp/gcan_data on the Apache NiFi system to trigger the flow. You will check the results by observing the Apache NiFi user interface and querying the Greenplum table.
You will also generate a sample file with bad data, trigger the flow, and check the results.
Copy the sample1.csv data file to the input directory:
user@nifihost$ cp gcan_work/sample1.csv /tmp/gcan_data/
Examine the GetFile and PutGreenplumRecord processor components on the NiFi canvas, and notice when their statistics update.
Examine the contents of the Greenplum Database table. Enter the following command in the psql terminal session that you used earlier:
testdb=# SELECT * FROM gcan_dept_expense ORDER BY dept_id, month;
dept_id | month | expenses
---------+-------+----------
1313131 | 10 | 492.83
1313131 | 11 | 555.55
1313131 | 12 | 1313.13
3535353 | 10 | 6001.94
3535353 | 11 | 761.35
3535353 | 12 | 81.12
7979797 | 10 | 5555.55
7979797 | 11 | 18.72
7979797 | 12 | 173.18
(9 rows)
Copy the sample2.csv data file to the input directory:
user@nifihost$ cp gcan_work/sample2.csv /tmp/gcan_data/
Wait until the flow between the GetFile and PutGreenplumRecord processor components is triggered.
Query the table again:
testdb=# SELECT * FROM gcan_dept_expense ORDER BY dept_id, month;
dept_id | month | expenses
---------+-------+----------
1313131 | 10 | 492.83
1313131 | 11 | 555.55
1313131 | 12 | 1313.13
2222222 | 12 | 22.22
3535353 | 10 | 6001.94
3535353 | 11 | 761.35
3535353 | 12 | 81.12
7979797 | 10 | 5555.55
7979797 | 11 | 18.72
7979797 | 12 | 173.18
(10 rows)
Notice the new row for department 2222222, and the updated expenses values for department 1313131, month 11 and department 7979797, month 10.
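You can reproduce this merged result locally with standard shell tools. The sketch below (not part of the procedure) recreates the two sample files in a temporary directory and keeps the last expenses value seen for each (dept_id, month) key, which mirrors the MERGE semantics:

```shell
# Self-contained sketch: recreate the Step 1 sample files in a temporary
# directory, then compute the expected merged contents of gcan_dept_expense.
# For each (dept_id, month) key, the last expenses value seen wins.
dir=$(mktemp -d)
printf '%s\n' '"dept_id","month","expenses"' \
    '"1313131","12","1313.13"' '"3535353","11","761.35"' \
    '"7979797","10","4489.00"' '"7979797","11","18.72"' \
    '"3535353","10","6001.94"' '"7979797","12","173.18"' \
    '"1313131","10","492.83"' '"3535353","12","81.12"' \
    '"1313131","11","368.27"' > "$dir/sample1.csv"
printf '%s\n' '"dept_id","month","expenses"' \
    '"1313131","11","555.55"' '"7979797","10","5555.55"' \
    '"2222222","12","22.22"' > "$dir/sample2.csv"

# FNR > 1 skips each file's header; the anchored gsub strips the outer
# quotes so the line re-splits cleanly on the "," separator.
awk -F'","' 'FNR > 1 { gsub(/^"|"$/, ""); last[$1 "," $2] = $3 }
             END { for (k in last) print k "," last[k] }' \
    "$dir/sample1.csv" "$dir/sample2.csv" | sort
```

The sorted output contains the same ten rows as the query result above.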
Write a sample file with bad input data directly to the input directory:
user@nifihost$ echo '"dept_id","month","expenses"
"1313131","12","12222.22"
"7979797","zz","5555.55"' > /tmp/gcan_data/sample3.csv
This data includes the value zz in what should be an int field.
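Malformed records like this can be caught before a file ever enters the flow. The following validator is a sketch of our own (not part of the procedure, and no substitute for the record reader's schema); its field rules mirror the int, int, and decimal schema fields:

```shell
# Sketch: type-check CSV records before dropping a file into /tmp/gcan_data.
# dept_id and month must be integers; expenses must be a decimal number.
validate() {
    awk -F'","' '{
        gsub(/^"|"$/, "")          # strip the outer quotes, then re-split
        ok = ($1 ~ /^[0-9]+$/) && ($2 ~ /^[0-9]+$/) && \
             ($3 ~ /^[0-9]+\.[0-9][0-9]?$/)
        print $1 "," $2 "," $3 (ok ? " -> valid" : " -> INVALID")
    }'
}
printf '%s\n' '"1313131","12","12222.22"' '"7979797","zz","5555.55"' | validate
```

The second line prints INVALID because zz is not an integer.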
Observe the NiFi canvas and wait for the flow to trigger. Notice that the PutGreenplumRecord processor canvas component eventually displays a red box in the right-hand corner. Hover over the red box and view the warning message. The processor generates a NumberFormatException when attempting to write the second record to the Greenplum table.
Query the table again. In this query, filter on the department identifier in the first record of the sample3.csv data file to display only the table rows associated with that department:
testdb=# SELECT * FROM gcan_dept_expense WHERE dept_id=1313131 ORDER BY month;
dept_id | month | expenses
---------+-------+----------
1313131 | 10 | 492.83
1313131 | 11 | 555.55
1313131 | 12 | 1313.13
(3 rows)
Notice that the first record in sample3.csv, even though correctly formatted, was not written to the table. The Connector must process all records in the FlowFile successfully before it will write the batch to Greenplum Database.
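Because one bad record discards the whole batch, one workaround (a sketch of our own, not a Connector feature) is to pre-split an incoming CSV into single-record files before copying them to the input directory, so that a malformed record fails only its own FlowFile:

```shell
# Sketch: split a CSV into one file per record (each with the header), so a
# bad record only fails its own FlowFile. The sample3.csv content is
# recreated inline to keep the example self-contained.
dir=$(mktemp -d)
printf '%s\n' '"dept_id","month","expenses"' \
    '"1313131","12","12222.22"' '"7979797","zz","5555.55"' > "$dir/sample3.csv"

header=$(head -n 1 "$dir/sample3.csv")
mkdir -p "$dir/split_out"
tail -n +2 "$dir/sample3.csv" | nl -ba | while read -r n line; do
    printf '%s\n%s\n' "$header" "$line" > "$dir/split_out/rec_$n.csv"
done
ls "$dir/split_out"
```

With this split, the well-formed first record would load while only the file containing the zz record would fail.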