今すぐお試しください!

製品の詳細CData Python Connector for Amazon Athena を確認して、無償評価版をダウンロード:

今すぐダウンロード

Python でAmazon Athena データをETL

CData Python Connector for Amazon Athena を使って、Python petl でAmazon Athena data のETL 連携・パイプラインアプリケーションを作成。

Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for Amazon Athena とpetl フレームワークを使って、Amazon Athena に連携するPython アプリや、Amazon Athena データをETL することが可能です。本記事では、CData Python Connector をpetl と一緒に使い、ETL 処理を実装します。

CData Python Connector は、ビルトインされた効率的なデータプロセスにより、リアルタイムAmazon Athena data データにPython からアクセスし、高いパフォーマンスと接続性を発揮します。Amazon Athena に複雑なクエリを投げる際に、ドライバーはフィルタリング、集計などがサポートされている場合、SQL 処理を直接Amazon Athena 側に行わせ、サポートされていないSQL 処理については、組み込まれたSQL エンジンによりクライアント側で処理を行います(特にJOIN やSQL 関数など)。

Amazon Athena Data への接続

Amazon Athena data への連携は、RDB ソースへのアクセスと同感覚で行うことができます。必要な接続プロパティを使って接続文字列を作成します。本記事では、接続文字列をcreate_engine 関数のパラメータとして送ります。

Amazon Athena への接続

Amazon Athena リクエストの認証には、アカウントの管理のクレデンシャルか、IAM ユーザーのカスタムPermission を設定します。 AccessKey にAccess Key Id、SecretKey にはSecret Access Key を設定します。

Note: AWS アカウントアドミニストレータとしてアクセスできる場合でも、AWS サービスへの接続にはIAM ユーザークレデンシャルを使用することが推奨されます。

Access Key の取得

IAM ユーザーのクレデンシャル取得は以下のとおり:

  1. IAM コンソールにログイン。
  2. Navigation ペインで[ユーザー]を選択。
  3. ユーザーのアクセスキーを作成または管理するには、ユーザーを選択してから[セキュリティ認証情報]タブを選択。

AWS ルートアカウントのクレデンシャル取得は以下のとおり:

  1. ルートアカウントの資格情報を使用してAWS 管理コンソールにサインイン。
  2. アカウント名または番号を選択し、表示されたメニューで[My Security Credentials]を選択。
  3. [Continue to Security Credentials]をクリックし、[Access Keys]セクションを展開して、ルートアカウントのアクセスキーを管理または作成。

EC2 インスタンスからの認証

EC2 インスタンスから本製品を使用していて、そのインスタンスにIAM ロールが割り当てられている場合は、認証にIAM ロールを使用できます。 これを行うには、UseEC2Roles をtrue に設定しAccessKeySecretKey を空のままにします。 本製品は自動的にIAM ロールの認証情報を取得し、それらを使って認証します。

AWS ロールとして認証

多くの場合、認証にはAWS ルートユーザーのダイレクトなセキュリティ認証情報ではなく、IAM ロールを使用することをお勧めします。 代わりにRoleARN を指定してAWS ロールを使用できます。これにより、本製品は指定されたロールの資格情報を取得しようと試みます。 (すでにEC2 インスタンスなどで接続されているのではなく)AWS に接続している場合は、役割を担うIAM ユーザーのAccessKeySecretKey を追加で指定する必要があります。AWS ルートユーザーのAccessKey およびSecretKey を指定する場合、 ロールは使用できません。

MFA での認証

多要素認証を必要とするユーザーおよびロールには、MFASerialNumber およびMFAToken 接続プロパティを指定してください。 これにより、本製品は一時的な認証資格情報を取得するために、リクエストでMFA 認証情報を送信します。一時的な認証情報の有効期間 (デフォルトは3600秒)は、TemporaryTokenDuration プロパティを介して制御できます。

Amazon Athena への接続

AccessKeySecretKey プロパティに加え、DatabaseS3StagingDirectoryRegion を設定します。Region をAmazon Athena データがホストされているリージョンに設定します。S3StagingDirectory をクエリの結果を格納したいS3内のフォルダに設定します。

接続にDatabase が設定されていない場合は、本製品はAmazon Athena に設定されているデフォルトデータベースに接続します。

CData Amazon Athena Connector をインストールしたら、次のように必要なモジュールをインストールし、Python オブジェクトでAmazon Athena にアクセスします。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でAmazon Athena データをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします:

import petl as etl
import pandas as pd
import cdata.amazonathena as mod

接続文字列で接続を確立します。connect 関数を使って、CData Amazon Athena Connector からAmazon Athena への接続を行います

cnxn = mod.connect("AccessKey='a123';SecretKey='s123';Region='IRELAND';Database='sampledb';S3StagingDirectory='s3://bucket/staging/';")

Amazon Athena をクエリするSQL 文の作成

Amazon Athena にはSQL でデータアクセスが可能です。Customers エンティティからのデータを読み出します。

sql = "SELECT Name, TotalDue FROM Customers WHERE CustomerId = '12345'"

Amazon Athena Data のETL 処理

DataFrame に格納されたクエリ結果を使って、petl でExtract(取得)、Transform(加工)、Load(ロード)を組みます。この例では、Amazon Athena data を取得して、TotalDue カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'TotalDue')

etl.tocsv(table2,'customers_data.csv')

CData Python Connector for Amazon Athena を使えば、データベースを扱う場合と同感覚で、Amazon Athena data を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

製品の無償トライアル情報

Amazon Athena Python Connector の30日の無償トライアル をぜひダウンロードして、Amazon Athena data への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.amazonathena as mod

cnxn = mod.connect("AccessKey='a123';SecretKey='s123';Region='IRELAND';Database='sampledb';S3StagingDirectory='s3://bucket/staging/';")

sql = "SELECT Name, TotalDue FROM Customers WHERE CustomerId = '12345'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'TotalDue')

etl.tocsv(table2,'customers_data.csv')
 
 
ダウンロード