本記事では CData サポート担当からこんなことを聞かれたらどこを確認すべきか?という観点で、よく頂くお問合せ内容をご紹介します。
記事はこちら →Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for Elasticsearch は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Elasticsearch にデータ連携するPython アプリケーションを構築し、Elasticsearch data をビジュアライズできます。 本記事では、SQLAlchemy でElasticsearch に連携して、データを取得、、更新、挿入、削除 する方法を説明します。
CData Python Connector は、ビルトインされた効率的なデータプロセスにより、リアルタイムElasticsearch data データにPython からアクセスし、高いパフォーマンスと接続性を発揮します。Elasticsearch に複雑なクエリを投げる際に、ドライバーはフィルタリング、集計などがサポートされている場合、SQL 処理を直接Elasticsearch 側に行わせ、サポートされていないSQL 処理については、組み込まれたSQL エンジンによりクライアント側で処理を行います(特にJOIN やSQL 関数など)。
Elasticsearch data への連携は、RDB ソースへのアクセスと同感覚で行うことができます。必要な接続プロパティを使って接続文字列を作成します。本記事では、接続文字列をcreate_engine 関数のパラメータとして送ります。
接続するには、Server およびPort 接続プロパティを設定します。 認証には、User とPassword プロパティ、PKI (public key infrastructure)、またはその両方を設定します。 PKI を使用するには、SSLClientCert、SSLClientCertType、SSLClientCertSubject、およびSSLClientCertPassword プロパティを設定します。
本製品は、認証とTLS/SSL 暗号化にX-Pack Security を使用しています。TLS/SSL で接続するには、Server 値に'https://' を接頭します。Note: PKI を 使用するためには、TLS/SSL およびクライアント認証はX-Pack 上で有効化されていなければなりません。
接続されると、X-Pack では、設定したリルムをベースにユーザー認証およびロールの許可が実施されます。
以下の手順でSQLAlchemy をインストールして、Python オブジェクトからElasticsearch に接続します。
pip でSQLAlchemy ツールキットをインストールします:
pip install sqlalchemy
モジュールのインポートを忘れずに行います:
import sqlalchemy
次は、接続文字列で接続を確立します。create_engine 関数を使って、Elasticsearch data に連携するEngne を作成します。
engine = create_engine("elasticsearch///?Server=127.0.0.1&Port=9200&User=admin&Password=123456")
接続を確立したら、OR マッパーでモデル化するテーブルのマッピングクラスを宣言します。本記事では、Orders テーブルを使います。sqlalchemy.ext.declarative.declarative_base 関数を使って、新しいクラスにフィールド(カラム)を定義します。
base = declarative_base() class Orders(base): __tablename__ = "Orders" OrderName = Column(String,primary_key=True) Freight = Column(String) ...
マッピングクラスができたので、セッションオブジェクトを使ってデータソースをクエリすることができます。セッションにEngine をバインドして、セッションのquery メソッドにマッピングクラスを提供します。
engine = create_engine("elasticsearch///?Server=127.0.0.1&Port=9200&User=admin&Password=123456") factory = sessionmaker(bind=engine) session = factory() for instance in session.query(Orders).filter_by(ShipCity="New York"): print("OrderName: ", instance.OrderName) print("Freight: ", instance.Freight) print("---------")
ほかの方法としては、execute メソッドを適切なテーブルオブジェクトに使うことが可能です。以下のコードはアクティブなsession に対して有効です。
Orders_table = Orders.metadata.tables["Orders"] for instance in session.execute(Orders_table.select().where(Orders_table.c.ShipCity == "New York")): print("OrderName: ", instance.OrderName) print("Freight: ", instance.Freight) print("---------")
より複雑なクエリとして、JOIN、集計、Limit などが利用可能です。詳細はヘルプドキュメントをご覧ください。
Elasticsearch data への挿入には、マップされたクラスのインスタンスを定義し、アクティブな session に追加します。commit 関数を呼び出して、Elasticsearch にすべての追加インスタンスを送ります。
new_rec = Orders(OrderName="placeholder", ShipCity="New York") session.add(new_rec) session.commit()
Elasticsearch data の更新には、更新するレコードをフィルタクエリとともにフェッチします。そして、フィールドの値を変更し、セッションでcommit 関数を呼んで、Elasticsearch にレコードを追加します。
updated_rec = session.query(Orders).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() updated_rec.ShipCity = "New York" session.commit()
Elasticsearch data の削除には、フィルタクエリと一緒に対象となるレコードをフェッチします。そして、アクティブsession でレコードを削除し、セッションでcommit 関数を呼び出して、該当するレコードの削除を実行します。
deleted_rec = session.query(Orders).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() session.delete(deleted_rec) session.commit()
Elasticsearch Python Connector の30日の無償トライアル をぜひダウンロードして、Elasticsearch data への接続をPython アプリやスクリプトから簡単に作成しましょう。