全文検索のElasticsearch のETL/ELT モジュールのLogstash とJDBC Driver を使い、Salesforce Einstein データを簡単にロードする方法。
Elasticsearch は、人気の分散型の全文検索エンジンです。データを一元的に格納することで、超高速検索や、関連性の細かな調整、パワフルな分析が大規模に、手軽に実行可能になります。Elasticsearch にはデータのローディングを行うパイプラインツール「Logstash」があります。CData Drivers を利用することができるので、CData JDBC Drivers の対応するあらゆるデータソースを簡単にElasticsearch に取り込んで検索・分析を行うことができます。
この記事では、CData Drivers for Salesforce Einstein を使って、Salesforce Einstein のデータをLogstash 経由でElasticsearch にロードする手順を説明します。

Elasticsearch Logstash でCData JDBC Driver for Salesforce Einstein を使用
- CData JDBC Driver for Salesforce Einstein をLogstash が稼働するマシンにインストールします。
-
以下のパスにJDBC Driver がインストールされます。後ほどこのパスを使います。この.jar ファイルと製品版の場合は.lic ファイルをLogstash に配置して使います。
C:\Program Files\CData\CData JDBC Driver for Salesforce Einstein 2019J\lib\cdata.jdbc.sfeinsteinanalytics.jar
- 次に、Logstash とCData JDBC Driver をつなぐ、JDBC Input Plugin をインストールします。JDBC Plugin は最新のLogstash だとデフォルトでついてきますが、バージョンによっては追加する必要があります。
https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html - CData JDBC Driver の.jar ファイルと.lic ファイルを、Logstashの「/logstash-core/lib/jars/」に移動します。
Logstash でElasticsearch にSalesforce Einstein データを送る
それでは、LogstashでElasticsearch にSalesforce Einstein データ転送を行うための設定ファイルを作成していきます。
- Logstash のデータ処理定義であるlogstash.conf ファイルにSalesforce Einstein データを取得する処理書きます。Input はJDBC、Output はElasticsearch にします。データローディングジョブの起動間隔は30秒に設定しています。
- CData JDBC Driver の.jar をjdbc driver ライブラリにして、クラス名を設定、Salesforce Einstein への接続プロパティをJDBC URL の形でせっていします。JDBC URL ではほかにも詳細な設定を行うことができるので、細かくは製品ドキュメントをご覧ください。
Salesforce Einstein Analytics はOAuth 2 認証標準を利用しています。Salesforce Einstein Analytics にアプリケーションを登録し、OAuthClientId およびOAuthClientSecret を取得する必要があります。
詳しくは、ヘルプドキュメントの「Salesforce Einstein Analytics への接続」を参照してください。
input {
jdbc {
jdbc_driver_library => "../logstash-core/lib/jars/cdata.jdbc.sfeinsteinanalytics.jar"
jdbc_driver_class => "Java::cdata.jdbc.sfeinsteinanalytics.SFEinsteinAnalyticsDriver"
jdbc_connection_string => "jdbc:sfeinsteinanalytics:OAuthClientId=MyConsumerKey;OAuthClientSecret=MyConsumerSecret;CallbackURL=http://localhost:portNumber;InitiateOAuth=REFRESH"
jdbc_user => ""
jdbc_password => ""
schedule => "*/30 * * * * *"
statement => "SELECT Name, CloseDate FROM Dataset_Opportunity"
}
}
output {
Elasticsearch {
index => "sfeinsteinanalytics_Dataset_Opportunity"
document_id => "xxxx"
}
}
Logstash でSalesforce Einstein のローディングを実行
それでは作成した「logstash.conf」ファイルを元にLogstash を実行してみます。
> logstash-7.8.0\bin\logstash -f logstash.conf
成功した旨のログが出ます。これでSalesforce Einstein データがElasticsearch にロードされました。
例えばKibana で実際にElasticsearch に転送されたデータを見てみます。
GET sfeinsteinanalytics_Dataset_Opportunity/_search
{
"query": {
"match_all": {}
}
}

データがElasticsearch に格納されていることが確認できました。

CData JDBC Driver for Salesforce Einstein をLogstash で使うことで、Salesforce Einstein コネクタとして機能し、簡単にデータをElasticsearch にロードすることができました。ぜひ、30日の無償評価版 をお試しください。