Perform Batch Operations with AlloyDB Data in Apache NiFi



Connect to AlloyDB data and perform batch operations in Apache NiFi using the CData JDBC Driver.

Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. When paired with the CData JDBC Driver for AlloyDB, NiFi can work with live AlloyDB data. This article shows how to read data from a CSV file and perform batch operations (INSERT/UPDATE/DELETE) using the CData JDBC Driver for AlloyDB data in Apache NiFi (version 1.9.0 or later).

With built-in optimized data processing, the CData JDBC Driver offers unmatched performance for interacting with live AlloyDB data. When you issue complex SQL queries to AlloyDB, the driver pushes supported SQL operations, like filters and aggregations, directly to AlloyDB and utilizes the embedded SQL engine to process unsupported operations client-side (often SQL functions and JOIN operations). Its built-in dynamic metadata querying allows you to work with and analyze AlloyDB data using native data types.

Generate a JDBC URL

We need a JDBC URL to connect to AlloyDB data from Apache NiFi.

Built-in Connection String Designer

For assistance in constructing the JDBC URL, use the connection string designer built into the AlloyDB JDBC Driver. Either double-click the JAR file or run it from the command line:

java -jar cdata.jdbc.alloydb.jar

Fill in the connection properties and copy the connection string to the clipboard.

The following connection properties are usually required in order to connect to AlloyDB.

  • Server: The host name or IP of the server hosting the AlloyDB database.
  • User: The user which will be used to authenticate with the AlloyDB server.
  • Password: The password which will be used to authenticate with the AlloyDB server.

You can also optionally set the following:

  • Database: The database to connect to when connecting to the AlloyDB Server. If this is not set, the user's default database will be used.
  • Port: The port of the server hosting the AlloyDB database. This property is set to 5432 by default.
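If you prefer to assemble the connection string by hand rather than with the designer, the properties above can be joined into the semicolon-separated format used later in this article. The following is a minimal sketch, not part of the driver: the class name AlloyDbUrlBuilder and its buildUrl method are hypothetical helpers, the sample values are the ones used in the example URL in this article, and the sketch assumes property values contain no semicolons or other characters that need escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of the CData driver): joins connection
// properties into the "jdbc:alloydb:Key=Value;Key=Value" format.
final class AlloyDbUrlBuilder {

    static String buildUrl(Map<String, String> props) {
        // Prefix once, then separate each Key=Value pair with a semicolon.
        StringJoiner url = new StringJoiner(";", "jdbc:alloydb:", "");
        for (Map.Entry<String, String> entry : props.entrySet()) {
            // Assumption: values need no escaping (no ';' inside a value).
            url.add(entry.getKey() + "=" + entry.getValue());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("User", "alloydb");
        props.put("Password", "admin");
        props.put("Database", "alloydb");
        props.put("Server", "127.0.0.1");
        props.put("Port", "5432");
        // prints jdbc:alloydb:User=alloydb;Password=admin;Database=alloydb;Server=127.0.0.1;Port=5432
        System.out.println(buildUrl(props));
    }
}
```

In practice, avoid hard-coding credentials; the literal values here only mirror the sample URL.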

Authenticating with Standard Authentication

Standard authentication (using the user/password combination supplied earlier) is the default form of authentication.

No further action is required to leverage Standard Authentication to connect.

Authenticating with pg_hba.conf Auth Schemes

Additional authentication methods are available. Each must be enabled in the pg_hba.conf file on the AlloyDB server.

For instructions on setting up authentication on the AlloyDB server, refer to the server's authentication documentation.

Authenticating with MD5 Authentication

This authentication method must be enabled by setting the auth-method in the pg_hba.conf file to md5.

Authenticating with SASL Authentication

This authentication method must be enabled by setting the auth-method in the pg_hba.conf file to scram-sha-256.

Authenticating with Kerberos

Kerberos authentication is initiated by the AlloyDB server when the driver tries to connect to it. Set up Kerberos on the AlloyDB server to activate this authentication method. Once Kerberos authentication is set up on the server, see the Kerberos section of the help documentation for details on how to authenticate with Kerberos.

Batch Operations (INSERT/UPDATE/DELETE) in Apache NiFi

The sample flow presented below is based on the following NiFi Processors:

  • ListFile - Retrieves a listing of files from the local filesystem and creates a FlowFile for each retrieved file.
  • FetchFile - Reads the content of the FlowFile received from the ListFile processor.
  • PutDatabaseRecord - Uses a specified RecordReader to input records from a flow file coming from the FetchFile processor. These records are translated to SQL statements and executed as a single transaction.
  • LogAttribute - Emits attributes of the FlowFile at the specified log level.

The finished flow chains these processors in order: ListFile, FetchFile, PutDatabaseRecord, and LogAttribute.

Disclaimers

1. The column names of the CSV file must match the column names of the data source table records to be inserted/updated/deleted.

2. Apache NiFi versions earlier than 1.9.0 do not support the Maximum Batch Size property in the PutDatabaseRecord processor.
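Because of the first disclaimer, it can help to compare the CSV header against the table's column names before running the flow. The following is a minimal sketch under stated assumptions: the class CsvHeaderCheck and its method are hypothetical, the column names are made-up examples, the comparison is an exact (case-sensitive) match after trimming whitespace, and the header is split on plain commas without handling quoted fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check: which CSV header fields have no
// identically named column in the target table?
final class CsvHeaderCheck {

    static List<String> unmatchedColumns(String headerLine, List<String> tableColumns) {
        Set<String> known = new HashSet<>(tableColumns);
        List<String> unmatched = new ArrayList<>();
        // Assumption: a simple comma-separated header with no quoted fields.
        for (String field : headerLine.split(",")) {
            String name = field.trim();
            if (!known.contains(name)) {
                unmatched.add(name);
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        // Example column names, invented for illustration.
        List<String> tableColumns = Arrays.asList("id", "name", "city");
        // prints [zip]
        System.out.println(unmatchedColumns("id,name,zip", tableColumns));
    }
}
```

An empty result means every header field has a matching column name under these assumptions.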

Configurations

To perform batch INSERT, UPDATE, or DELETE operations, configure the NiFi processors as follows:

  1. Configure the ListFile processor: Set the Input Directory property to the local folder from which to pull the CSV files. Set the File Filter property to a regular expression so that only files whose names match the expression are picked up. For example, if the CSV file's full path is C:\Users\Public\Documents\InsertNiFi.csv, set Input Directory to C:\Users\Public\Documents and File Filter to an expression that matches InsertNiFi.csv.
  2. Configure the FetchFile processor
  3. Leave the FetchFile processor's properties at their default values.
  4. Configure the PutDatabaseRecord processor
    • INSERT Operation

      Configure the PutDatabaseRecord processor as follows to perform batch INSERT operations:

      • Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
      • Set the Statement Type property to INSERT.
      • Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API.

        Property                    Value
        Database Connection URL     jdbc:alloydb:User=alloydb;Password=admin;Database=alloydb;Server=127.0.0.1;Port=5432
        Database Driver Class Name  cdata.jdbc.alloydb.AlloyDBDriver

      • Set the Catalog Name property to the name of the catalog that your table is part of.
      • Set the Schema Name property to the name of the schema that your table is part of.
      • Set the Table Name property to the name of the table that you want to INSERT into.
      • Set the Maximum Batch Size property to the maximum number of records that you want to be included in a single batch.

    • UPDATE Operation

      Configure the PutDatabaseRecord processor as follows to perform batch UPDATE operations:

      • Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
      • Set the Statement Type property to UPDATE.
      • Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API. Use the same Database Connection URL format as in the INSERT configuration.
      • Set the Catalog Name property to the name of the catalog that your table is part of.
      • Set the Schema Name property to the name of the schema that your table is part of.
      • Set the Table Name property to the name of the table that you want to UPDATE.
      • Set the Update Keys property to the names of the columns used to identify the rows to UPDATE.
      • Set the Maximum Batch Size property to the maximum number of records that you want to be included in a single batch.

    • DELETE Operation

      Configure the PutDatabaseRecord processor as follows to perform batch DELETE operations:

      • Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
      • Set the Statement Type property to DELETE.
      • Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API. Use the same Database Connection URL format as in the INSERT configuration.
      • Set the Catalog Name property to the name of the catalog that your table is part of.
      • Set the Schema Name property to the name of the schema that your table is part of.
      • Set the Table Name property to the name of the table that you want to DELETE from.
      • Unlike the INSERT and UPDATE statement types, the DELETE statement type does not expose a Maximum Batch Size property. The operations are still processed in batches, and unless changed, the maximum number of records per batch is the default value of 2000. To change the batch size used for DELETE operations, set the Statement Type to INSERT or UPDATE, change the value of the Maximum Batch Size property, and click Apply Changes. Then reopen the processor's configuration, set the Statement Type back to DELETE, and click Apply Changes again.

  5. Configure the LogAttribute processor

    Finally, configure the LogAttribute processor by specifying the Attributes that you would like to log or ignore, as well as the log level, based on your use case.
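To make the steps above more concrete, here is a rough plain-JDBC sketch of two ideas from the configuration: a File Filter regular expression applied to a file name, and an INSERT sent in batches capped by a maximum batch size. This is an illustration only, not NiFi's actual implementation. The class BatchInsertSketch, its method names, and the table name customers are hypothetical, and the sketch assumes the filter must match the entire file name.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of file filtering and batched INSERTs.
final class BatchInsertSketch {

    /** Assumption: the regular expression must match the whole file name. */
    static boolean matchesFileFilter(String regex, String fileName) {
        return Pattern.matches(regex, fileName);
    }

    /** Builds a parameterized statement such as INSERT INTO t (a, b) VALUES (?, ?). */
    static String insertSql(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")";
    }

    /** Sends rows in batches of at most maxBatchSize; returns how many batches ran. */
    static int insertInBatches(Connection conn, String table, List<String> columns,
                               List<List<Object>> rows, int maxBatchSize) throws SQLException {
        int batches = 0;
        try (PreparedStatement ps = conn.prepareStatement(insertSql(table, columns))) {
            int pending = 0;
            for (List<Object> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    ps.setObject(i + 1, row.get(i)); // JDBC parameters are 1-based
                }
                ps.addBatch();
                if (++pending == maxBatchSize) {
                    ps.executeBatch(); // flush a full batch
                    batches++;
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch(); // flush the remaining partial batch
                batches++;
            }
        }
        return batches;
    }
}
```

With the driver JAR on the classpath, a Connection could be obtained with java.sql.DriverManager.getConnection using the JDBC URL generated earlier. For example, matchesFileFilter("InsertNiFi\\.csv", "InsertNiFi.csv") returns true, and insertSql("customers", Arrays.asList("id", "name")) yields INSERT INTO customers (id, name) VALUES (?, ?).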

Free Trial & More Information

Download a free, 30-day trial of the CData JDBC Driver for AlloyDB and start working with your live AlloyDB data in Apache NiFi. Reach out to our Support Team if you have any questions.