今すぐお試しください!

製品の詳細CData SSIS Components for Amazon Athena を確認して、無償評価版をダウンロード:

今すぐダウンロード

Biml を使って、Athena Data をSQL Server に同期するSSIS タスクを作成

CData SSIS Components を使って、Biml で Athena data をSQL Server に同期するタスクを作成します。

SQL Server に基幹業務データのバックアップを保管することは、ビジネス上のセーフティネットです。また、ユーザーはSQL Server のバックアップデータからレポーティングや分析を簡単に行うことができます。Biml は、SSIS パッケージのようなMicrosoft SQL Server BI Object で使えるXML の方言です。CData SSIS Components をBiml と組み合わせて使うことで、Athena data に連携するSSIS パッケージを簡単ンい作成できます。以下のメリットがあります:

  • Built-in metadata discovery — CData SSIS components expose metadata just like working with SQL Server, even dynamically generating schema for schema-less data sources
  • Dynamic SSIS task generation — Use code nuggets in Biml to build SSIS tasks by iterating over discovered metadata
  • Read from and write to Athena — Native source and destination components make Athena look just like a database

This article demonstrates how to use Biml with the CData SSIS Tasks for Athena to dynamically build SSIS tasks (one for each Athena entity) to replicate Athena data to a Microsoft SQL Server database. We step through the Biml file one section at a time but have included the complete Biml file at the end of the article.

Getting Started

In order to use Biml in an SSIS Project in Visual Studio, install BimlExpress. Once you install BimlExpress, open Visual Studio, create a new Integration Services project, and add a new Biml file.

Building the Biml File

With Biml, you can write scripting to dynamically generate SSIS projects, packages, and tasks. To see the Biml file for an existing project (and gain insights on using Biml with CData SSIS Tasks), simply create your tasks and then right-click the project and select Convert SSIS Packages to Biml.

