ノーコードでクラウド上のデータとの連携を実現。
詳細はこちら →CData
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
Apache Airflow を使うと、データエンジニアリングワークフローの作成、スケジューリング、および監視を行うことができます。CData JDBC Driver for Paylocity と組み合わせることで、Airflow からリアルタイムPaylocity データに連携できます。 この記事では、Apache Airflow インスタンスからPaylocity データに接続してクエリを実行し、結果をCSV ファイルに保存する方法を紹介します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムPaylocity データを扱う上で高いパフォーマンスを提供します。 Paylocity にSQL クエリを発行すると、CData ドライバーはフィルタや集計などのPaylocity 側でサポートしているSQL 操作をPaylocity に直接渡し、サポートされていない操作(主にSQL 関数とJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。 組み込みの動的メタデータクエリを使用すると、ネイティブのデータ型を使ってPaylocity データを操作および分析できます。
JDBC URL の作成の補助として、Paylocity JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.paylocity.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Paylocity への接続を確立するには以下を設定します。
このプロパティは、Insert およびUpdate ステートメントを実行するために必須です。この機能が無効になっている場合は必須ではありません。
Paylocity は、RSA 復号化を使用してAES 鍵を復号化します。
これはオプションのプロパティで、IV の値が指定されていない場合、ドライバーは内部でキーを生成します。
OAuth を使用してPaylocity で認証する必要があります。OAuth では認証するユーザーにブラウザでPaylocity との通信を要求します。詳しくは、ヘルプドキュメントのOAuth セクションを参照してください。
Pay Entry API はPaylocity API の他の部分と完全に分離されています。個別のクライアントID とシークレットを使用し、アカウントへのアクセスを許可するにはPaylocity から明示的にリクエストする必要があります。 Pay Entry API を使用すると、個々の従業員の給与情報を自動的に送信できます。 Pay Entry API によって提供されるものの性質が非常に限られているため、CData では個別のスキーマを提供しないことを選択しましたが、UsePayEntryAPI 接続プロパティを介して有効にできます。
UsePayEntryAPI をtrue に設定する場合は、CreatePayEntryImportBatch、MergePayEntryImportBatch、Input_TimeEntry、およびOAuth ストアドプロシージャのみ利用できることに注意してください。 製品のその他の機能を使用しようとするとエラーが発生します。また、OAuthAccessToken を個別に保存する必要があります。これは、この接続プロパティを使用するときに異なるOAuthSettingsLocation を設定することを意味します。
クラスタ環境またはクラウドでJDBC ドライバーをホストするには、ライセンス(フルまたはトライアル)およびランタイムキー(RTK)が必要です。本ライセンス(またはトライアル)の取得については、こちらからお問い合わせください。
以下は、JDBC 接続で要求される必須プロパティです。
プロパティ | 値 |
---|---|
Database Connection URL |
jdbc:paylocity:RTK=5246...;OAuthClientID=YourClientId;OAuthClientSecret=YourClientSecret;RSAPublicKey=YourRSAPubKey;Key=YourKey;IV=YourIV;InitiateOAuth=GETANDREFRESH
|
Database Driver Class Name | cdata.jdbc.paylocity.PaylocityDriver |
jdbc:paylocity:RTK=5246...;OAuthClientID=YourClientId;OAuthClientSecret=YourClientSecret;RSAPublicKey=YourRSAPubKey;Key=YourKey;IV=YourIV;InitiateOAuth=GETANDREFRESH
Airflow におけるDAG は、ワークフローのプロセスを格納するエンティティであり、DAG にトリガーを設定することでワークフローを実行することができます。 今回のワークフローでは、シンプルにPaylocity データに対してSQL クエリを実行し、結果をCSV ファイルに格納します。
import time from datetime import datetime from airflow.decorators import dag, task from airflow.providers.jdbc.hooks.jdbc import JdbcHook import pandas as pd # Dag の宣言 @dag(dag_id="paylocity_hook", schedule_interval="0 10 * * *", start_date=datetime(2022,2,15), catchup=False, tags=['load_csv']) # Dag となる関数を定義(取得するテーブルは必要に応じて変更してください) def extract_and_load(): # Define tasks @task() def jdbc_extract(): try: hook = JdbcHook(jdbc_conn_id="jdbc") sql = """ select * from Account """ df = hook.get_pandas_df(sql) df.to_csv("/{some_file_path}/{name_of_csv}.csv",header=False, index=False, quoting=1) # print(df.head()) print(df) tbl_dict = df.to_dict('dict') return tbl_dict except Exception as e: print("Data extract error: " + str(e)) jdbc_extract() sf_extract_and_load = extract_and_load()