製品をチェック

Apache Kafka Connector の30日間無償トライアルをダウンロード

 30日間の無償トライアルへ

製品の詳細

Apache Kafka アイコン Apache Kafka Python Connector 相談したい

Apache Kafka データ連携用Python コネクタライブラリ。Apache Kafka データをPandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。

SQLAlchemy ORM を使って、Python でKafka データに連携する方法

CData Python Connector を使って、Python アプリケーションおよびスクリプトからSQLAlchemy 経由でKafka にOR マッピング可能に。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23
kafka ロゴ

CData

python ロゴ画像
Python ロゴ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for ApacheKafka は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Kafka にデータ連携するPython アプリケーションを構築し、Kafka データを可視化できます。 本記事では、SQLAlchemy でKafka に連携して、データを取得、、更新、挿入、削除 する方法を説明します。

CData Python Connectors の特徴

CData Python Connectors は、以下のような特徴を持った製品です。

  1. Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
  2. Python をはじめとする多様なデータ分析・BI ツールにKafka データを連携
  3. ノーコードでの手軽な接続設定

CData Python Connectors では、1.データソースとしてKafka の接続を設定、2.Python からPython Connectors との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。

必要なモジュールのインストール

pip でSQLAlchemy ツールキットをインストールします:

pip install sqlalchemy

モジュールのインポートを忘れずに行います:

import sqlalchemy

Python でKafka データをモデル化

次は、接続文字列で接続を確立します。create_engine 関数を使って、Kafka データに連携するEngne を作成します。

engine = create_engine("apachekafka///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")

BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

認可メカニズム

  • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
  • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
  • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
  • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

Kafka データのマッピングクラスの宣言

接続を確立したら、OR マッパーでモデル化するテーブルのマッピングクラスを宣言します。本記事では、SampleTable_1 テーブルを使います。sqlalchemy.ext.declarative.declarative_base 関数を使って、新しいクラスにフィールド(カラム)を定義します。

base = declarative_base()
class SampleTable_1(base):
	__tablename__ = "SampleTable_1"
	Id = Column(String,primary_key=True)
	Column1 = Column(String)
	...

Kafka データをクエリ

マッピングクラスができたので、セッションオブジェクトを使ってデータソースをクエリすることができます。セッションにEngine をバインドして、セッションのquery メソッドにマッピングクラスを提供します。

query メソッドを使う

engine = create_engine("apachekafka///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(SampleTable_1).filter_by(Column2="100"):
	print("Id: ", instance.Id)
	print("Column1: ", instance.Column1)
	print("---------")

ほかの方法としては、execute メソッドを適切なテーブルオブジェクトに使うことが可能です。以下のコードはアクティブなsession に対して有効です。

execute メソッドを使う

SampleTable_1_table = SampleTable_1.metadata.tables["SampleTable_1"]
for instance in session.execute(SampleTable_1_table.select().where(SampleTable_1_table.c.Column2 == "100")):
	print("Id: ", instance.Id)
	print("Column1: ", instance.Column1)
	print("---------")

より複雑なクエリとして、JOIN、集計、Limit などが利用可能です。詳細はヘルプドキュメントをご覧ください。

Kafka データの挿入(INSERT)

Kafka データへの挿入には、マップされたクラスのインスタンスを定義し、アクティブな session に追加します。commit 関数を呼び出して、Kafka にすべての追加インスタンスを送ります。

new_rec = SampleTable_1(Id="placeholder", Column2="100")
session.add(new_rec)
session.commit()

Kafka データを更新(UPDATE)

Kafka データの更新には、更新するレコードをフィルタクエリとともにフェッチします。そして、フィールドの値を変更し、セッションでcommit 関数を呼んで、Kafka にレコードを追加します。

updated_rec = session.query(SampleTable_1).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
updated_rec.Column2 = "100"
session.commit()

Kafka データを削除(DELETE)

Kafka データの削除には、フィルタクエリと一緒に対象となるレコードをフェッチします。そして、アクティブsession でレコードを削除し、セッションでcommit 関数を呼び出して、該当するレコードの削除を実行します。

deleted_rec = session.query(SampleTable_1).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
session.delete(deleted_rec)
session.commit()

Kafka からPython へのデータ連携には、ぜひCData Python Connector をご利用ください

このようにCData Python Connector と併用することで、270を超えるSaaS、NoSQL データをPython からコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。

日本のユーザー向けにCData Python Connector は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。