ノーコードでクラウド上のデータとの連携を実現。
詳細はこちら →CData
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for AmazonAthena は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Amazon Athena にデータ連携するPython アプリケーションを構築し、Amazon Athena データを可視化できます。 本記事では、SQLAlchemy でAmazon Athena に連携して、データを取得、、更新、挿入、削除 する方法を説明します。
CData Python Connectors は、以下のような特徴を持った製品です。
CData Python Connectors では、1.データソースとしてAmazon Athena の接続を設定、2.Python からPython Connectors との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。
pip でSQLAlchemy ツールキットをインストールします:
pip install sqlalchemy
モジュールのインポートを忘れずに行います:
import sqlalchemy
次は、接続文字列で接続を確立します。create_engine 関数を使って、Amazon Athena データに連携するEngne を作成します。
engine = create_engine("amazonathena///?AccessKey='a123'&SecretKey='s123'&Region='IRELAND'&Database='sampledb'&S3StagingDirectory='s3://bucket/staging/'")
Amazon Athena リクエストの認証には、アカウントの管理のクレデンシャルか、IAM ユーザーのカスタムPermission を設定します。 AccessKey にAccess Key Id、SecretKey にはSecret Access Key を設定します。
AWS アカウントアドミニストレータとしてアクセスできる場合でも、AWS サービスへの接続にはIAM ユーザークレデンシャルを使用することが推奨されます。
IAM ユーザーのクレデンシャル取得は以下のとおり:
AWS ルートアカウントのクレデンシャル取得は以下のとおり:
EC2 インスタンスからCData 製品を使用していて、そのインスタンスにIAM ロールが割り当てられている場合は、認証にIAM ロールを使用できます。 これを行うには、UseEC2Roles をtrue に設定しAccessKey とSecretKey を空のままにします。 CData 製品は自動的にIAM ロールの認証情報を取得し、それらを使って認証します。
多くの場合、認証にはAWS ルートユーザーのダイレクトなセキュリティ認証情報ではなく、IAM ロールを使用することをお勧めします。 代わりにRoleARN を指定してAWS ロールを使用できます。これにより、CData 製品は指定されたロールの資格情報を取得しようと試みます。 (すでにEC2 インスタンスなどで接続されているのではなく)AWS に接続している場合は、役割を担うIAM ユーザーのAccessKeyと SecretKey を追加で指定する必要があります。AWS ルートユーザーのAccessKey およびSecretKey を指定する場合、 ロールは使用できません。
多要素認証を必要とするユーザーおよびロールには、MFASerialNumber およびMFAToken 接続プロパティを指定してください。 これにより、CData 製品は一時的な認証資格情報を取得するために、リクエストでMFA 認証情報を送信します。一時的な認証情報の有効期間 (デフォルトは3600秒)は、TemporaryTokenDuration プロパティを介して制御できます。
AccessKey とSecretKey プロパティに加え、Database、S3StagingDirectory、Region を設定します。Region をAmazon Athena データがホストされているリージョンに設定します。S3StagingDirectory をクエリの結果を格納したいS3内のフォルダに設定します。
接続にDatabase が設定されていない場合は、CData 製品はAmazon Athena に設定されているデフォルトデータベースに接続します。
接続を確立したら、OR マッパーでモデル化するテーブルのマッピングクラスを宣言します。本記事では、Customers テーブルを使います。sqlalchemy.ext.declarative.declarative_base 関数を使って、新しいクラスにフィールド(カラム)を定義します。
base = declarative_base() class Customers(base): __tablename__ = "Customers" Name = Column(String,primary_key=True) TotalDue = Column(String) ...
マッピングクラスができたので、セッションオブジェクトを使ってデータソースをクエリすることができます。セッションにEngine をバインドして、セッションのquery メソッドにマッピングクラスを提供します。
engine = create_engine("amazonathena///?AccessKey='a123'&SecretKey='s123'&Region='IRELAND'&Database='sampledb'&S3StagingDirectory='s3://bucket/staging/'") factory = sessionmaker(bind=engine) session = factory() for instance in session.query(Customers).filter_by(CustomerId="12345"): print("Name: ", instance.Name) print("TotalDue: ", instance.TotalDue) print("---------")
ほかの方法としては、execute メソッドを適切なテーブルオブジェクトに使うことが可能です。以下のコードはアクティブなsession に対して有効です。
Customers_table = Customers.metadata.tables["Customers"] for instance in session.execute(Customers_table.select().where(Customers_table.c.CustomerId == "12345")): print("Name: ", instance.Name) print("TotalDue: ", instance.TotalDue) print("---------")
より複雑なクエリとして、JOIN、集計、Limit などが利用可能です。詳細はヘルプドキュメントをご覧ください。
Amazon Athena データへの挿入には、マップされたクラスのインスタンスを定義し、アクティブな session に追加します。commit 関数を呼び出して、Amazon Athena にすべての追加インスタンスを送ります。
new_rec = Customers(Name="placeholder", CustomerId="12345") session.add(new_rec) session.commit()
Amazon Athena データの更新には、更新するレコードをフィルタクエリとともにフェッチします。そして、フィールドの値を変更し、セッションでcommit 関数を呼んで、Amazon Athena にレコードを追加します。
updated_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() updated_rec.CustomerId = "12345" session.commit()
Amazon Athena データの削除には、フィルタクエリと一緒に対象となるレコードをフェッチします。そして、アクティブsession でレコードを削除し、セッションでcommit 関数を呼び出して、該当するレコードの削除を実行します。
deleted_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() session.delete(deleted_rec) session.commit()
このようにCData Python Connector と併用することで、270を超えるSaaS、NoSQL データをPython からコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。
日本のユーザー向けにCData Python Connector は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。