Python でKafka データをETL

詳細情報をご希望ですか?

無償トライアル:

ダウンロードへ

製品の詳細情報へ:

Apache Kafka Python Connector

Apache Kafka データ連携用Python コネクタライブラリ。Apache Kafka データをPandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。



CData Python Connector for Kafka を使って、Python petl でKafka data のETL 連携・パイプラインアプリケーションを作成。

Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for Kafka とpetl フレームワークを使って、Kafka に連携するPython アプリや、Kafka データをETL することが可能です。本記事では、CData Python Connector をpetl と一緒に使い、ETL 処理を実装します。

CData Python Connector は、ビルトインされた効率的なデータプロセスにより、リアルタイムKafka data データにPython からアクセスし、高いパフォーマンスと接続性を発揮します。Kafka に複雑なクエリを投げる際に、ドライバーはフィルタリング、集計などがサポートされている場合、SQL 処理を直接Kafka 側に行わせ、サポートされていないSQL 処理については、組み込まれたSQL エンジンによりクライアント側で処理を行います(特にJOIN やSQL 関数など)。

Kafka Data への接続

Kafka data への連携は、RDB ソースへのアクセスと同感覚で行うことができます。必要な接続プロパティを使って接続文字列を作成します。本記事では、接続文字列をcreate_engine 関数のパラメータとして送ります。

BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

認可メカニズム

  • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
  • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
  • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
  • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

CData Kafka Connector をインストールしたら、次のように必要なモジュールをインストールし、Python オブジェクトでKafka にアクセスします。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でKafka データをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします:

import petl as etl
import pandas as pd
import cdata.apachekafka as mod

接続文字列で接続を確立します。connect 関数を使って、CData Kafka Connector からKafka への接続を行います

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

Kafka をクエリするSQL 文の作成

Kafka にはSQL でデータアクセスが可能です。SampleTable_1 エンティティからのデータを読み出します。

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'"

Kafka Data のETL 処理

DataFrame に格納されたクエリ結果を使って、petl でExtract(取得)、Transform(加工)、Load(ロード)を組みます。この例では、Kafka data を取得して、Column1 カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')

CData Python Connector for Kafka を使えば、データベースを扱う場合と同感覚で、Kafka data を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

製品の無償トライアル情報

Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka data への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.apachekafka as mod

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')