OSS のETL/ELT ツールEmbulk のJDBC Plugin を使って、Elasticsearch データを簡単にDM に同期する方法。
Embulk は、大量のデータをDB、クラウドデータストア、DWH にロードできるオープンソースETL ツールです。近頃のトレンドでは、1社で複数のオンプレアプリやSaaS を使っており、データ分析にはETL/ELT ツールを使ってデータを丸ごとDB/DWH にロードしてから、分析やビジュアライズをすることが主流になっています。Embulk には、いろいろなプラグインがあり、多様なInput とOutput 処理をサポートしています。この記事では、Embulk のJDBC Input Plugin と CData Drivers for Elasticsearch を使って、Elasticsearch のデータを簡単にDB にロードします。この例ではロード先のDB にはMySQL を使います。

※製品について詳しい情報をご希望の方は以下からお進みください。
- Elasticsearch にほかのBI、ETL、開発ツールから接続したい:Elasticsearch データ連携ガイドおよびチュートリアルのリストへ
- Elasticsearch Drivers について詳細を知りたい:ドライバー詳細情報ページへ
- ほかのデータソースに連携したい:CData Drivers 一覧へ
- ドライバーの30日の無償トライアル版を使いたい:トライアル版ダウンロードページへ
- 製品の利用やライセンスについて相談したい:sales@cdata.co.jp までメールにてご相談ください。
Embulk でCData JDBC Driver for Elasticsearch データをロード
- CData JDBC Driver for Elasticsearch をEmbulk と同じマシンにインストールします。
-
以下のパスにJDBC Driver がインストールされます。後ほどこのパスを使います。
C:\Program Files\CData\CData JDBC Driver for Elasticsearch 2019J\lib\cdata.jdbc.elasticsearch.jar
- 次に、EmbulkとCData JDBC Driverをつなぎこむための、JDBC Input Plugin をインストールします。
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc - 今回はロード先DB としてMySQL を使います。ほかにもSQL Server、PostgreSQL、Google BigQuery などを使うことも可能です。
https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysqlembulk gem install embulk-output-mysql
- config ファイルを作成し、Elasticsearch -> MySQL のジョブを作成します。elasticsearch-mysql.yml というファイル名で作成しました。
embulk gem install embulk-input-jdbc
in:
type: jdbc
driver_path: C:\Program Files\CData\CData JDBC Driver for Elasticsearch 2019J\lib\cdata.jdbc.elasticsearch.jar
driver_class: cdata.jdbc.elasticsearch.ElasticsearchDriver
url: jdbc:elasticsearch:Server=127.0.0.1;Port=9200;User=admin;Password=123456;
table: "Orders"
out:
type: mysql
host: localhost
database: DatabaseName
user: UserId
password: UserPassword
table: "Orders"
mode: insert
接続するには、Server およびPort 接続プロパティを設定します。 認証には、User とPassword プロパティ、PKI (public key infrastructure)、またはその両方を設定します。 PKI を使用するには、SSLClientCert、SSLClientCertType、SSLClientCertSubject、およびSSLClientCertPassword プロパティを設定します。
本製品は、認証とTLS/SSL 暗号化にX-Pack Security を使用しています。TLS/SSL で接続するには、Server 値に'https://' を接頭します。Note: PKI を 使用するためには、TLS/SSL およびクライアント認証はX-Pack 上で有効化されていなければなりません。
接続されると、X-Pack では、設定したリルムをベースにユーザー認証およびロールの許可が実施されます。
embulk run elasticsearch-mysql.yml
クエリ条件でフィルタリングしたデータのロード
ちなみに、上記の例ではテーブル名を直接指定しましたが、以下のようにSQL Queryを書いてもいいです。 Where句で作成日や修正日を指定すれば、最新のデータだけを対象にすることも可能です。
in:
type: jdbc
driver_path: C:\Program Files\CData\CData JDBC Driver for Elasticsearch 2019J\lib\cdata.jdbc.elasticsearch.jar
driver_class: cdata.jdbc.elasticsearch.ElasticsearchDriver
url: jdbc:elasticsearch:Server=127.0.0.1;Port=9200;User=admin;Password=123456;
query: "SELECT OrderName, Freight FROM Orders WHERE [RecordId] = 1"
out:
type: mysql
host: localhost
database: DatabaseName
user: UserId
password: UserPassword
table: "Orders"
mode: insert
CData JDBC Driver for Elasticsearch をEmbulk で使うことで、Elasticsearch コネクタとして機能し、簡単にデータを取得して同期することができました。ぜひ、30日の無償評価版 をお試しください。