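Hello! I'm Furukawa, and I'm responsible for the help documentation for our drivers.
Keeping a backup of your core business data in SQL Server is a safety net for your business. It also lets users run reporting and analytics directly against the backup data in SQL Server. Biml is an XML dialect for describing Microsoft SQL Server BI objects such as SSIS packages. By combining the CData SSIS Components with Biml, you can easily create SSIS packages that connect to Azure Data Lake Storage data.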
This article demonstrates how to use Biml with the CData SSIS Tasks for ADLS to dynamically build SSIS tasks (one for each Azure Data Lake Storage entity) that replicate Azure Data Lake Storage data to a Microsoft SQL Server database. We step through the Biml file one section at a time and include the complete Biml file at the end of the article.
In order to use Biml in an SSIS Project in Visual Studio, install BimlExpress. Once you install BimlExpress, open Visual Studio, create a new Integration Services project, and add a new Biml file.
With Biml, you can write scripts that dynamically generate SSIS projects, packages, and tasks. To see the Biml file for an existing project (and gain insight into using Biml with the CData SSIS Tasks), create your tasks, then right-click the project and select Convert SSIS Packages to Biml.
<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.ADLS"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for ADLS 2018\lib\CData.SSIS2017.ADLS.dll"#>
In a new control nugget <# ... #>, create variables for values that will be used throughout the Biml script, including a connection string for ADLS and structures to store the Azure Data Lake Storage metadata.
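To connect to a Gen 1 DataLakeStorage account, begin by setting the connection properties for that account.
Gen 1 supports Azure Active Directory OAuth (AzureAD) and managed service identity (AzureMSI) as authentication methods. For details, see the "Authenticating to Azure DataLakeStorage Gen 1" section of the help documentation.
To connect to a Gen 2 DataLakeStorage account, begin by setting the connection properties for that account (the sample connection string in this section uses Schema, Account, and FileSystem).
Gen 2 supports a variety of authentication methods, including access keys, shared access signatures (SAS), Azure Active Directory OAuth (AzureAD), and managed service identity (AzureMSI). For AzureAD and AzureMSI, see the "Authenticating to Azure DataLakeStorage Gen 2" section of the help documentation.
To connect with an access key, set the AccessKey property to the access key value you obtained and set AuthScheme to AccessKey.
You can obtain the access key for an ADLS Gen2 storage account from the Azure portal.
To connect with a shared access signature, set the SharedAccessSignature property to a valid signature for the resource you are connecting to and set AuthScheme to SAS. You can generate a shared access signature with a tool such as Azure Storage Explorer.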
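The two Gen 2 options just described could be expressed as connection strings along the following lines. This is only a sketch: the `ConnectionStrings` class and its method names are hypothetical helpers introduced for illustration, and the account, file system, key, and signature values are placeholders. The property names (Schema, Account, FileSystem, AccessKey, SharedAccessSignature, AuthScheme) are the ones named in this article.

```csharp
using System;

// Hypothetical helper for illustration only; not part of the CData components.
public static class ConnectionStrings
{
    // Access key authentication: AccessKey plus AuthScheme=AccessKey.
    public static string WithAccessKey(string account, string fileSystem, string accessKey)
    {
        return "Schema=ADLSGen2;Account=" + account +
               ";FileSystem=" + fileSystem +
               ";AccessKey=" + accessKey +
               ";AuthScheme=AccessKey";
    }

    // Shared access signature authentication: SharedAccessSignature plus AuthScheme=SAS.
    public static string WithSas(string account, string fileSystem, string signature)
    {
        return "Schema=ADLSGen2;Account=" + account +
               ";FileSystem=" + fileSystem +
               ";SharedAccessSignature=" + signature +
               ";AuthScheme=SAS";
    }
}
```

For example, `ConnectionStrings.WithAccessKey("myAccount", "myFileSystem", "myAccessKey")` yields `Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;AuthScheme=AccessKey`. Either string can then be assigned to the connection string variable used by the Biml script.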
// Access key authentication, as described above (placeholder values).
var adlsConnectionString = "Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;AuthScheme=AccessKey";
var replicationServer = "SERVER";
var replicationCatalog = "CATALOG";
var replicationUserID = "sqluser";
var replicationPassword = "sqlpassword";
// Entity (table) names and, per entity, the column metadata rows.
List<string> allEntityNames = new List<string>();
Hashtable entitySchema = new Hashtable();
using (ADLSConnection connection = new ADLSConnection(adlsConnectionString)) {
  connection.Open();
  var entities = connection.GetSchema("Tables").Rows;
  foreach (DataRow entity in entities) {
    allEntityNames.Add(entity["TABLE_NAME"].ToString());
  }
  foreach (string entity in allEntityNames) {
    var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
    entitySchema.Add(entity,columns);
  }
}
In our Biml script to create the replication tasks, there are several places where repeated XML elements are created dynamically (mostly for columns in SSIS tasks). Instead of repeating the code, add a class nugget <#+ ... #> and create a helper class with methods to consolidate repeated code (full code at the end of the article).
public static int OUTPUT_WITH_ERROR = 0;
public static int EXTERNAL = 1;
public static int OUTPUT = 2;
public static int DATAOVERRIDE_COLUMN = 4;

// Dynamically builds a DROP TABLE and CREATE TABLE statement
// for each entity (table) in Azure Data Lake Storage using the table name and metadata.
public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) { ... }

// Dynamically builds various column-based XML elements
// for each entity (table) in Azure Data Lake Storage based on the column
// metadata and the parent element.
public static string GetColumnDefs(DataRowCollection columns, int columnType){ ... }
Now that you have the table metadata and a Helper class to reduce repeated code, write the Biml script to dynamically create your replication packages.
<ADLSConnectionManager>
  <Property Name="ConnectionString"><#=adlsConnectionString#></Property>
</ADLSConnectionManager>
After configuring the connection to the CData SSIS Task, configure a connection to the replication database. The completed Connections element looks like the following (note the use of text nuggets <#= ... #> to add variables for connection string values):
<Connections>
  <!-- The ObjectData value is itself XML, so it is entity-escaped inside the attribute. -->
  <CustomSsisConnection Name="CData ADLS Connection Manager" CreationName="CDATA_ADLS" ObjectData="&lt;ADLSConnectionManager&gt;&lt;Property Name=&quot;ConnectionString&quot;&gt;<#=adlsConnectionString#>&lt;/Property&gt;&lt;/ADLSConnectionManager&gt;" />
  <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
</Connections>
With the Connections element configured, you are ready to build the replication package. In the package, the Biml script creates an ExecuteSQL task and a Dataflow task for each table to be replicated.
To build each set of tasks, use a while loop in a control nugget to iterate through the entity (table) names:
int entityCounter = 0;
while(entityCounter < allEntityNames.Count){
  var tableName = allEntityNames[entityCounter].ToString();
  DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);
In the ExecuteSQL task, execute a SQL query that drops any existing table with the same name as the Azure Data Lake Storage entity (table) and creates a new table based on the metadata discovered using the CData SSIS Component.
To create the query dynamically, use the HelperClass.GetDeleteAndCreateStatement() helper function.
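To make the shape of the generated statement concrete, here is a condensed sketch that applies the same type-mapping rules as the helper in the complete Biml file to a hand-built metadata table. The class name `DropCreateSketch`, the `SampleColumns` method, and the sample entity with its two columns are hypothetical and exist only for illustration; in the Biml script the rows come from the connection's GetSchema("Columns") call.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical, condensed illustration; the article's own helper is HelperClass.
public static class DropCreateSketch
{
    // Same mapping rules as the helper in the complete Biml file:
    // bool* -> bit, real -> float, *varchar* -> nvarchar(n), or nvarchar(MAX) above 4000.
    public static string MapType(DataRow column)
    {
        string dataType = column["DATA_TYPE"].ToString();
        string lower = dataType.ToLower();
        if (lower.StartsWith("bool")) return "bit";
        if (lower.Equals("real")) return "float";
        if (lower.Contains("varchar"))
        {
            int length = (int)column["CHARACTER_MAXIMUM_LENGTH"];
            return "nvarchar(" + (length > 4000 ? "MAX" : length.ToString()) + ")";
        }
        return dataType; // any other type name is passed through unchanged
    }

    // Builds the DROP TABLE / CREATE TABLE statement for one entity.
    public static string Build(string tableName, DataRowCollection columns)
    {
        var defs = new List<string>();
        foreach (DataRow column in columns)
            defs.Add("  [" + column["COLUMN_NAME"] + "] " + MapType(column));

        return string.Format(
            "IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{0}]') AND type IN (N'U'))\r\n" +
            "DROP TABLE [{0}];\r\n" +
            "CREATE TABLE [{0}]\r\n(\r\n{1}\r\n)\r\nON \"default\";",
            tableName, string.Join(",\r\n", defs));
    }

    // Hand-built stand-in for the rows returned by GetSchema("Columns").
    public static DataRowCollection SampleColumns()
    {
        var table = new DataTable();
        table.Columns.Add("COLUMN_NAME", typeof(string));
        table.Columns.Add("DATA_TYPE", typeof(string));
        table.Columns.Add("CHARACTER_MAXIMUM_LENGTH", typeof(int));
        table.Rows.Add("Name", "varchar", 255);
        table.Rows.Add("IsDirectory", "bool", 0);
        return table.Rows;
    }
}
```

Calling `DropCreateSketch.Build("Resources", DropCreateSketch.SampleColumns())` produces a statement that drops `[Resources]` if it exists and recreates it with the columns `[Name] nvarchar(255)` and `[IsDirectory] bit`.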
Within the Dataflow task, use a CustomComponent as the source component and an OleDbDestination as the destination.
The CustomComponent element uses the CData SSIS Source component to retrieve Azure Data Lake Storage data. Start by configuring the CustomComponent element to use the CData component.
<CustomComponent Name="CData ADLS Source" ComponentTypeName="CData.SSIS.ADLS.ADLSSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
  ...
</CustomComponent>
The next step is to add Columns elements to the OutputPath child element of the DataflowOverrides element. To do so, call the HelperClass.GetColumnDefs() helper function.
Use the same HelperClass to add columns to the OutputColumns and ExternalColumns child elements of the various OutputPath elements.
These definitions describe the input, output, and error columns of the SSIS component.
<DataflowOverrides>
  <OutputPath OutputPathName="CData ADLS Source Output">
    <Columns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
    </Columns>
  </OutputPath>
</DataflowOverrides>
...
<OutputPaths>
  <OutputPath Name="CData ADLS Source Output">
    <OutputColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
    </OutputColumns>
    <ExternalColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
    </ExternalColumns>
  </OutputPath>
  <OutputPath Name="CData ADLS Source Error Output" IsErrorOutput="true">
    <OutputColumns>
      <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #>
    </OutputColumns>
  </OutputPath>
</OutputPaths>
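As a rough illustration of what the helper emits for a single column, the sketch below reproduces the ExternalColumn case for the simpler data types. `ColumnDefSketch` and its method names are hypothetical; the attribute values follow the GetColumnDefs logic in the complete Biml file, and the "real" branch is left out because it depends on precision and scale metadata.

```csharp
using System;

// Hypothetical, condensed illustration of the ExternalColumn case of GetColumnDefs.
public static class ColumnDefSketch
{
    // Maps a source type name to the Biml type attributes for the column.
    public static string TypeAttributes(string dataType, int maxLength)
    {
        switch (dataType.ToLower())
        {
            case "bool": return "DataType=\"Boolean\"";
            case "date": return "DataType=\"Date\" SsisDataTypeOverride=\"DT_DBDATE\"";
            case "datetime": return "DataType=\"DateTime\"";
            case "varchar":
                // Columns longer than 4000 characters omit Length and CodePage.
                return maxLength > 4000
                    ? "DataType=\"String\""
                    : "Length=\"" + maxLength + "\" DataType=\"String\" CodePage=\"1252\"";
            default: return ""; // other types get no type attributes in this sketch
        }
    }

    // Builds one ExternalColumn element for the given column.
    public static string ExternalColumn(string name, string dataType, int maxLength)
    {
        return string.Format("<ExternalColumn Name=\"{0}\" {1} />",
                             name, TypeAttributes(dataType, maxLength));
    }
}
```

For a 255-character varchar column called Name, `ColumnDefSketch.ExternalColumn("Name", "varchar", 255)` returns `<ExternalColumn Name="Name" Length="255" DataType="String" CodePage="1252" />`.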
The CData SSIS tasks are surfaced in SSIS as custom components with a series of required CustomProperties:
<CustomProperties>
  <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
  <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.ADLS.AccessModeToStringConverter">0</CustomProperty>
  <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
  <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
</CustomProperties>
The last element to add to the CustomComponent element is a Connections element, attaching the previously defined connection to the task:
<Connections>
  <Connection Name="ADLS 2018 Connection" ConnectionName="CData ADLS Connection Manager" />
</Connections>
The final piece of the Dataflow task is the OleDbDestination element. Attach the previously defined Destination connection to the element, then set the InputPath and ExternalTableOutput:
<OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
  <InputPath OutputPathName="CData ADLS Source.CData ADLS Source Output" />
  <ExternalTableOutput Table="[<#=tableName#>]" />
</OleDbDestination>
        ...
        </Dataflow>
        <# entityCounter++;}#>
      </Tasks>
    </Package>
  </Packages>
</Biml>
Once the Biml file is written, right-click the Biml file in Solution Explorer and select Generate SSIS Packages. Visual Studio and BimlExpress then translate the Biml file into one or more SSIS packages, ready to be run.
Run the package to begin replicating your Azure Data Lake Storage data to a SQL Server database (or any other destination you choose).
With the CData SSIS Components for ADLS, you get SQL access to your Azure Data Lake Storage data directly from SSIS packages. And with Biml, you can automatically generate those packages. For more information about the CData SSIS Components for ADLS, refer to the product page. You can get started with a free, 30-day trial, and the CData Support Team is available if you have any questions.
<#@ template language="C#" hostspecific="true"#>
<#@ import namespace="System.Data"#>
<#@ import namespace="System.IO"#>
<#@ import namespace="System.Collections"#>
<#@ import namespace="System.Data.CData.ADLS"#>
<#@ assembly name="C:\Program Files\CData\CData SSIS Components for ADLS 2018\lib\CData.SSIS2017.ADLS.dll"#>
<#
  var adlsConnectionString = "Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;AuthScheme=AccessKey";
  var replicationServer = "JDG";
  var replicationCatalog = "BIML";
  var replicationUserID = "sqltest";
  var replicationPassword = "sqltest";
  List<string> allEntityNames = new List<string>();
  Hashtable entitySchema = new Hashtable();

  // Discover the entity (table) names and the column metadata for each entity.
  using (ADLSConnection connection = new ADLSConnection(adlsConnectionString)) {
    connection.Open();
    var entities = connection.GetSchema("Tables").Rows;
    foreach (DataRow entity in entities) {
      allEntityNames.Add(entity["TABLE_NAME"].ToString());
    }
    foreach (string entity in allEntityNames) {
      var columns = connection.GetSchema("Columns", new string [] {entity}).Rows;
      entitySchema.Add(entity,columns);
    }
  }
#>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
  <Connections>
    <CustomSsisConnection Name="CData ADLS Connection Manager" CreationName="CDATA_ADLS" ObjectData="&lt;ADLSConnectionManager&gt;&lt;Property Name=&quot;ConnectionString&quot;&gt;<#=adlsConnectionString#>&lt;/Property&gt;&lt;/ADLSConnectionManager&gt;"/>
    <Connection Name="Destination" ConnectionString="Data Source=<#=replicationServer#>;User ID=<#=replicationUserID#>;Password=<#=replicationPassword#>;Initial Catalog=<#=replicationCatalog#>;Provider=SQLNCLI11.1;"/>
  </Connections>
  <Packages>
    <Package Name="Replicate ADLS Package" Language="None" ConstraintMode="LinearOnCompletion" ProtectionLevel="EncryptSensitiveWithUserKey">
      <Tasks>
        <#
          // One ExecuteSQL task and one Dataflow task per entity.
          int entityCounter = 0;
          while(entityCounter < allEntityNames.Count){
            var tableName = allEntityNames[entityCounter].ToString();
            DataRowCollection columns = ((DataRowCollection)entitySchema[tableName]);
        #>
        <ExecuteSQL Name="Create <#=tableName#> Replication Table" ConnectionName="Destination">
          <DirectInput>
            <#=HelperClass.GetDeleteAndCreateStatement(tableName,columns)#>
          </DirectInput>
        </ExecuteSQL>
        <Dataflow Name="Replicate <#=tableName#>">
          <Transformations>
            <CustomComponent Name="CData ADLS Source" ComponentTypeName="CData.SSIS.ADLS.ADLSSource" Version="18" ContactInfo="support@cdata.com" UsesDispositions="true">
              <DataflowOverrides>
                <OutputPath OutputPathName="CData ADLS Source Output">
                  <Columns>
                    <#=HelperClass.GetColumnDefs(columns,HelperClass.DATAOVERRIDE_COLUMN) #>
                  </Columns>
                </OutputPath>
              </DataflowOverrides>
              <CustomProperties>
                <CustomProperty Name="SQLStatement" DataType="Null" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true"></CustomProperty>
                <CustomProperty Name="AccessMode" DataType="Int32" TypeConverter="CData.SSIS.ADLS.AccessModeToStringConverter">0</CustomProperty>
                <CustomProperty Name="TableOrView" DataType="String" UITypeEditor="Microsoft.DataTransformationServices.Controls.ModalMultilineStringEditor, Microsoft.DataTransformationServices.Controls, Version= 10.0.0.0, Culture=neutral, PublicKeyToken=89845dcd8080cc91" SupportsExpression="true">[<#=tableName#>]</CustomProperty>
                <CustomProperty Name="ExecStoredProcedure" DataType="Boolean">false</CustomProperty>
              </CustomProperties>
              <OutputPaths>
                <OutputPath Name="CData ADLS Source Output">
                  <OutputColumns>
                    <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT_WITH_ERROR) #>
                  </OutputColumns>
                  <ExternalColumns>
                    <#=HelperClass.GetColumnDefs(columns,HelperClass.EXTERNAL) #>
                  </ExternalColumns>
                </OutputPath>
                <OutputPath Name="CData ADLS Source Error Output" IsErrorOutput="true">
                  <OutputColumns>
                    <#=HelperClass.GetColumnDefs(columns,HelperClass.OUTPUT) #>
                  </OutputColumns>
                </OutputPath>
              </OutputPaths>
              <Connections>
                <Connection Name="ADLS 2018 Connection" ConnectionName="CData ADLS Connection Manager" />
              </Connections>
            </CustomComponent>
            <OleDbDestination Name="OLE DB Destination" ConnectionName="Destination" CheckConstraints="false">
              <InputPath OutputPathName="CData ADLS Source.CData ADLS Source Output" />
              <ExternalTableOutput Table="[<#=tableName#>]" />
            </OleDbDestination>
          </Transformations>
        </Dataflow>
        <# entityCounter++;}#>
      </Tasks>
    </Package>
  </Packages>
</Biml>
<#+
public static class HelperClass {
  public static int OUTPUT_WITH_ERROR = 0;
  public static int EXTERNAL = 1;
  public static int OUTPUT = 2;
  public static int DATAOVERRIDE_COLUMN = 4;

  // Builds a DROP TABLE and CREATE TABLE statement for one entity (table).
  public static string GetDeleteAndCreateStatement(string tableName, DataRowCollection columns) {
    var dropAndCreateStatement =
      "IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{0}]') AND type IN (N'U'))\r\n" +
      "DROP TABLE [{0}];\r\n" +
      "CREATE TABLE [{0}]\r\n" +
      "(\r\n" +
      "{1}\r\n" +
      ")\r\n" +
      "ON \"default\";";
    string columnDefs = "";
    foreach (DataRow column in columns){
      string columnDef = " [{0}] {1}";
      string dataType = column["DATA_TYPE"].ToString();
      // Map source type names to SQL Server types.
      if (dataType.ToLower().StartsWith("bool")) {
        dataType = "bit";
      } else if (dataType.ToLower().Equals("real")) {
        dataType = "float";
      } else if (dataType.ToLower().Contains("varchar")) {
        var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
        dataType = "nvarchar(" + ((int)columnLength > 4000 ? "MAX" : columnLength) + ")";
      }
      columnDefs += String.Format(columnDef,column["COLUMN_NAME"],dataType) + ",\r\n";
    }
    // Remove the trailing comma and line break.
    columnDefs = columnDefs.Remove(columnDefs.LastIndexOf(",\r\n"),",\r\n".Length);
    return String.Format(dropAndCreateStatement,tableName,columnDefs);
  }

  // Builds the column XML elements for one entity, depending on the parent element.
  public static string GetColumnDefs(DataRowCollection columns, int columnType){
    var columnDefTemplate = "";
    var columnElements = "";
    if (columnType == DATAOVERRIDE_COLUMN) {
      columnDefTemplate = " <Column ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" ColumnName=\"{0}\" />\r\n";
      foreach(DataRow column in columns) {
        var columnName = column["COLUMN_NAME"];
        columnElements += String.Format(columnDefTemplate,columnName);
      }
      return columnElements;
    }
    if (columnType == OUTPUT_WITH_ERROR)
      columnDefTemplate = " <OutputColumn Name=\"{0}\" {1} ExternalMetadataColumnName=\"{0}\" ErrorRowDisposition=\"FailComponent\" TruncationRowDisposition=\"FailComponent\" />\r\n";
    else if (columnType == EXTERNAL)
      columnDefTemplate = " <ExternalColumn Name=\"{0}\" {1} />\r\n";
    else if (columnType == OUTPUT)
      columnDefTemplate = " <OutputColumn Name=\"{0}\" {1} />\r\n";
    foreach(DataRow column in columns){
      var columnName = column["COLUMN_NAME"];
      var dataTypeRaw = column["DATA_TYPE"].ToString().ToLower();
      var typeAndRelatedInfo = "";
      if (dataTypeRaw.Equals("bool")) {
        typeAndRelatedInfo = "DataType=\"Boolean\"";
      } else if (dataTypeRaw.Equals("date")) {
        typeAndRelatedInfo = "DataType=\"Date\" SsisDataTypeOverride=\"DT_DBDATE\"";
      } else if (dataTypeRaw.Equals("datetime")) {
        typeAndRelatedInfo = "DataType=\"DateTime\"";
      } else if (dataTypeRaw.Equals("real")) {
        typeAndRelatedInfo = ((int)column["NumericPrecision"] > 0 ? "Precision=\"18\" " : " ") + ((int)column["NumericScale"] > 0 ? "Scale=\"15\" " : " ") + "DataType=\"Decimal\"";
      } else if (dataTypeRaw.Equals("varchar")) {
        var columnLength = column["CHARACTER_MAXIMUM_LENGTH"];
        if ((int)columnLength > 4000) {
          typeAndRelatedInfo = "DataType=\"String\"";
        } else {
          typeAndRelatedInfo = "Length=\"" + columnLength + "\" DataType=\"String\" CodePage=\"1252\"";
        }
      }
      columnElements += String.Format(columnDefTemplate,columnName,typeAndRelatedInfo);
    }
    return columnElements;
  }
}
#>