各製品の資料を入手。
詳細はこちら →Apache Airflow でe-Sales Manager のデータに連携したワークフローを作る
CData JDBC Driver を使ってApache Airflow からe-Sales Manager のデータにアクセスして操作します。
最終更新日:2022-09-07
この記事で実現できるe-Sales Manager 連携のシナリオ
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
Apache Airflow を使うと、データエンジニアリングワークフローの作成、スケジューリング、および監視を行うことができます。CData JDBC Driver for ESalesManager と組み合わせることで、Airflow からリアルタイムe-Sales Manager のデータに連携できます。 この記事では、Apache Airflow インスタンスからe-Sales Manager のデータに接続してクエリを実行し、結果をCSV ファイルに保存する方法を紹介します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムe-Sales Manager のデータを扱う上で高いパフォーマンスを提供します。 e-Sales Manager にSQL クエリを発行すると、CData ドライバーはフィルタや集計などのe-Sales Manager 側でサポートしているSQL 操作をe-Sales Manager に直接渡し、サポートされていない操作(主にSQL 関数とJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。 組み込みの動的メタデータクエリを使用すると、ネイティブのデータ型を使ってe-Sales Manager のデータを操作および分析できます。
e-Sales Manager への接続を構成する
組み込みの接続文字列デザイナー
JDBC URL の作成の補助として、e-Sales Manager JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.esalesmanager.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
e セールスマネージャー Remix に接続するには、User、Passowrd、URL、TenantId パラメータが必要です。
- User:API を実行するためのログインユーザーのユーザーID。
- Password:API を実行するためのログインユーザーのユーザーパスワード 。
- URL:e-Sales Manager Remix エンドポイントへのURL。例:https://XXX.softbrain.co.jp
- TenantId:e-Sales Manager Remix テナント名のTenantd。例:cdata

クラスタ環境またはクラウドでJDBC ドライバーをホストするには、ライセンス(フルまたはトライアル)およびランタイムキー(RTK)が必要です。本ライセンス(またはトライアル)の取得については、こちらからお問い合わせください。
以下は、JDBC 接続で要求される必須プロパティです。
プロパティ | 値 |
---|---|
Database Connection URL |
jdbc:esalesmanager:RTK=5246...;User=MyUsername;Password=MyPassword;URL=MyInstanceURL;TenantId=MyTenantId;
|
Database Driver Class Name | cdata.jdbc.esalesmanager.ESalesManagerDriver |
Airflow でJDBC 接続を確立する
- Apache Airflow インスタンスにログインします。
- Airflow インスタンスのナビゲーションバーで、「Admin」にカーソルを合わせ、「Connections」をクリックします。
- 次の画面で「+」マークをクリックして新しい接続を作成します。
- Add Connection フォームで、必要な接続プロパティを入力します。
- Connection Id:接続の名前:esalesmanager_jdbc
- Connection Type:JDBC Connection
- Connection URL:上記のJDBC 接続URL:
jdbc:esalesmanager:RTK=5246...;User=MyUsername;Password=MyPassword;URL=MyInstanceURL;TenantId=MyTenantId;
- Driver Class:cdata.jdbc.esalesmanager.ESalesManagerDriver
- Driver Path:PATH/TO/cdata.jdbc.esalesmanager.jar
- フォームの下にある「Test」ボタンをクリックし、新規の接続をテストします。
- 新規接続を保存すると、新しく表示される画面に、接続リストに新しい行が追加されたことを示す緑のバナーが表示されます。
DAG を作成する
Airflow におけるDAG は、ワークフローのプロセスを格納するエンティティであり、DAG にトリガーを設定することでワークフローを実行することができます。 今回のワークフローでは、シンプルにe-Sales Manager のデータに対してSQL クエリを実行し、結果をCSV ファイルに格納します。
- はじめに、Home ディレクトリにある「airflow」フォルダに移動します。その中に新しいディレクトリを作成し、タイトルを「dags」とします。 ここに、UI に表示されるAirflow のDAG を構築するPython ファイルを格納します。
- 次に新しいPython ファイルを作成し、タイトルをe-sales manager_hook.py にします。この新規ファイル内に、次のコードを挿入します。
import time from datetime import datetime from airflow.decorators import dag, task from airflow.providers.jdbc.hooks.jdbc import JdbcHook import pandas as pd # Dag の宣言 @dag(dag_id="e-sales manager_hook", schedule_interval="0 10 * * *", start_date=datetime(2022,2,15), catchup=False, tags=['load_csv']) # Dag となる関数を定義(取得するテーブルは必要に応じて変更してください) def extract_and_load(): # Define tasks @task() def jdbc_extract(): try: hook = JdbcHook(jdbc_conn_id="jdbc") sql = """ select * from Account """ df = hook.get_pandas_df(sql) df.to_csv("/{some_file_path}/{name_of_csv}.csv",header=False, index=False, quoting=1) # print(df.head()) print(df) tbl_dict = df.to_dict('dict') return tbl_dict except Exception as e: print("Data extract error: " + str(e)) jdbc_extract() sf_extract_and_load = extract_and_load()
- このファイルを保存し、Airflow インスタンスをリフレッシュします。DAG リストの中に、「e-sales manager_hook」というタイトルの新しいDAG が表示されるはずです。
- このDAG をクリックし、新しく表示される画面で一時停止解除スイッチをクリックして青色にし、トリガー(=play)ボタンをクリックしてDAG を実行します。この操作で、e-sales manager_hook.py ファイルのSQL クエリを実行し、結果をCSV としてコード内で指定したファイルパスにエクスポートします。
- 新規のDAG を実行後、Downloads フォルダ(またはPython スクリプト内で選択したフォルダ)を確認し、CSV ファイルが作成されていることを確認します(本ワークフローの場合はaccount.csv です)。
- CSV ファイルを開くと、Apache Airflow によってe-Sales Manager のデータがCSV 形式で利用できるようになったことが確認できます。