Node.js でKafka を仮想MySQL データベースとしてクエリ

詳細情報をご希望ですか?

製品について詳細情報や無償トライアルをご案内します:

CData Connect



Node.js からKafka に対してMySQL データベースとしてクエリを実行。

CData Connect Cloud を使って、MySQL インターフェースでKafka をクエリすることができます。この記事では、Connect Cloud で仮想Kafka データベースを作成し、Node.js でKafka をクエリする連携方法を説明します。

CData Connect Cloud は、Kafka データのクラウドto クラウドのインターフェースを仮想MySQL として提供し、Node.js からRDB のようにデータをSQL でクエリすることができます。CData Connect Cloud がNode.js から発行されるSQL クエリ(フィルタリングやJOIN も可能)をパースしてKafka に送り、Node.js アプリ にKafka からのデータを返します。インテリジェントなサーバーサイドプロセスで、多様なクエリをパフォーマンス良く利用できます。

Kafka の仮想MySQL データベースを作成

CData Connect Cloud は、直観的なPoint-and-click インターフェースでデータソースへの接続およびAPI エンドポイント作成を行います。

  1. Connect Cloud にログインして、[Databases]をクリックします。
  2. 利用できるデータソースアイコンから"Kafka" を選択します。
  3. Kafka に接続するために必要なプロパティを入力します。

    BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。

    認可メカニズム

    • SASL PlainUser およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
    • SASL SSLUser およびPassword プロパティを指定する必要があります。AuthScheme は'Scram' に、UseSSL はtrue に設定します。
    • SSLSSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
    • KerberosUser およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。

    サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。

  4. Test Database]をクリックします。
  5. [Privileges]->[ Add]をクリックして、新しいユーザーを追加し、適切な権限を指定します。

これで、Kafka の仮想データベースが作成でき、MySQL クライアントからの連携が可能になりました。

Kafka データをNode.js からクエリ

以下のサンプルは、Node.js のMySQL モジュールからKafka への接続を定義し、クエリを実行します。以下の情報が必要です:

  • Host nameaddressport:Connect Cloud のインスタンス名(myinstance.cdatacloud.net)とポート(3306)です。
  • Username およびpassword:Connect Cloud で登録された権限のあるユーザーおよびそのパスワード。
  • Database name:Kafka (kafkadb) のデータベース名。

Kafka に接続して、以下のコードでクエリを実行します:

var mysql      = require('mysql');
var fs         = require('fs');
var connection = mysql.createConnection({
  host     : 'myinstance.cdatacloud.net',
  database : 'kafkadb',
  port	   :'3306',
  user     : 'admin',
  password : 'password',
  ssl      : {
    ca : fs.readFileSync('C:/certs/myCA.pem')
  }
});
connection.connect();
connection.query('SELECT * FROM SampleTable_1', function(err, rows, fields) {
  if (err) throw err;
  console.log(rows);
});

connection.end();