ノーコードでクラウド上のデータとの連携を実現。
詳細はこちら →Phoenix Connector の30日間無償トライアルをダウンロード
30日間の無償トライアルへ製品の詳細
Phoenix Python Connector 相談したいApache Phoenix へのデータ連携用のPython Connector ライブラリ。 pandas、SQLAlchemy、Dash、petl などの主要なPython ツールにApache Phoenix をシームレスに統合。
CData
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for ApachePhoenix とpetl フレームワークを使って、Phoenix データにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。
CData Python Connector は効率的なデータ処理によりPhoenix データ にPython から接続し、高いパフォーマンスを発揮します。Phoenix にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Phoenix 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。
pip で必要なモジュールおよびフレームワークをインストールします:
pip install petl pip install pandas
モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。
CData Connector を含むモジュールをインポートします。
import petl as etl import pandas as pd import cdata.apachephoenix as mod
接続文字列で接続を確立します。connect 関数を使って、CData Phoenix Connector からPhoenix への接続を行います
cnxn = mod.connect("Server=localhost;Port=8765;")
Phoenix Query Server 経由でApache Phoenix に接続します。デフォルトのポートと異なる場合は、Server とPort プロパティを設定してApache Phoenix に接続します。Servre プロパティは通常、Apache Phoenix をホストしているサーバーのホスト名またはIP アドレスです。
デフォルトでは、認証は使用されません(プレーン)。サーバーに認証が設定されている場合は、AuthScheme をNEGOTIATE に設定して、 必要な場合にはUser とPassword プロパティを設定して、Kerberos で認証します。
Phoenix にはSQL でデータアクセスが可能です。MyTable エンティティからのデータを読み出します。
sql = "SELECT Id, Column1 FROM MyTable WHERE Id = '123456'"
DataFrame に格納されたクエリ結果を使って、petl でETL(抽出・変換・ロード)パイプラインを組みます。この例では、Phoenix データ を取得して、Column1 カラムでデータをソートして、CSV ファイルにデータをロードします。
table1 = etl.fromdb(cnxn,sql) table2 = etl.sort(table1,'Column1') etl.tocsv(table2,'mytable_data.csv')
CData Python Connector for ApachePhoenix を使えば、データベースを扱う場合と同感覚で、Phoenix データ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。
Phoenix Python Connector の30日の無償トライアル をぜひダウンロードして、Phoenix データ への接続をPython アプリやスクリプトから簡単に作成しましょう。
import petl as etl import pandas as pd import cdata.apachephoenix as mod cnxn = mod.connect("Server=localhost;Port=8765;") sql = "SELECT Id, Column1 FROM MyTable WHERE Id = '123456'" table1 = etl.fromdb(cnxn,sql) table2 = etl.sort(table1,'Column1') etl.tocsv(table2,'mytable_data.csv')