Python でAvro データを変換・出力するETL 処理を作る方法

CData Python Connector for Avro とpetl モジュールを使って、Avro データを変換後にCSV ファイルに吐き出すETL 処理を実装します。

古川えりか
コンテンツスペシャリスト


avro ロゴ

CData

python ロゴ画像
Python ロゴ

こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。

Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for Avro とpetl フレームワークを使って、Avro データにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。

CData Python Connector は効率的なデータ処理によりAvro データ にPython から接続し、高いパフォーマンスを発揮します。Avro にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Avro 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。

Avro データへの接続

RDB と同感覚でAvro データへの連携を行うことができます。必要な接続プロパティを使って接続文字列を作成します。本記事では、接続文字列をcreate_engine 関数のパラメータとして送ります。

URI 接続プロパティをAvro ファイルの場所に設定して、ローカルのAvro ファイルに接続します。

CData Avro Connector をインストールしたら、次のように必要なモジュールをインストールし、Python オブジェクトでAvro にアクセスします。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でAvro データをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします:

import petl as etl
import pandas as pd
import cdata.avro as mod

接続文字列で接続を確立します。connect 関数を使って、CData Avro Connector からAvro への接続を行います

cnxn = mod.connect("URI=C:/folder/table.avroInitiateOAuth=GETANDREFRESH;OAuthSettingsLocation=/PATH/TO/OAuthSettings.txt")")

Avro をクエリするSQL 文の作成

Avro にはSQL でデータアクセスが可能です。SampleTable_1 エンティティからのデータを読み出します。

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = 'value_2'"

Avro データ のETL 処理

DataFrame に格納されたクエリ結果を使って、petl でExtract(取得)、Transform(加工)、Load(ロード)を組みます。この例では、Avro データ を取得して、Column1 カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')

CData Python Connector for Avro を使えば、データベースを扱う場合と同感覚で、Avro データ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

おわりに

Avro Python Connector の30日の無償トライアル をぜひダウンロードして、Avro データ への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.avro as mod

cnxn = mod.connect("URI=C:/folder/table.avroInitiateOAuth=GETANDREFRESH;OAuthSettingsLocation=/PATH/TO/OAuthSettings.txt")")

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = 'value_2'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。