製品をチェック

CData Connect Cloud の詳細情報はこちら。無償トライアルはこちら:

  無償トライアル

データ連携でお困りですか?

お問い合わせ

Google Apps Script とKafka データを連携


CData Connect Cloud を使ってGoogle Apps Script からKafka データに接続します。


宮本航太
プロダクトスペシャリスト

kafka ロゴ画像

Connect Cloud

cloud ロゴ画像
Google Apps Script ロゴ画像

Google Apps Script を使えばGoogle スプレッドシート、Google ドキュメントといったGoogle のサービス内でカスタム機能を作成できます。CData Connect Cloud を使えば、Kafka を含むCData がサポートする100以上のすべてのデータソースのSQL Server インターフェースを利用できます。SQL Server プロトコルはGoogle Apps Script のJDBC サービスでネイティブにサポートされているので、Connect Cloud を活用すればご利用のGoogle サービスからKafka ライブデータにアクセスできます。

この記事では、Connect Cloud でKafka に接続する方法を説明し、Google スプレッドシートでKafka データを扱うためのサンプルスクリプトを提供します。

この記事のスクリプトは指定したテーブルからのみデータを読み込みますが、スクリプトを拡張して更新機能を追加することもできます。

Connect Cloud からKafka への接続

CData Connect Cloud では、直感的なクリック操作ベースのインターフェースを使ってデータソースに接続できます。

  1. Connect Cloud にログインし、 Add Connection をクリックします。 コネクションの追加
  2. Add Connection パネルから「Kafka」を選択します。 データソースの選択
  3. 必要な認証プロパティを入力し、Kafka に接続します。

    BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

    認可メカニズム

    • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
    • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
    • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
    • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

    サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

    接続の設定(Salesforce の表示)
  4. Create & Test をクリックします。
  5. 「Add Kafka Connection」ページの「Permissions」タブに移動し、ユーザーベースのアクセス許可を更新します。 権限を更新

パーソナルアクセストークンの追加

OAuth 認証をサポートしていないサービス、アプリケーション、プラットフォーム、またはフレームワークから接続する場合は、認証に使用するパーソナルアクセストークン(PAT)を作成できます。きめ細かなアクセス管理を行うために、サービスごとに個別のPAT を作成するのがベストプラクティスです。

  1. Connect Cloud アプリの右上にあるユーザー名をクリックし、「Settings」をクリックします。
  2. 「Settings」ページで「Access Token」セクションにスクロールし、 Create PAT をクリックします。
  3. PAT の名前を入力して Create をクリックします。 新しいPAT を作成
  4. パーソナルアクセストークンは作成時にしか表示されないため、必ずコピーして安全に保存してください。

コネクションの設定が完了したら、Google Apps Script からKafka データへの接続準備ができました。

Apps Script からKafka データに接続

ここまでで、Connect Cloud でのKafka のコネクション設定は完了していると思います。ここからは、Google Apps Script からConnect Cloud に接続して、Google スプレッドシートでKafka データを扱います。

このセクションでは、スプレッドシートからKafka データを取得する、スクリプトを呼び出すメニューオプション付きのスクリプトを作成していきます。本記事用に、各部分にコメントをつけたサンプルスクリプトを作成しました。サンプルスクリプトは、記事の最後に記載しています。

1.空のスクリプトを作成

Google スプレッドシート用のスクリプトを作るには、Google スプレッドシートのメニューから拡張機能 Apps Script をクリックします。

2.クラス変数の宣言

スクリプトで作成した関数で利用するためのクラス変数をいくつか作成します。

//replace the variables in this block with real values as needed
var address = 'tds.cdata.com:14333';
var user = 'CONNECT_USER'; // user@mydomain.com
var userPwd = 'CONNECT_USER_PAT';
var db = 'ApacheKafka1';

var dbUrl = 'jdbc:sqlserver://' + address + ';databaseName=' + db;

3.メニューオプションを追加

この関数はGoogle スプレッドシートに、作成した関数を呼び出すためのメニューオプションを追加します。

