ETL/ELT のEmbulk を使ってKafka データをDB にロードする方法

OSS のETL/ELT ツールEmbulk のJDBC Plugin を使って、Kafka データを簡単にDM に同期する方法。

杉本和也
リードエンジニア

最終更新日:2019-07-20
kafka ロゴ

CData

jdbc ロゴ画像
Embulk ロゴ

こんにちは!リードエンジニアの杉本です。

Embulk は、大量のデータをDB、クラウドデータストア、DWH にロードできるオープンソースETL ツールです。近頃のトレンドでは、1社で複数のオンプレアプリやSaaS を使っており、データ分析にはETL/ELT ツールを使ってデータを丸ごとDB/DWH にロードしてから、分析やビジュアライズをすることが主流になっています。Embulk には、いろいろなプラグインがあり、多様なInput とOutput 処理をサポートしています。この記事では、Embulk のJDBC Input Plugin と CData Driver for ApacheKafka を使って、Kafka のデータを簡単にDB にロードします。この例ではロード先のDB にはMySQL を使います。

Embulk でCData JDBC Driver for ApacheKafka データをロード

  • CData JDBC Driver for ApacheKafka をEmbulk と同じマシンにインストールします。
  • 以下のパスにJDBC Driver がインストールされます。後ほどこのパスを使います。
    C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2019J\lib\cdata.jdbc.apachekafka.jar
  • 次に、EmbulkとCData JDBC Driverをつなぎこむための、JDBC Input Plugin をインストールします。
    https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc
  • embulk gem install embulk-input-jdbc
  • 今回はロード先DB としてMySQL を使います。ほかにもSQL Server、PostgreSQL、Google BigQuery などを使うことも可能です。
    https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql embulk gem install embulk-output-mysql
  • config ファイルを作成し、Kafka -> MySQL のジョブを作成します。apachekafka-mysql.yml というファイル名で作成しました。
  •             
                    in:
                        type: jdbc
                        driver_path: C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2019J\lib\cdata.jdbc.apachekafka.jar
                        driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
                        url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
                        table: "SampleTable_1"
                    out: 
                        type: mysql
                        host: localhost
                        database: DatabaseName
                        user: UserId
                        password: UserPassword
                        table: "SampleTable_1"
                        mode: insert
                
            
  • ポイントはJDBC URLです。

    BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

    認可メカニズム

    • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
    • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
    • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
    • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

    サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

  • テーブル名は取得したいテーブル名を入れます。
  • これで準備完了です。あとは「embulk run」で実行するだけです。
  • embulk run apachekafka-mysql.yml
  • 実行後、MySQL Workbenchからテーブルを確認してみると、データが取得できているはずです。

クエリ条件でフィルタリングしたデータのロード

ちなみに、上記の例ではテーブル名を直接指定しましたが、以下のようにSQL Queryを書いてもいいです。 Where句で作成日や修正日を指定すれば、最新のデータだけを対象にすることも可能です。

        
            in:
            type: jdbc
            driver_path: C:\Program Files\CData\CData JDBC Driver for ApacheKafka 2019J\lib\cdata.jdbc.apachekafka.jar
            driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
            url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
                query: "SELECT Id, Column1 FROM SampleTable_1 WHERE [RecordId] = 1"
            out: 
                type: mysql
                host: localhost
                database: DatabaseName
                user: UserId
                password: UserPassword
                table: "SampleTable_1"
                mode: insert
        
    

CData JDBC Driver for ApacheKafka をEmbulk で使うことで、Kafka コネクタとして機能し、簡単にデータを取得して同期することができました。ぜひ、30日の無償評価版をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。