製品をチェック

Apache Spark Connector の30日間無償トライアルをダウンロード

 30日間の無償トライアルへ

製品の詳細

Apache Spark アイコン Apache Spark Python Connector 相談したい

Apache Spark へのデータ連携用のPython Connecotr ライブラリ。 pandas、SQLAlchemy、Dash、petl などの主要なPython ツールにApache Spark をシームレスに統合。

Python でSpark データを変換・出力するETL 処理を作る方法

CData Python Connector とpetl モジュールを使って、Spark データを変換後にCSV ファイルに吐き出すETL 処理を実装します。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23
spark ロゴ

CData

python ロゴ画像
Python ロゴ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for SparkSQL とpetl フレームワークを使って、Spark データにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。

CData Python Connector は効率的なデータ処理によりSpark データ にPython から接続し、高いパフォーマンスを発揮します。Spark にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Spark 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でSpark データをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします。

import petl as etl
import pandas as pd
import cdata.sparksql as mod

接続文字列で接続を確立します。connect 関数を使って、CData Spark Connector からSpark への接続を行います

cnxn = mod.connect("Server=127.0.0.1;")

SparkSQL への接続

SparkSQL への接続を確立するには以下を指定します。

  • Server:SparkSQL をホストするサーバーのホスト名またはIP アドレスに設定。
  • Port:SparkSQL インスタンスへの接続用のポートに設定。
  • TransportMode:SparkSQL サーバーとの通信に使用するトランスポートモード。有効な入力値は、BINARY およびHTTP です。デフォルトではBINARY が選択されます。
  • AuthScheme:使用される認証スキーム。有効な入力値はPLAIN、LDAP、NOSASL、およびKERBEROS です。デフォルトではPLAIN が選択されます。

Databricks への接続

Databricks クラスターに接続するには、以下の説明に従ってプロパティを設定します。Note:必要な値は、「クラスター」に移動して目的のクラスターを選択し、 「Advanced Options」の下にある「JDBC/ODBC」タブを選択することで、Databricks インスタンスで見つけることができます。

  • Server:Databricks クラスターのサーバーのホスト名に設定。
  • Port:443
  • TransportMode:HTTP
  • HTTPPath:Databricks クラスターのHTTP パスに設定。
  • UseSSL:True
  • AuthScheme:PLAIN
  • User:'token' に設定。
  • Password:パーソナルアクセストークンに設定(値は、Databricks インスタンスの「ユーザー設定」ページに移動して「アクセストークン」タブを選択することで取得できます)。

Spark をクエリするSQL 文の作成

Spark にはSQL でデータアクセスが可能です。Customers エンティティからのデータを読み出します。

sql = "SELECT City, Balance FROM Customers WHERE Country = 'US'"

Spark データ のETL 処理

DataFrame に格納されたクエリ結果を使って、petl でETL(抽出・変換・ロード)パイプラインを組みます。この例では、Spark データ を取得して、Balance カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Balance')

etl.tocsv(table2,'customers_data.csv')

CData Python Connector for SparkSQL を使えば、データベースを扱う場合と同感覚で、Spark データ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

おわりに

Spark Python Connector の30日の無償トライアル をぜひダウンロードして、Spark データ への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.sparksql as mod

cnxn = mod.connect("Server=127.0.0.1;")

sql = "SELECT City, Balance FROM Customers WHERE Country = 'US'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Balance')

etl.tocsv(table2,'customers_data.csv')

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。