製品をチェック

Apache Kafka Connector の30日間無償トライアルをダウンロード

 30日間の無償トライアルへ

製品の詳細

Apache Kafka アイコン Apache Kafka Python Connector 相談したい

Apache Kafka データ連携用Python コネクタライブラリ。Apache Kafka データをPandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。

Python のDash ライブラリを使って、Kafka データ に連携するウェブアプリケーションを開発する方法

CData Python Connector を使って、Kafka にデータ連携するPython ウェブアプリケーションを開発できます。pandas とDash を使って作成してみます。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23
kafka ロゴ

CData

python ロゴ画像
Python ロゴ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Python エコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for ApacheKafka を使うことで、pandas モジュールとDash フレームワークでKafka にデータ連携するアプリケーションを効率的に開発することができます。本記事では、pandas、Dash とCData Connector を使って、Kafka に連携して、Kafka データ をビジュアライズするシンプルなウェブアプリを作る方法をご紹介します。

CData Python Connectors の特徴

CData Python Connectors は、以下のような特徴を持った製品です。

  1. Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
  2. Dash をはじめとする多様なデータ分析・BI ツールにKafka データを連携
  3. ノーコードでの手軽な接続設定

必要なモジュールのインストール

まずは、pip で必要なモジュールおよびフレームワークをインストールします:

pip install pandas
pip install dash
pip install dash-daq

Python でKafka データを可視化

必要なモジュールとフレームワークがインストールされたら、ウェブアプリを開発していきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に掲載しているので、参考にしてください。

まず、CData Connector を含むモジュールをインポートします:

import os
import dash
import dash_core_components as dcc
import dash_html_components as html
import pandas as pd
import cdata.apachekafka as mod
import plotly.graph_objs as go

接続文字列を使ってデータへの接続を確立します。connect 関数を使ってCData Kafka Connector からKafka データ との接続を確立します。

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

認可メカニズム

  • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
  • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
  • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
  • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

Kafka にクエリを実行

read_sql 関数を使って、padas からSQL 文を発行し、DataFrame に結果を格納します。

df = pd.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", cnxn)

ウェブアプリケーションの設定

DataFrame に格納されたクエリ結果を使って、ウェブアプリにname、stylesheet、title を設定していきます。

app_name = 'dash-apachekafkaedataplot'

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.title = 'CData + Dash'

Layout 設定

次に、Kafka データ をベースにした棒グラフを作詞し、アプリのレイアウトを設定します。

trace = go.Bar(x=df.Id, y=df.Column1, name='Id')

app.layout = html.Div(children=[html.H1("CData Extention + Dash", style={'textAlign': 'center'}),
	dcc.Graph(
		id='example-graph',
		figure={
			'data': [trace],
			'layout':
			go.Layout(alt='Kafka SampleTable_1 Data', barmode='stack')
		})
], className="container")

アプリをセットアップして実行

接続、アプリ、レイアウトを定義したら、アプリを実行してみましょう。以下のコードで実行できます。

if __name__ == '__main__':
    app.run_server(debug=True)

最後に、Python でウェブアプリを起動してブラウザでKafka データ を見てみましょう。

python apachekafka-dash.py
Dash のウェブアプリでKafka データ を表示

ちゃんとデータが表示できてますね!

おわりに

Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka データ への接続をPython アプリやスクリプトから簡単に作成してみてください。



import os
import dash
import dash_core_components as dcc
import dash_html_components as html
import pandas as pd
import cdata.apachekafka as mod
import plotly.graph_objs as go

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

df = pd.read_sql("SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'", cnxn)
app_name = 'dash-apachekafkadataplot'

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.title = 'CData + Dash'
trace = go.Bar(x=df.Id, y=df.Column1, name='Id')

app.layout = html.Div(children=[html.H1("CData Extention + Dash", style={'textAlign': 'center'}),
	dcc.Graph(
		id='example-graph',
		figure={
			'data': [trace],
			'layout':
			go.Layout(alt='Kafka SampleTable_1 Data', barmode='stack')
		})
], className="container")

if __name__ == '__main__':
    app.run_server(debug=True)

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。