各製品の資料を入手。
詳細はこちら →CData
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
CData Cmdlets for ApacheKafka を使えば、PowerShell からKafka データ データにリアルタイムで連携できます。データ同期などのタスクの連携にぴったりの製品です。 本記事では、PowerShell からCData Cmdlets for ApacheKafka およびCData Cmdlets for MySQL を使って、同期スクリプトを作成して実行します。
まずは、PowerShell でKafka への接続を行います。レプリケーションは4つのステップがあります。
BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。
サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。
モジュールのインストール:
Install-Module ApacheKafkaCmdlets
Kafka への接続:
$apachekafka = Connect-ApacheKafka -User $User -Password $Password -BootStrapServers $BootStrapServers -Topic $Topic
取得ターゲットのリソースの取得:
$data = Select-ApacheKafka -Connection $apachekafka -Table "SampleTable_1"
Invoke-ApacheKafka cmdlet を使って、SQL-92 クエリを使用することもできます:
$data = Invoke-ApacheKafka -Connection $apachekafka -Query 'SELECT * FROM SampleTable_1 WHERE Column2 = @Column2' -Params @{'@Column2'='100'}
戻り値からカラム名のリストを保存します。
$columns = ($data | Get-Member -MemberType NoteProperty | Select-Object -Property Name).Name
カラム名を指定できるようにして、データをMySQL データベースにレプリケーションします。
モジュールのインストール:
Install-Module MySQLCmdlets
MySQL DB に、MySQL Server 名、ユーザー、パスワード、レプリケーション先のデータベース名を指定して、接続します:
$mysql = Connect-MySQL -User $User -Password $Password -Database $Database -Server $Server -Port $Port
Kafka、保存された値、そしてAdd-MySQL Cmdlet を使って、MySQL にデータを1レコードずつ挿入します。この例では、MySQL 側のテーブルは、Kafka のリソース(SampleTable_1)と同じテーブル名を持っている必要があります。
$data | % {
$row = $_
$values = @()
$columns | % {
$col = $_
$values += $row.$($col)
}
Add-MySQL -Connection $mysql -Table "SampleTable_1" -Columns $columns -Values $values
}
一度PowerShell でKafka とMySQL に接続したら、次からは1行のコマンドでレプリケーションを実施できます:
Select-ApacheKafka -Connection $apachekafka -Table "SampleTable_1" | % {
$row = $_
$values = @()
$columns | % {
$col = $_
$values += $row.$($col)
}
Add-MySQL -Connection $mysql -Table "SampleTable_1" -Columns $columns -Values $values
}
別のPowerShell モジュールで、Kafka を別のデータベースに複製する場合、Select-ApacheKafka cmdlet のデータから、カラム、接続およびテーブルを除外しておきましょう。これらのデータはデータ移動のときだけ必要となるためです。
$columns = ($data | Get-Member -MemberType NoteProperty | Select-Object -Property Name).Name | ? {$_ -NotIn @('Columns','Connection','Table')}
これで、Kafka データをMySQL に複製できました。分析、BI などでKafka データをMySQL から使うことができるようになります。