function onOpen() {
  var spreadsheet = SpreadsheetApp.getActive();
  var menuItems = [
    {name:'Export data to a sheet', functionName: 'connectToApacheKafkaData'}
  ];
  spreadsheet.addMenu('Kafka Data', menuItems);
} 
新しく追加したメニューオプション。

4.ヘルパー関数を作成

スプレッドシートの最初の空行を見つけるために、この関数を使用します。

/*
 * Finds the first empty row in a spreadsheet by scanning an array of columns
 * @return The row number of the first empty row.
 */
function getFirstEmptyRowByColumnArray(spreadSheet, column) {
  var column = spreadSheet.getRange(column + ":" + column);
  var values = column.getValues(); // get all data in one call
  var ct = 0;
  while ( values[ct] && values[ct][0] != "" ) {
    ct++;
  }
  return (ct+1);
}

5.Kafka データをスプレッドシートに書き込む関数を作成

以下の関数はGoogle Apps Script のJDBC 機能を使ってConnect Cloud に接続し、Kafka データを書き込み、データをSELECT してスプレッドシートにデータを読み込みます。スクリプトを実行すると、2つの入力ボックスが表示されます。 最初のボックスはユーザーに、データを保持するためのシート名を入力するよう求めます。スプレッドシートが存在しなければ、関数側で作成します。 シート選択用の入力ボックス

2つ目のボックスはユーザーに、読み込むKafka テーブル名を入力するよう求めます。無効なテーブルを選択すると、エラーメッセージが表示され関数は終了します。 テーブル選択用の入力ボックス。

関数はメニューオプションとして使用するよう設計されていますが、スプレッドシートの式として使用するよう拡張することもできます。

/*
 * Reads data from a specified Kafka 'table' and writes it to the specified sheet.
 *    (If the specified sheet does not exist, it is created.)
 */
function connectToApacheKafkaData() {
  var thisWorkbook = SpreadsheetApp.getActive();

  //select a sheet and create it if it does not exist
  var selectedSheet = Browser.inputBox('Which sheet would you like the data to post to?',Browser.Buttons.OK_CANCEL);
  if (selectedSheet == 'cancel')
    return;

  if (thisWorkbook.getSheetByName(selectedSheet) == null)
    thisWorkbook.insertSheet(selectedSheet);
  var resultSheet = thisWorkbook.getSheetByName(selectedSheet);
  var rowNum = 2;

  //select a Kafka 'table'
  var table = Browser.inputBox('Which table would you like to pull data from?',Browser.Buttons.OK_CANCEL);
  if (table == 'cancel')
    return;

  var name = Jdbc.getConnection(dbUrl, {
    user: user, 
    password: userPwd
	}	
  );

  //confirm that var table is a valid table/view
  var dbMetaData = name.getMetaData();
  var tableSet = dbMetaData.getTables(null, null, table, null);
  var validTable = false;
  while (tableSet.next()) {
    var tempTable = tableSet.getString(3);
    if (table.toUpperCase() == tempTable.toUpperCase()){
      table = tempTable;
      validTable = true;
      break;
    }
  } 
  tableSet.close();
  if (!validTable) {
    Browser.msgBox("Invalid table name: " + table, Browser.Buttons.OK);
    return;
  }

  var stmt = name.createStatement();

  var results = stmt.executeQuery('SELECT * FROM ' + table);
  var rsmd = results.getMetaData();
  var numCols = rsmd.getColumnCount();

  //if the sheet is empty, populate the first row with the headers
  var firstEmptyRow = getFirstEmptyRowByColumnArray(resultSheet, "A");
  if (firstEmptyRow == 1) {
    //collect column names
    var headers = new Array(new Array(numCols));
    for (var col = 0; col < numCols; col++){
      headers[0][col] = rsmd.getColumnName(col+1);
    }
    resultSheet.getRange(1, 1, headers.length, headers[0].length).setValues(headers);
  } else {
    rowNum = firstEmptyRow;
  }

  //write rows of Kafka data to the sheet
  var values = new Array(new Array(numCols));
  while (results.next()) {
    for (var col = 0; col < numCols; col++) {
      values[0][col] = results.getString(col + 1);
    }
    resultSheet.getRange(rowNum, 1, 1, numCols).setValues(values);
    rowNum++;
  }

  results.close();
  stmt.close();
}
  

