製品をチェック

Apache Kafka Driver の30日間無償トライアルをダウンロード

 30日間の無償トライアルへ

製品の詳細

Apache Kafka アイコン Apache Kafka JDBC Driver 相談したい

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。

ETL ツールのEmbulk を使ってKafka データをDB にロードする方法

OSS のETL ツールEmbulk のJDBC Plugin を使って、Kafka データを簡単にDB に同期する方法をご紹介します。

杉本和也
リードエンジニア

最終更新日:2023-10-13
kafka ロゴ

CData

jdbc ロゴ画像
Embulk ロゴ

こんにちは!リードエンジニアの杉本です。

Embulk は、大量のデータをDB、クラウドデータストア、DWH にロードできるオープンソースETL ツールです。近頃のトレンドでは1社で複数のオンプレアプリやSaaS を使っており、データ分析にはETL ツールを使ってデータを丸ごとDB/DWH にロードしてから、分析や可視化をすることが主流になっています。Embulk にはいろいろなプラグインがあり、多様なInput とOutput 処理をサポートしています。この記事では、Embulk のJDBC Input Plugin と CData Driver for ApacheKafka を使って、Kafka のデータを簡単にDB にロードする方法をご紹介します。この例ではロード先のDB にはMySQL を使います。

Embulk でKafka のデータをロード

  • CData JDBC Driver for ApacheKafka をEmbulk と同じマシンにインストールします。CData JDBC ドライバは30日間の無償トライアルが利用できるので、サイドバーからお気軽にダウンロードしてご利用ください。
  • 以下のパスにJDBC Driver がインストールされます。後ほどこのパスを使います。
    C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2019J\lib\cdata.jdbc.apachekafka.jar
  • 次に、EmbulkとCData JDBC Driverをつなぎこむための、JDBC Input Plugin をインストールします。以下のリンクからダウンロードできます。

    https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc

    インストール用のコマンドはこちら:

    embulk gem install embulk-input-jdbc
  • 今回はロード先DB としてMySQL を使います。ほかにもSQL Server、PostgreSQL、Google BigQuery などを使うことも可能です。ロードに必要な以下のプラグインをインストールしましょう。

    https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql

    コマンドはこちら。

    embulk gem install embulk-output-mysql
  • config ファイルを作成し、Kafka -> MySQL のジョブを作成します。apachekafka-mysql.yml というファイル名で、以下の内容で作成しました。
  • in: type: jdbc driver_path: C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2024J\lib\cdata.jdbc.apachekafka.jar driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic; table: "SampleTable_1" out: type: mysql host: localhost database: DatabaseName user: UserId password: UserPassword table: "SampleTable_1" mode: insert
  • JDBC URL の内容は、ご利用の環境や資格情報を入力してください。

    BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

    認可メカニズム

    • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
    • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
    • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
    • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

    サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

  • テーブル名は取得したいテーブル名を入れます。
  • これで準備完了です。あとは「embulk run」で実行するだけです。
  • embulk run apachekafka-mysql.yml
  • 実行後MySQL Workbenchからテーブルを確認してみると、データが取得できているはずです。

クエリ条件でフィルタリングしたデータのロード

ちなみに、上記の例ではテーブル名を直接指定しましたが、以下のようにSQL クエリを書いてもいいです。 WHERE 句で作成日や修正日を指定すれば、最新のデータだけを対象にすることも可能です。

in: type: jdbc driver_path: C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2019J\lib\cdata.jdbc.apachekafka.jar driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic; query: "SELECT Id, Column1 FROM SampleTable_1 WHERE [RecordId] = 1" out: type: mysql host: localhost database: DatabaseName user: UserId password: UserPassword table: "SampleTable_1" mode: insert

おわりに

CData JDBC Driver for ApacheKafka をEmbulk で使うことで、Kafka と連携して簡単にデータを取得できます。ぜひ、30日の無償評価版をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。