C# Code

  1. Use directives <#@ .. #> to import necessary namespaces and the assembly for the CData SSIS Components for Athena.
    <#@ template language="C#" hostspecific="true"#>
    <#@ import namespace="System.Data"#>
    <#@ import namespace="System.IO"#>
    <#@ import namespace="System.Collections"#>
    <#@ import namespace="System.Data.CData.AmazonAthena"#>
    <#@ assembly name="C:\Program Files\CData\CData SSIS Components for Athena 2018\lib\CData.SSIS2017.AmazonAthena.dll"#>
    
  2. In a new control nugget <# ... #>, create variables for values that will be used throughout the Biml script, including a connection string for Athena and structures to store the Athena metadata.

    Amazon Athena への接続

    Amazon Athena リクエストの認証には、アカウントの管理のクレデンシャルか、IAM ユーザーのカスタムPermission を設定します。 AccessKey にAccess Key Id、SecretKey にはSecret Access Key を設定します。

    Note: AWS アカウントアドミニストレータとしてアクセスできる場合でも、AWS サービスへの接続にはIAM ユーザークレデンシャルを使用することが推奨されます。

    Access Key の取得

    IAM ユーザーのクレデンシャル取得は以下の通り:

    1. IAM コンソールにログイン
    2. Navigation ペインでUsers を選択。
    3. To create or manage the access keys for a user, select the user and then select the Security Credentials tab.

    To obtain the credentials for your AWS root account, follow the steps below:

    1. Sign into the AWS Management console with the credentials for your root account.
    2. Select your account name or number and select My Security Credentials in the menu that is displayed.
    3. Click Continue to Security Credentials and expand the Access Keys section to manage or create root account access keys.

    Authenticating from an EC2 Instance

    If you are using the CData Data Provider for Amazon Athena 2018 from an EC2 Instance and have an IAM Role assigned to the instance, you can use the IAM Role to authenticate. To do so, set UseEC2Roles to true and leave AccessKey and SecretKey empty. The CData Data Provider for Amazon Athena 2018 will automatically obtain your IAM Role credentials and authenticate with them.

    Authenticating as an AWS Role

    In many situations it may be preferable to use an IAM role for authentication instead of the direct security credentials of an AWS root user. An AWS role may be used instead by specifying the RoleARN. This will cause the CData Data Provider for Amazon Athena 2018 to attempt to retrieve credentials for the specified role. If you are connecting to AWS (instead of already being connected such as on an EC2 instance), you must additionally specify the AccessKey and SecretKey of an IAM user to assume the role for. Roles may not be used when specifying the AccessKey and SecretKey of an AWS root user.

    Authenticating with MFA

    For users and roles that require Multi-factor Authentication, specify the MFASerialNumber and MFAToken connection properties. This will cause the CData Data Provider for Amazon Athena 2018 to submit the MFA credentials in a request to retrieve temporary authentication credentials. Note that the duration of the temporary credentials may be controlled via the TemporaryTokenDuration (default 3600 seconds).

    Connecting to Amazon Athena

    In addition to the AccessKey and SecretKey properties, specify Database, S3StagingDirectory and Region. Set Region to the region where your Amazon Athena data is hosted. Set S3StagingDirectory to a folder in S3 where you would like to store the results of queries.

    If Database is not set in the connection, the data provider connects to the default database set in Amazon Athena.

    
    var amazonathenaConnectionString = "AccessKey='a123';SecretKey='s123';Region='IRELAND';Database='sampledb';S3StagingDirectory='s3://bucket/staging/';";
    var replicationServer = "SERVER";
    var replicationCatalog = "CATALOG";
    var replicationUserID = "sqluser";
    var replicationPassword = "sqlpassword";
    
    List<string> allEntityNames = new List<string>();
    Hashtable entitySchema = new Hashtable();
    
  3. In the same control nugget used to defined variables, use ADO.NET code to programmatically query the Athena entities (tables) and fields (columns).
    using (AmazonAthenaConnection connection = new AmazonAthenaConnection(amazonathenaConnectionString)) {
      connection.Open();
      var entities = connection.GetSchema("Tables").Rows;
      foreach (DataRow entity in entities)
      {
        allEntityNames.Add(entity["TABLE_NAME"].ToString());
      }
      foreach (string entity in allEntityNames){
        var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
        entitySchema.Add(entity,columns);
      }
    }
    

Class Nugget

In our Biml script to create the replication tasks, there are several places where repeated XML elements are created dynamically (mostly for columns in SSIS tasks). Instead of repeating the code, add a class nugget <#+ ... #> and create a helper class with methods to consolidate repeated code (full code at the end of the article).

  1. Add public static variables to determine which type of XML element to create.
    public static int OUTPUT_WITH_ERROR = 0;
    public static int EXTERNAL = 1;
    public static int OUTPUT = 2;
    public static int DATAOVERRIDE_COLUMN = 4;
    
  2. Add a public method to build a SQL statement for use in the ExecuteSQL task used to drop existing tables and create a new table for the replicated data.
    // Dynamically builds a DROP TABLE and CREATE statement
    // for each entity (table) in Athena using the table name and metadata.
    public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {
      ...
    }
    
  3. Add a public method to build the collection of column-based XML elements.
     
    // Dynamically build various column-based XML elements
    // for each entity (table) in Athena based on the column 
    // metadata and the parent element
    public static string GetColumnDefs(DataRowCollection columns, int columnType){
      ...
    }
    

Biml Script

