Load Kafka Data to a Database Using Embulk

Use CData JDBC Driver for Kafka with the open source ETL/ELT tool Embulk to load Kafka data to a database.

Embulk is an open source bulk data loader. When paired with the CData JDBC Driver for Kafka, Embulk can load data from Kafka to any supported destination. In this article, we explain how to use the CData JDBC Driver for Kafka in Embulk to load Kafka data to a MySQL database.

With built-in optimized data processing, the CData JDBC Driver offers unmatched performance for interacting with live Kafka data. When you issue complex SQL queries to Kafka, the driver pushes supported SQL operations, like filters and aggregations, directly to Kafka and utilizes the embedded SQL engine to process unsupported operations client-side (often SQL functions and JOIN operations).

Configure a JDBC Connection to Kafka Data

Before creating a bulk load job in Embulk, note the installation location for the JAR file for the JDBC Driver (typically C:\Program Files\CData\CData JDBC Driver for Kafka\lib).

Embulk supports JDBC connectivity, so you can easily connect to Kafka and execute SQL queries. Before creating a bulk load job, create a JDBC URL for authenticating with Kafka.

Set the BootstrapServers and Topic properties to specify the address of your Apache Kafka server and the topic you would like to interact with.

Authorization Mechanisms

  • SASL Plain: The User and Password properties should be specified. AuthScheme should be set to 'Plain'.
  • SASL SSL: The User and Password properties should be specified. AuthScheme should be set to 'Scram'. UseSSL should be set to true.
  • SSL: The SSLCert and SSLCertPassword properties should be specified. UseSSL should be set to true.
  • Kerberos: The User and Password properties should be specified. AuthScheme should be set to 'Kerberos'.

You may be required to trust the server certificate. In such cases, specify the TrustStorePath and the TrustStorePassword if necessary.
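As a minimal sketch of how the mechanisms above translate into a connection string, the following Java helper assembles a JDBC URL from the listed properties. The class and method names (KafkaJdbcUrl, build, saslPlain, saslSsl) are hypothetical and not part of the CData driver, the credential and server values are placeholders, and the sketch assumes property values contain no ';' or '=' characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the CData driver.
class KafkaJdbcUrl {

    // Joins properties as "jdbc:apachekafka:Key=Value;Key=Value;".
    // No escaping is performed, so values containing ';' or '=' are out of scope.
    static String build(Map<String, String> props) {
        StringBuilder url = new StringBuilder("jdbc:apachekafka:");
        for (Map.Entry<String, String> e : props.entrySet()) {
            url.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        return url.toString();
    }

    // Properties shared by the user/password mechanisms in this sketch.
    private static Map<String, String> base(String authScheme, String user, String password,
                                            String servers, String topic) {
        Map<String, String> p = new LinkedHashMap<>(); // preserves insertion order
        p.put("AuthScheme", authScheme);
        p.put("User", user);
        p.put("Password", password);
        p.put("BootstrapServers", servers);
        p.put("Topic", topic);
        return p;
    }

    // SASL Plain: User, Password, AuthScheme=Plain.
    static String saslPlain(String user, String password, String servers, String topic) {
        return build(base("Plain", user, password, servers, topic));
    }

    // SASL SSL: User, Password, AuthScheme=Scram, UseSSL=true.
    static String saslSsl(String user, String password, String servers, String topic) {
        Map<String, String> p = base("Scram", user, password, servers, topic);
        p.put("UseSSL", "true");
        return build(p);
    }

    public static void main(String[] args) {
        // Placeholder values, not real credentials.
        System.out.println(saslPlain("admin", "pass", "https://localhost:9091", "MyTopic"));
        System.out.println(saslSsl("admin", "pass", "https://localhost:9091", "MyTopic"));
    }
}
```

The resulting string can be pasted into the url option of the Embulk input configuration. The SSL and Kerberos mechanisms would follow the same pattern with their own properties.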

Built-in Connection String Designer

For assistance in constructing the JDBC URL, use the connection string designer built into the Kafka JDBC Driver. Either double-click the JAR file or run it from the command line.

java -jar cdata.jdbc.apachekafka.jar

Fill in the connection properties and copy the connection string to the clipboard.

Below is a typical JDBC connection string for Kafka:

jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

Load Kafka Data in Embulk

After installing the CData JDBC Driver and creating a JDBC connection string, install the required Embulk plugins.

Install Embulk Input & Output Plugins

  1. Install the JDBC Input Plugin in Embulk.
    https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc
    embulk gem install embulk-input-jdbc
  2. In this article, we use MySQL as the destination database. You can also choose SQL Server, PostgreSQL, or Google BigQuery as the destination using the output plugins.
    https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql
    embulk gem install embulk-output-mysql

With the input and output plugins installed, we are ready to load Kafka data into MySQL using Embulk.

Create a Job to Load Kafka Data

Start by creating a config file in Embulk, using a name like apachekafka-mysql.yml.

  1. For the input plugin options, use the CData JDBC Driver for Kafka, including the path to the driver JAR file, the driver class (e.g. cdata.jdbc.apachekafka.ApacheKafkaDriver), and the JDBC URL from above.
  2. For the output plugin options, use the values and credentials for the MySQL database.

Sample Config File (apachekafka-mysql.yml)

in:
  type: jdbc
  driver_path: C:\Program Files\CData\CData JDBC Driver for Kafka 20xx\lib\cdata.jdbc.apachekafka.jar
  driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
  url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
  table: "SampleTable_1"
out:
  type: mysql
  host: localhost
  database: DatabaseName
  user: UserId
  password: UserPassword
  table: "SampleTable_1"
  mode: insert

After creating the file, run the Embulk job.

embulk run apachekafka-mysql.yml

After running the Embulk job, find the Kafka data in the MySQL table.

Load Filtered Kafka Data

In addition to loading data directly from a table, you can use a custom SQL query for more granular control over the data loaded. You can also perform incremental loads by filtering on a last-updated column in a SQL WHERE clause in the query field.

in:
  type: jdbc
  driver_path: C:\Program Files\CData\CData JDBC Driver for Kafka 20xx\lib\cdata.jdbc.apachekafka.jar
  driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
  url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
  query: "SELECT Id, Column1 FROM SampleTable_1 WHERE [RecordId] = 1"
out:
  type: mysql
  host: localhost
  database: DatabaseName
  user: UserId
  password: UserPassword
  table: "SampleTable_1"
  mode: insert
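For the incremental-load case, one possible approach is to regenerate the query value before each run from the timestamp of the previous load. The sketch below is illustrative only: the IncrementalQuery class, the LastUpdated column name, and the 'yyyy-MM-dd HH:mm:ss' timestamp literal are assumptions, so check which columns your topic actually exposes and which datetime format the driver accepts.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of Embulk or the CData driver.
class IncrementalQuery {

    // Assumed timestamp literal format; verify against the driver documentation.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Builds a query that selects only rows changed after the previous load.
    // Table and column names are inserted as-is, so pass trusted values only.
    static String since(String table, String column, LocalDateTime lastRun) {
        return "SELECT * FROM " + table
                + " WHERE [" + column + "] > '" + lastRun.format(TS) + "'";
    }

    public static void main(String[] args) {
        // "LastUpdated" is an assumed column name, not one defined in this article.
        LocalDateTime lastRun = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
        System.out.println(since("SampleTable_1", "LastUpdated", lastRun));
    }
}
```

The printed string would replace the value of the query option in the config file before running the job again.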

More Information & Free Trial

By using the CData JDBC Driver for Kafka as a connector, Embulk can integrate Kafka data into your data load jobs. And with drivers for more than 200 other enterprise sources, you can integrate other enterprise SaaS, big data, or NoSQL sources as well. Download a 30-day free trial and get started today.