PowerShell からKafka のデータに接続してデータの取得・更新・挿入・削除・CSV エクスポートを実行する方法

CData PowerShell Cmdlets を使って、Kafka のデータに接続、データの取得・更新・挿入・削除・CSV エクスポートを実行する方法を紹介します。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-26

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

CData Cmdlets for ApacheKafka を使えば、PowerShell からKafka に手軽に連携して、データのCRUD やエクスポートを実行できます。

本記事では、Kafka への接続方法からCSV エクスポート、データの操作までサンプルコード付きで解説していきます。

Kafka への接続を設定

それでは、まずはKafka への接続設定からはじめていきましょう。接続設定にはCData Kafka Cmdlets が必要となります。右側のサイドバーから製品の全機能が使える30日間の無償トライアルがダウンロードできるので、ぜひご利用ください。

インストールが完了したら、プロファイルに以下の行を追加してください。次のPowerShell セッションでモジュールがロードされます。

    Import-Module ApacheKafkaCmdlets;

Connect-ApacheKafka コマンドを使ってKafka との接続を設定します。各接続プロパティの取得方法は次に説明します。

    $conn = Connect-ApacheKafka  -User "$User" -Password "$Password" -BootStrapServers "$BootStrapServers" -Topic "$Topic"

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

これで接続設定は完了です。

Kafka のデータを取得してCSV にパイプライン

接続が完了したので、SampleTable_1 テーブルデータを取得して結果をCSV ファイルにエクスポートします。

Select-ApacheKafka -Connection $conn -Table SampleTable_1 | Select -Property * -ExcludeProperty Connection,Table,Columns | Export-Csv -Path c:\mySampleTable_1Data.csv -NoTypeInformation

このコードでは、Select-ApacheKafka から取得した結果をSelect-Object に流して、Export-Csv に渡す前にいくつかのプロパティを除外しています。これは、CData Cmdlets が接続情報、テーブル、およびカラム情報を結果セットのそれぞれの行に挿入するためです。それらの情報を表示したくない場合に、Export-Csv コマンドに渡す前に除外を先に行い、そのあとでCSV ファイルにエクスポートします。

それでは、続いてデータの削除や挿入・更新を実行してみます。

データの削除

以下のように、フィルタリングで合致するレコードを削除することができます。

Select-ApacheKafka -Connection $conn -Table SampleTable_1 -Where "Column2 = 100" | Remove-ApacheKafka

データの挿入と更新

データの挿入や更新を使って、加工やクレンジングも行えます。以下の手順では、CSV ファイルのデータを読み込んで挿入対象のオブジェクトに同じレコードが存在するかを確認した上で、存在する場合にはデータを更新、存在しない場合にはデータの挿入を行います。

Import-Csv -Path C:\MySampleTable_1Updates.csv | %{
  $record = Select-ApacheKafka -Connection $ApacheKafka -Table SampleTable_1 -Where ("Id = `'"+$_.Id+"`'")
  if($record){
    Update-ApacheKafka -Connection $apachekafka -Table SampleTable_1 -Columns ("Id","Column1") -Values ($_.Id, $_.Column1) -Where ("Id = `'"+$_.Id+"`'")
  }else{
    Add-ApacheKafka -Connection $apachekafka -Table SampleTable_1 -Columns ("Id","Column1") -Values ($_.Id, $_.Column1)
  }
}

おわりに

このように、CData Cmdlets を使えばPowerShell でのKafka のデータへの連携をシンプルに実現できます。ぜひCData PowerShell Cmdlets の30日間無償トライアルをダウンロードして、シンプルかつパワフルなデータ連携をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。