関数の処理が完了すると、スプレッドシートにKafka データが読み込まれ、Google スプレッドシートの表計算、グラフ作成機能をどこでも自在に活用できます。


完全なGoogle Apps Script

//replace the variables in this block with real values as needed
var address = 'tds.cdata.com:14333';
var user = 'CONNECT_USER'; // user@mydomain.com
var userPwd = 'CONNECT_USER_PAT';
var db = 'ApacheKafka1';

var dbUrl = 'jdbc:sqlserver://' + address + ';databaseName=' + db;

function onOpen() {
  var spreadsheet = SpreadsheetApp.getActive();
  var menuItems = [
    {name:'Write table data to a sheet', functionName: 'connectToApacheKafkaData'}
  ];
  spreadsheet.addMenu('Kafka Data', menuItems);
}

/*
 * Finds the first empty row in a spreadsheet by scanning an array of columns
 * @return The row number of the first empty row.
 */
function getFirstEmptyRowByColumnArray(spreadSheet, column) {
  var column = spreadSheet.getRange(column + ":" + column);
  var values = column.getValues(); // get all data in one call
  var ct = 0;
  while ( values[ct] && values[ct][0] != "" ) {
    ct++;
  }
  return (ct+1);
}

/*
 * Reads data from a specified 'table' and writes it to the specified sheet.
 *    (If the specified sheet does not exist, it is created.)
 */
function connectToApacheKafkaData() {
  var thisWorkbook = SpreadsheetApp.getActive();

  //select a sheet and create it if it does not exist
  var selectedSheet = Browser.inputBox('Which sheet would you like the data to post to?',Browser.Buttons.OK_CANCEL);
  if (selectedSheet == 'cancel')
    return;

  if (thisWorkbook.getSheetByName(selectedSheet) == null)
    thisWorkbook.insertSheet(selectedSheet);
  var resultSheet = thisWorkbook.getSheetByName(selectedSheet);
  var rowNum = 2;

  //select a Kafka 'table'
  var table = Browser.inputBox('Which table would you like to pull data from?',Browser.Buttons.OK_CANCEL);
  if (table == 'cancel')
    return;

  var name = Jdbc.getConnection(dbUrl, {
    user: user, 
    password: userPwd
	}
  );

  //confirm that var table is a valid table/view
  var dbMetaData = name.getMetaData();
  var tableSet = dbMetaData.getTables(null, null, table, null);
  var validTable = false;
  while (tableSet.next()) {
    var tempTable = tableSet.getString(3);
    if (table.toUpperCase() == tempTable.toUpperCase()){
      table = tempTable;
      validTable = true;
      break;
    }
  } 
  tableSet.close();
  if (!validTable) {
    Browser.msgBox("Invalid table name: " + table, Browser.Buttons.OK);
    return;
  }

  var stmt = name.createStatement();

  var results = stmt.executeQuery('SELECT * FROM ' + table);
  var rsmd = results.getMetaData();
  var numCols = rsmd.getColumnCount();

  //if the sheet is empty, populate the first row with the headers
  var firstEmptyRow = getFirstEmptyRowByColumnArray(resultSheet, "A");
  if (firstEmptyRow == 1) {
    //collect column names
    var headers = new Array(new Array(numCols));
    for (var col = 0; col < numCols; col++){
      headers[0][col] = rsmd.getColumnName(col+1);
    }
    resultSheet.getRange(1, 1, headers.length, headers[0].length).setValues(headers);
  } else {
    rowNum = firstEmptyRow;
  }

  //write rows of Kafka data to the sheet
  var values = new Array(new Array(numCols));
  while (results.next()) {
    for (var col = 0; col < numCols; col++) {
      values[0][col] = results.getString(col + 1);
    }
    resultSheet.getRange(rowNum, 1, 1, numCols).setValues(values);
    rowNum++;
  }

  results.close();
  stmt.close();
}