製品をチェック

Apache Spark Driver の30日間無償トライアルをダウンロード

 30日間の無償トライアルへ

製品の詳細

Apache Spark アイコン Apache Spark JDBC Driver 相談したい

Apache Spark 連携のパワフルなJava アプリケーションを素早く作成して配布。

Elasticsearch へLogstash 経由でSpark データをロードする方法

全文検索サービスElasticsearch のETL モジュール「Logstash」とCData JDBC ドライバを使って、Spark データを簡単にロードする方法をご紹介。

加藤龍彦
ウェブデベロッパー

最終更新日:2022-07-22
spark ロゴ

CData

jdbc ロゴ画像
Elasticsearch Logstash ロゴ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Elasticsearch は、人気の分散型全文検索エンジンです。データを一元的に格納することで、超高速検索や、関連性の細かな調整、パワフルな分析が大規模に、手軽に実行可能になります。Elasticsearch にはデータのローディングを行うパイプラインツール「Logstash」があります。CData Drivers を利用することができるので、30日の無償評価版をダウンロードしてあらゆるデータソースを簡単にElasticsearch に取り込んで検索・分析を行うことができます。

この記事では、CData Driver for SparkSQL を使って、Spark のデータをLogstash 経由でElasticsearch にロードする手順を説明します。

Elasticsearch Logstash でCData JDBC Driver for SparkSQL を使用

  • CData JDBC Driver for SparkSQL をLogstash が稼働するマシンにインストールします。
  • 以下のパスにJDBC Driver がインストールされます(2022J の部分はご利用される製品バージョンによって異なります)。後ほどこのパスを使います。この.jar ファイル(製品版の場合は.lic ファイルも)をLogstash に配置します。
    C:\Program Files\CData\CData JDBC Driver for SparkSQL 2022J\lib\cdata.jdbc.sparksql.jar
  • 次に、Logstash とCData JDBC ドライバをつなぐ、JDBC Input Plugin をインストールします。JDBC Plugin は最新のLogstash だとデフォルトでついてきますが、バージョンによっては追加する必要があります。
    https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html
  • CData JDBC ドライバの.jar ファイルと.lic ファイルを、Logstashの「/logstash-core/lib/jars/」に移動します。

Logstash でElasticsearch にSpark データを送る

それでは、Logstash でElasticsearch にSpark データの転送を行うための設定ファイルを作成していきます。

  • Logstash のデータ処理定義であるlogstash.conf ファイルにSpark データを取得する処理を書きます。Input はJDBC、Output はElasticsearch にします。データローディングジョブの起動間隔は30秒に設定しています。
  • CData JDBC ドライバの.jar をjdbc driver ライブラリにして、クラス名を設定、Spark への接続プロパティをJDBC URL の形でせっていします。JDBC URL ではほかにも詳細な設定を行うことができるので、細かくは製品ドキュメントをご覧ください。
  • SparkSQL への接続

    SparkSQL への接続を確立するには以下を指定します。

    • Server:SparkSQL をホストするサーバーのホスト名またはIP アドレスに設定。
    • Port:SparkSQL インスタンスへの接続用のポートに設定。
    • TransportMode:SparkSQL サーバーとの通信に使用するトランスポートモード。有効な入力値は、BINARY およびHTTP です。デフォルトではBINARY が選択されます。
    • AuthScheme:使用される認証スキーム。有効な入力値はPLAIN、LDAP、NOSASL、およびKERBEROS です。デフォルトではPLAIN が選択されます。

    Databricks への接続

    Databricks クラスターに接続するには、以下の説明に従ってプロパティを設定します。Note:必要な値は、「クラスター」に移動して目的のクラスターを選択し、 「Advanced Options」の下にある「JDBC/ODBC」タブを選択することで、Databricks インスタンスで見つけることができます。

    • Server:Databricks クラスターのサーバーのホスト名に設定。
    • Port:443
    • TransportMode:HTTP
    • HTTPPath:Databricks クラスターのHTTP パスに設定。
    • UseSSL:True
    • AuthScheme:PLAIN
    • User:'token' に設定。
    • Password:パーソナルアクセストークンに設定(値は、Databricks インスタンスの「ユーザー設定」ページに移動して「アクセストークン」タブを選択することで取得できます)。

                input {
                    jdbc {
                        jdbc_driver_library => "../logstash-core/lib/jars/cdata.jdbc.sparksql.jar"
                        jdbc_driver_class => "Java::cdata.jdbc.sparksql.SparkSQLDriver"
                        jdbc_connection_string => "jdbc:sparksql:Server=127.0.0.1;"
                        jdbc_user => ""
                        jdbc_password => ""
                        schedule => "*/30 * * * * *"
                        statement => "SELECT City, Balance FROM Customers"
                    }
                }
                    
                    
                output {
                    Elasticsearch {
                        index => "sparksql_Customers"
                        document_id => "xxxx"
                    }
                }
            

Logstash でSpark のローディングを実行

それでは作成した「logstash.conf」ファイルを元にLogstash を実行してみます。

> logstash-7.8.0\bin\logstash -f logstash.conf

成功した旨のログが出ます。これでSpark データがElasticsearch にロードされました。

例えばKibana で実際にElasticsearch に転送されたデータを見てみます。

        GET sparksql_Customers/_search
        {
            "query": {
                "match_all": {}
            }
        }
    
Elasticsearch にロードされたSpark データをクエリ

データがElasticsearch に格納されていることが確認できました。

Elasticsearch にロードされたSpark データを確認

CData JDBC Driver for SparkSQL をLogstash で使うことで、Spark コネクタとして機能し、簡単にデータをElasticsearch にロードすることができました。ぜひ、30日の無償評価版をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。