製品をチェック

無償トライアル:

無償トライアルへ

製品の情報と無償トライアルへ:

Apache Kafka Python Connector

Apache Kafka データ連携用Python コネクタライブラリ。Apache Kafka データをPandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。

データ連携でお困りですか?

お問い合わせ

Python pandas を使ってKafka データをビジュアライズ


CData Python Connector for ApacheKafka を使えば、Python でKafka をpandas やその他の標準モジュールでで呼び出し、データ分析やビジュアライズが可能になります。


kafka ロゴ画像
python ロゴ画像

Python

pandas ロゴ画像

Python エコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うのに役立ちます。CData Python Connector for ApacheKafka は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Kafka にデータ連携するPython アプリケーションを構築し、Kafka をビジュアライズできます。本記事では、pandas、SQLAlchemy、およびMatplotlib のビルトイン機能でKafka にリアルタイムアクセスし、クエリを実行し、結果をビジュアライズする方法を説明します。

CData Python Connector は、ビルトインされた効率的なデータプロセスにより、リアルタイムKafka データにPython からアクセスし、高いパフォーマンスと接続性を発揮します。Kafka に複雑なクエリを投げる際に、ドライバーはフィルタリング、集計などがサポートされている場合、SQL 処理を直接Kafka 側に行わせ、サポートされていないSQL 処理については、組み込まれたSQL エンジンによりクライアント側で処理を行います(特にJOIN やSQL 関数など)。

Kafka データへの接続

Kafka への連携は、RDB ソースへのアクセスと同感覚で行うことができます。必要な接続プロパティを使って接続文字列を作成します。本記事では、接続文字列をcreate_engine 関数のパラメータとして送ります。

BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

認可メカニズム

  • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
  • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
  • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
  • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

以下の手順に従い、必要なモジュールをインストールし、Python オブジェクト経由でKafka にアクセスします。

必要なモジュールのインストール

pip で、pandas & Matplotlib モジュールおよび、SQLAlchemy ツールキットをインストールします:

pip install pandas
pip install matplotlib
pip install sqlalchemy

以下のようにモジュールをインポートします:

import pandas
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

Python でKafka データをビジュアライズ

次は、接続文字列で接続を確立します。create_engine 関数を使って、Kafka に連携するEngne を作成します。.

engine = create_engine("apachekafka:///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")

Kafka にアクセスするSQL を実行

pandas のread_sql 関数を使って好きなSQL を発行して、DataFrame にデータを格納します。

df = pandas.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", engine)

Kafka データをビジュアライズ

DataFrame に格納されたクエリ結果に対して、plot 関数をつかって、Kafka data をグラフで表現してみます。show メソッドはグラフを新しいウィンドウに表示します。

df.plot(kind="bar", x="Id", y="Column1")
plt.show()
Kafka data in a Python plot (Salesforce is shown).

製品の無償トライアル情報

Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka への接続をPython アプリやスクリプトから簡単に作成しましょう。



ソースコードe

import pandas
import matplotlib.pyplot as plt
from sqlalchemy import create_engin

engine = create_engine("apachekafka:///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")
df = pandas.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", engine)

df.plot(kind="bar", x="Id", y="Column1")
plt.show()