PowerShell からSpark のデータに接続してデータの取得・更新・挿入・削除・CSV エクスポートを実行する方法

CData PowerShell Cmdlets を使って、Spark のデータに接続、データの取得・更新・挿入・削除・CSV エクスポートを実行する方法を紹介します。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-26

この記事で実現できるSpark 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

CData Cmdlets for SparkSQL を使えば、PowerShell からSpark に手軽に連携して、データのCRUD やエクスポートを実行できます。

本記事では、Spark への接続方法からCSV エクスポート、データの操作までサンプルコード付きで解説していきます。

Spark への接続を設定

それでは、まずはSpark への接続設定からはじめていきましょう。接続設定にはCData Spark Cmdlets が必要となります。右側のサイドバーから製品の全機能が使える30日間の無償トライアルがダウンロードできるので、ぜひご利用ください。

インストールが完了したら、プロファイルに以下の行を追加してください。次のPowerShell セッションでモジュールがロードされます。

    Import-Module SparkSQLCmdlets;

Connect-SparkSQL コマンドを使ってSpark との接続を設定します。各接続プロパティの取得方法は次に説明します。

    $conn = Connect-SparkSQL  -Server "$Server"

SparkSQL への接続

SparkSQL への接続を確立するには以下を指定します。

  • Server:SparkSQL をホストするサーバーのホスト名またはIP アドレスに設定。
  • Port:SparkSQL インスタンスへの接続用のポートに設定。
  • TransportMode:SparkSQL サーバーとの通信に使用するトランスポートモード。有効な入力値は、BINARY およびHTTP です。デフォルトではBINARY が選択されます。
  • AuthScheme:使用される認証スキーム。有効な入力値はPLAIN、LDAP、NOSASL、およびKERBEROS です。デフォルトではPLAIN が選択されます。

Databricks への接続

Databricks クラスターに接続するには、以下の説明に従ってプロパティを設定します。Note:必要な値は、「クラスター」に移動して目的のクラスターを選択し、 「Advanced Options」の下にある「JDBC/ODBC」タブを選択することで、Databricks インスタンスで見つけることができます。

  • Server:Databricks クラスターのサーバーのホスト名に設定。
  • Port:443
  • TransportMode:HTTP
  • HTTPPath:Databricks クラスターのHTTP パスに設定。
  • UseSSL:True
  • AuthScheme:PLAIN
  • User:'token' に設定。
  • Password:パーソナルアクセストークンに設定(値は、Databricks インスタンスの「ユーザー設定」ページに移動して「アクセストークン」タブを選択することで取得できます)。

これで接続設定は完了です。

Spark のデータを取得してCSV にパイプライン

接続が完了したので、Customers テーブルデータを取得して結果をCSV ファイルにエクスポートします。

Select-SparkSQL -Connection $conn -Table Customers | Select -Property * -ExcludeProperty Connection,Table,Columns | Export-Csv -Path c:\myCustomersData.csv -NoTypeInformation

このコードでは、Select-SparkSQL から取得した結果をSelect-Object に流して、Export-Csv に渡す前にいくつかのプロパティを除外しています。これは、CData Cmdlets が接続情報、テーブル、およびカラム情報を結果セットのそれぞれの行に挿入するためです。それらの情報を表示したくない場合に、Export-Csv コマンドに渡す前に除外を先に行い、そのあとでCSV ファイルにエクスポートします。

それでは、続いてデータの削除や挿入・更新を実行してみます。

データの削除

以下のように、フィルタリングで合致するレコードを削除することができます。

Select-SparkSQL -Connection $conn -Table Customers -Where "Country = US" | Remove-SparkSQL

データの挿入と更新

データの挿入や更新を使って、加工やクレンジングも行えます。以下の手順では、CSV ファイルのデータを読み込んで挿入対象のオブジェクトに同じレコードが存在するかを確認した上で、存在する場合にはデータを更新、存在しない場合にはデータの挿入を行います。

Import-Csv -Path C:\MyCustomersUpdates.csv | %{
  $record = Select-SparkSQL -Connection $SparkSQL -Table Customers -Where ("Id = `'"+$_.Id+"`'")
  if($record){
    Update-SparkSQL -Connection $sparksql -Table Customers -Columns ("City","Balance") -Values ($_.City, $_.Balance) -Where ("Id = `'"+$_.Id+"`'")
  }else{
    Add-SparkSQL -Connection $sparksql -Table Customers -Columns ("City","Balance") -Values ($_.City, $_.Balance)
  }
}

おわりに

このように、CData Cmdlets を使えばPowerShell でのSpark のデータへの連携をシンプルに実現できます。ぜひCData PowerShell Cmdlets の30日間無償トライアルをダウンロードして、シンプルかつパワフルなデータ連携をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。