Now that you have the table metadata and a Helper class to reduce repeated code, write the Biml script to dynamically create your replication packages.

  1. Start by adding a CustomSsisConnection element for the CData SSIS Tasks. Note that the ObjectData attribute must be XML encoded. A typical connecting string looks similar to the following (note the use of the amazonathenaConnectionString variable for the ConnectionString property:
    <AmazonAthenaConnectionManager>
      <Property Name="ConnectionString"><#=amazonathenaConnectionString#></Property>
    </AmazonAthenaConnectionManager>
    

    After configuring the connection to the CData SSIS Task, configure a connection to the replication database. The completed Connections element looks like the following (note the use of text nuggets <#= ... #> to add variables for connection string values):

    <Connections>
      <CustomSsisConnection Name="CData AmazonAthena Connection Manager" CreationName = "CDATA_AMAZONATHENA" ObjectData = "&lt;AmazonAthenaConnectionManager&gt; &lt;Property Name=&quot;ConnectionString&quot;&gt; <#=amazonathenaConnectionString#>&lt;/Property&gt; &lt;/AmazonAthenaConnectionManager&gt;" />
      <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
    </Connections>
    
  2. With the Connections element configured, you are ready to build our replication package. In the package, the Biml script create an ExecuteSQL task and a Dataflow task for each table to be replicated.

    To build each set of tasks, use a while loop in a control nugget to iterate through the entity (table) names:

    int entityCounter = 0; while(entityCounter < allEntityNames.Count){
    var tableName = allEntityNames[entityCounter].ToString();
    DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);
    
    • ExecuteSQL Task

      In the ExecuteSQL task, execute a SQL query to drop any existing tables that have the same name as our Athena entity (table) and create a new table based on the metadata discovered using the CData SSIS Component.

      To create the query dynamically, use the Helper.GetDeleteAndCreateStatement() helper function.

    • Dataflow Task

      Within the Dataflow use a CustomComponent as the source component and an OleDbDestination as the destination.

      • CustomComponent Element

        The CustomComponent element uses the CData SSIS Source component to retrieve Athena data. Start by configuring the component to use with the CData component.

        
        <CustomComponent Name="CData Athena Source" ComponentTypeName="CData.SSIS.AmazonAthena.AmazonAthenaSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
        ...
        </CustomComponent>
        

        DataflowOverrides and OutputPaths Elements

        The next step after configuring the connection is to add Columns elements to the OutputPath child element of the DataflowOverrides element. To do so, call the Helper.GetColumnDefs() helper function.

        Use the same Helper class to add columns to the OutputColumns and ExternalColumns child elements of the various OutputPaths elements.

        The definitions created provide information about the input, output, and error information for the SSIS component.

        
        <DataflowOverrides>
          <OutputPath OutputPathName="CData AmazonAthena Source Output">
            <Columns>
        <#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
            </Columns>
          </OutputPath>
        </DataflowOverrides>
        ...
        <OutputPaths>
          <OutputPath Name="CData AmazonAthena Source Output">
            <OutputColumns>
        <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
            </OutputColumns>
            <ExternalColumns>
        <#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
            </ExternalColumns>
          </OutputPath>
          <OutputPath Name="CData AmazonAthena Source Error Output" IsErrorOutput="true">
            <OutputColumns>
        <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #      
            </OutputColumns>
          </OutputPath>
        </OutputPaths>
        

        CustomProperties Element

        The CData SSIS tasks are surfaced in SSIS as custom components with a series of required CustomProperties:

        
        <CustomProperties>
          <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
          <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.AmazonAthena.AccessModeToStringConverter">0</CustomProperty>
          <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
          <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
        </CustomProperties>
        

        Connections Element

        The last element to add to the CustomComponent element is a Connections element, attaching the previously defined connection to the task:

        
        <Connections>
          <Connection Name="AmazonAthena 2018 Connection" ConnectionName="CData AmazonAthena Connection Manager" />
        </Connections>
        
      • OleDbDestination Element

        The final piece of the Dataflow task is the OleDbDestination element. Attach the previously defined OleDbConnection to the element, set the InputPath and ExternalTableOutput:

        
        <OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
          <InputPath OutputPathName="CData AmazonAthena Source.CData AmazonAthena Source Output" />
          <ExternalTableOutput Table="[<#=tableName#>]" />
        </OleDbDestination>
        

  3. Use a control nugget to increment the counter used to iterate over the collection of entity (table) names. Do this within the Tasks element, after the end of the Dataflow element:
    
    ...
              </Dataflow>          
    <# entityCounter++;}#>
            </Tasks>
        </Package>
      </Packages>
    </Biml>
    

Build the SSIS Project

Once the Biml file is written, right-click on the Biml file in Server Explorer and select Generate SSIS Packages. At this point, Visual Studio and BimlExpress will translate the Biml file into SSIS package(s), ready to be run.

Run the package to begin replicating your Athena data to a SQL Server database (or any other destination you choose).

Free Trial & More Information

With the CData SSIS Components for Athena, you get SQL access to your Athena data directly from SSIS packages. And with Biml, you can automatically generate those packages. For more information about the CData SSIS Components for Athena, refer to the product page. You can always get started with a free, 30-day trial. As always, our world-class CData Support Team is available if you have any questions.

Complete Biml File


<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.AmazonAthena"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for AmazonAthena 2018\lib\CData.SSIS2017.AmazonAthena.dll"#>
<#
var amazonathenaConnectionString = ""AccessKey='a123';SecretKey='s123';Region='IRELAND';Database='sampledb';S3StagingDirectory='s3://bucket/staging/';";
var replicationServer = "JDG";
var replicationCatalog = "BIML";
var replicationUserID = "sqltest";
var replicationPassword = "sqltest";

List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();
using (AmazonAthenaConnection connection = new AmazonAthenaConnection(amazonathenaConnectionString)) {
    connection.Open();
    var entities = connection.GetSchema("Tables").Rows;
    foreach (DataRow entity in entities)
    {
        allEntityNames.Add(entity["TABLE_NAME"].ToString());
    }
    foreach (string entity in allEntityNames){
        var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
        entitySchema.Add(entity,columns);
    }
}#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
  <Connections>
    <CustomSsisConnection Name="CData AmazonAthena Connection Manager" CreationName="CDATA_AMAZONATHENA" ObjectData="&lt;AmazonAthenaConnectionManager&gt;&lt;Property Name=&quot;ConnectionString"&gt;<#=amazonathenaConnectionString#>&lt;/Property&gt;&lt;/AmazonAthenaConnectionManager&gt;"/>
    <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
  </Connections>
  <Packages>
    <Package Name="Replicate AmazonAthena Package" Language="None" ConstraintMode="LinearOnCompletion" ProtectionLevel="EncryptSensitiveWithUserKey">
      <Tasks>
<# int entityCounter = 0; while(entityCounter < allEntityNames.Count){
   var tableName = allEntityNames[entityCounter].ToString();
   if (tableName.Equals("IdpEventLog")) break;
   DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);#>
        <ExecuteSQL Name="Create <#=tableName#> Replication Table" ConnectionName="Destination">
          <DirectInput>
<#=HelperClass.GetDeleteAndCreateStatement(tableName,columns)#>
          </DirectInput>
        </ExecuteSQL>
        <Dataflow Name="Replicate <#=tableName#>">
          <Transformations>
            <CustomComponent Name="CData AmazonAthena Source" ComponentTypeName="CData.SSIS.AmazonAthena.AmazonAthenaSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
              <DataflowOverrides>
                <OutputPath OutputPathName="CData AmazonAthena Source Output">
                  <Columns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
                  </Columns>
                </OutputPath>
              </DataflowOverrides>
              <CustomProperties>
                <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
                <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.AmazonAthena.AccessModeToStringConverter">0</CustomProperty>
                <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
                <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
              </CustomProperties>
              <OutputPaths>
                <OutputPath Name="CData AmazonAthena Source Output">
                  <OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
                  </OutputColumns>
                  <ExternalColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
                  </ExternalColumns>
                </OutputPath>
                <OutputPath Name="CData AmazonAthena Source Error Output" IsErrorOutput="true">
                  <OutputColumns>
<#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #>                     
                  </OutputColumns>
                </OutputPath>
              </OutputPaths>
              <Connections>
                <Connection Name="AmazonAthena 2018 Connection" ConnectionName="CData AmazonAthena Connection Manager" />
              </Connections>
            </CustomComponent>
            <OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
              <InputPath OutputPathName="CData AmazonAthena Source.CData AmazonAthena Source Output" />
              <ExternalTableOutput Table="[<#=tableName#>]" />
            </OleDbDestination>
          </Transformations>
        </Dataflow>          
<# entityCounter++;}#>
      </Tasks>
    </Package>
  </Packages>
</Biml>

<#+
public static class HelperClass {
    
    public static int OUTPUT_WITH_ERROR = 0;
    public static int EXTERNAL = 1;
    public static int OUTPUT = 2;
    public static int DATAOVERRIDE_COLUMN = 4;
    
    public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {
        var dropAndCreateStatement = 
            "IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{0}]') AND type IN (N'U'))\r\n" + 
            "DROP TABLE [{0}];\r\n" + 
            "CREATE TABLE [{0}]\r\n" + 
            "(\r\n" + 
            "{1}\r\n" + 
            ")\r\n" + 
            "ON \"default\";";
        string columnDefs = "";
        foreach (DataRow column in columns){
            string columnDef = "    [{0}] {1}";
            string dataType = column["DATA_TYPE"].ToString();
            if (dataType.ToLower().StartsWith("bool")) {
                dataType = "bit";
            } else if (dataType.ToLower().Equals("real")) {
                dataType = "float";
            } else if (dataType.ToLower().Contains("varchar")) {
                var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
                dataType = "nvarchar(" + ((int)columnLength > 4000 ? "MAX" : columnLength) + ")";         
            } 
            columnDefs += String.Format(columnDef,column["COLUMN_NAME"],dataType) + ",\r\n";
            
        }
        columnDefs = columnDefs.Remove(columnDefs.LastIndexOf(",\r\n"),",\r\n".Length);
        return String.Format(dropAndCreateStatement,tableName,columnDefs);
    }
    
    public static string GetColumnDefs(DataRowCollection columns, int columnType){
        var columnDefTemplate = "";
        var columnElements = "";
        
        if (columnType == DATAOVERRIDE_COLUMN) {
            columnDefTemplate = "                      <Column ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" ColumnName=\"{0}\" />\r\n";
            foreach(DataRow column in columns) {
                var columnName = column["COLUMN_NAME"];
                columnElements += String.Format(columnDefTemplate,columnName);
            }
            return columnElements;
        } 
        if (columnType == OUTPUT_WITH_ERROR)
            columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} ExternalMetadataColumnName=\"{0}\" ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" />\r\n";
        else if (columnType == EXTERNAL)
            columnDefTemplate = "                      <ExternalColumn Name=\"{0}\" {1} />\r\n";
        else if (columnType == OUTPUT)
            columnDefTemplate = "                      <OutputColumn Name=\"{0}\" {1} />\r\n";
        
        foreach(DataRow column in columns){ 
            var columnName = column["COLUMN_NAME"];
            var dataTypeRaw = column["DATA_TYPE"].ToString().ToLower();
            var typeAndRelatedInfo = "";
            if (dataTypeRaw.Equals("bool")) {
                typeAndRelatedInfo = "DataType=\"Boolean\"";
            } else if (dataTypeRaw.Equals("date")) {
                typeAndRelatedInfo = "DataType=\"Date\" SsisDataTypeOverride=\"DT_DBDATE\"";
            } else if (dataTypeRaw.Equals("datetime")) {
                typeAndRelatedInfo = "DataType=\"DateTime\"";
            } else if (dataTypeRaw.Equals("real")) {
                typeAndRelatedInfo = ((int)column["NumericPrecision"] > 0 ? "Precision=\"18\" " : " ") + ((int)column["NumericScale"] > 0 ? "Scale=\"15\" " : " ") + "DataType=\"Decimal\"";
            } else if (dataTypeRaw.Equals("varchar")) {
                var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
                if ((int)columnLength > 4000) {
                    typeAndRelatedInfo = "DataType=\"String\"";
                } else {
                    typeAndRelatedInfo = "Length=\"" + columnLength + "\" DataType=\"String\" CodePage=\"1252\"";
                }
            }
            columnElements += String.Format(columnDefTemplate,columnName,typeAndRelatedInfo);
        }
        return columnElements;
    }
}
#>
 
 
ダウンロード