Amazon S3 でホストされているCData JDBC ドライバーを使用してAWS Glue ジョブからElasticsearch にデータ連携。
AWS Glue はAmazon のETL サービスであり、これを使用すると、簡単にデータプレパレーションを行い、ストレージおよび分析用に読み込むことができます。AWS Glue と一緒にPySpark モジュールを使用すると、JDBC 接続を経由でデータを処理するジョブを作成し、そのデータをAWS データストアに直接読み込むことができます。ここでは、CData JDBC Driver for Elasticsearch をAmazon S3 バケットにアップロードし、Elasticsearch からデータを抽出してCSV ファイルとしてS3 に保存するためのAWS Glue ジョブを作成して実行する方法について説明します。
※製品について詳しい情報をご希望の方は以下からお進みください。
- Elasticsearch にほかのBI、ETL、開発ツールから接続したい:Elasticsearch データ連携ガイドおよびチュートリアルのリストへ
- Elasticsearch Drivers について詳細を知りたい:ドライバー詳細情報ページへ
- ほかのデータソースに連携したい:CData Drivers 一覧へ
- ドライバーの30日の無償トライアル版を使いたい:トライアル版ダウンロードページへ
- 製品の利用やライセンスについて相談したい:sales@cdata.co.jp までメールにてご相談ください。
CData JDBC driver for Elasticsearch をAmazon S3 バケットにアップロード
CData JDBC Driver for Elasticsearch をAWS Glue から使用するには、ドライバーの.jar ファイル(および必要なライセンスファイル)をAmazon S3 のバケットに配置する必要があります。
- Amazon S3 コンソールを開きます。
- バケットを選択、もしくは作成します。
- [アップロード]をクリックします。
- JDBC Driver の.jar ファイル(cdata.jdbc.elasticsearch.jar) をインストールディレクトリのlib フォルダから選択してアップロードします。
Amazon Glue Job を設定
- [分析]->[AWS Glue]をクリックします。
- AWS Glue コンソールで、[ETL]->[ジョブ]をクリックします。
- [ジョブの追加]をクリックして新しいGlue ジョブを作成します。
- ジョブのプロパティを設定します:
- 名前: ElasticsearchGlueJob など任意のジョブ名
- IAM ロール: AWSGlueServiceRole もしくは AmazonS3FullAccessSelect の権限があるIAM ロールを設定(JDBC Driver がAmazon S3 バケットにあるため)。
- Type: [Spark]を選択。
- Glue version: ドロップダウンからバージョンを選択。
- このジョブ実行: [ユーザーが作成する新しいスクリプト]を選択。
スクリプトプロパティの設定: - スクリプトファイル名: GlueElasticsearchJDBC などのスクリプトファイル名。
- スクリプトが保存されているS3 パス: S3 バケットを入力もしくは選択。
- 一時ディレクトリ: S3 バケットを入力もしくは選択
- ETL 言語: [Python]を選択
- セキュリティ設定、スクリプトライブラリおよびジョブパラメータを展開。依存JARS パスは、JDBC の.jar ファイルをアップロードしたS3 バケットに設定。.jar ファイル名 s3://mybucket/cdata.jdbc.elasticsearch.jar も含めます。
- [次へ]をクリックすると、ほかのAWS エンドポイントへの接続オプション追加ができます。Redshift、MySQL などに接続する際にはここで接続を作成できます。
- [ジョブの保存とスクリプトの編集]をクリックします。
- 開いたエディタで、Python スクリプトを記述します。サンプルは以下です。
サンプルGlue スクリプト
CData JDBC driver でElasticsearch に接続するには、JDBC URL を作成します。さらにライセンスとしてJDBC URL にRTK プロパティを設定する必要があります。RTK は通常のライセンスと異なりますので、CData まで直接ご連絡をください。
接続するには、Server およびPort 接続プロパティを設定します。 認証には、User とPassword プロパティ、PKI (public key infrastructure)、またはその両方を設定します。 PKI を使用するには、SSLClientCert、SSLClientCertType、SSLClientCertSubject、およびSSLClientCertPassword プロパティを設定します。
本製品は、認証とTLS/SSL 暗号化にX-Pack Security を使用しています。TLS/SSL で接続するには、Server 値に'https://' を接頭します。Note: PKI を 使用するためには、TLS/SSL およびクライアント認証はX-Pack 上で有効化されていなければなりません。
接続されると、X-Pack では、設定したリルムをベースにユーザー認証およびロールの許可が実施されます。
ビルトイン接続文字列デザイナー
JDBC URL の作成をサポートするビルトインの接続文字列デザイナーがあります。ドライバーの.jar ファイルをダブルクリックするか、コマンドラインで.jar ファイルを実行するとデザイナーが開きます。
java -jar cdata.jdbc.elasticsearch.jar
必要項目を入力すると、デザインs-下部に接続文字列が生成されますのでクリップボードにコピーして使います。

CData JDBC driver をPySpark で使用して、AWS Glue モジュールでElasticsearch データを取得して、S3 にCSV 形式で保存するシンプルなスクリプト例は以下です。.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session
##Use the CData JDBC driver to read Elasticsearch data from the Orders table into a DataFrame
##Note the populated JDBC URL and driver class name
source_df = sparkSession.read.format("jdbc").option("url","jdbc:elasticsearch:RTK=5246...;Server=127.0.0.1;Port=9200;User=admin;Password=123456;").option("dbtable","Orders").option("driver","cdata.jdbc.elasticsearch.ElasticsearchDriver").load()
glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)
##Convert DataFrames to AWS Glue's DynamicFrames Object
dynamic_dframe = DynamicFrame.fromDF(source_df, glueContext, "dynamic_df")
##Write the DynamicFrame as a file in CSV format to a folder in an S3 bucket.
##It is possible to write to any Amazon data store (SQL Server, Redshift, etc) by using any previously defined connections.
retDatasink4 = glueContext.write_dynamic_frame.from_options(frame = dynamic_dframe, connection_type = "s3", connection_options = {"path": "s3://mybucket/outfiles"}, format = "csv", transformation_ctx = "datasink4")
glueJob.commit()
Glueジョブを実行する
スクリプト記述後、Glue ジョブを実行します。実行した取得/ロードのジョブが完了するとAWS Glue コンソールのジョブページでステータスが確認できます。成功するとS3 バケットにElasticsearch データのCSV ファイルが生成されています。
このようにCData JDBC Driver for Elasticsearch をAWS Glue で使用することで、Elasticsearch データをAWS Glue で自在に扱うことができます。Glue の外部データへの接続性を拡張するJDBC Driver を是非お試しください。