Google Cloud Data Fusion でKafka のデータを扱う方法:CData JDBC Driver

CData JDBC ドライバを使って、Google Cloud Data fusion で Kafka のデータ をBigQuery にETL。

宮本航太
プロダクトスペシャリスト

最終更新日:2022-05-16

この記事で実現できるKafka 連携のシナリオ

こんにちは!プロダクトスペシャリストの宮本です。

Google Cloud Data Fusion は、ノーコードでデータ連携の設定が可能な言わば GCP の ETL ツール(サービス)です。たくさんのコネクタや変換・分析機能がデフォルトで用意されているため、さまざまなデータソースを色々な組み合わせで扱うことが可能なようです。 また JDBC を扱うこともできるため、この記事では、CData JDBC Driver for Kafka のデータ を使って、Kafka のデータ データをCloud Data Fusion でGoogle BigQuery にノーコードでパイプラインします。

Cloud Data Fusion の準備

まずはCloud Data Fusion のインスタンスを作成します。

  1. Data Fusion のトップ画面にある「CREATE INSTANCE」からインスタンスを作成します。
  2. 作成されたインスタンス名を先ほどの画面でクリックすると以下の画面に遷移しますので、画面下部にある Service Account をコピーします。
  3. Cloud Data Fusion のインスタンス作成
  4. 画面上部にある追加からメンバーを追加します。メンバー名は先ほどコピーした「Service Account」に合わせてください。 役割は BiqQuery へもアクセスしますので、「BigQuery 管理者」、「Cloud Data Fusion 管理者」、「Cloud Data Fusion API サービス エージェント」を付与します。

CData JDBC Driver for ApacheKafka のアップロード

ここからは実際に、Data Fusion の設定をしていきます。 まずは JDBC Driver をアップロードを行います。

  1. 「View Instance」をクリックして、Data Fusion の Control Center を開きます。
  2. Cloud Data Fusion のControl Center を開く
  3. Control Center が表示されたら、「+」ボタンをクリックして JDBC Driver をアップロードしていきます。
    • Name:アップロードしたドライバーに設定する名前
    • Class name:cdata.jdbc.apachekafka.ApacheKafkaDriver
    JDBC Driver をCloud Data Fusion にアップロード
  4. アップロードする際の注意点として、Driver のファイル名を name-version の形式に変更してアップロードする必要があります。 なお、jarファイルをダブルクリックした際に表示されているバージョンをもとに「apachekafka-connector-java-19.0.7115.0.jar」に変更しました。
  5. JDBC Driver をCloud Data Fusion にアップロード
  6. アップロードが成功するとこのような画面が表示されるので、「Create a Pipeline」をクリックします。
  7. JDBC Driver のアップロード終了

Kafka からGoogle BigQuery へのパイプラインの作成

Data Fusion のパイプライン作成

インプット元はサイドメニューの「Source」から選択します。今回は先ほどアップロードした Kafka のデータ の JDBC Driver を使用するため、「DataBase」を選択します。 アウトプット先は同じくサイドメニューより「Sink」→「BigQuery」を選択します。

Source およびSink 先の選択

「DataBase」の設定

「DataBase」のアイコンにカーソルを持ってくるとプロパティというボタンが表示されるのでクリックし、下記内容を設定します。

  • Label:ApacheKafka
  • Reference Name:ApacheKafka
  • Plugin Name:ApacheKafka Driver(Driver をアップロードした際の名前)
  • Plugin Type:jdbc
  • Connection String:ApacheKafka へ接続する際の JDBC URL
  • Import Query:インプットしたいデータを抽出するクエリ

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

Connection String は以下の形式です。

jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

Database プロパティ設定

上のキャプチャの赤枠は、Salesforce から BigQuery へアウトプットするデータの定義となります。 こちらは「Import Query」のすぐ右上にある「Get Schema」をクリックすると下の画面が表示されますので、「Import Query」で入力したクエリを実行し、カラムを定義します。

Output schema 設定

「BigQuery」の設定

こちらも同様に BigQuery のプロパティから下記内容を設定します。

  • Label:BigQuery
  • Reference Name:BigQuery
  • Project ID:使用するProject ID
  • DataSet:使用するDataSet
  • Table:使用するテーブル名、例:Account_DataFusion
BigQuery のプロパティ設定

作成したKafka のデータ からBigQuery のパイプラインの実行

まずは作成したパイプラインをデプロイします。赤枠の「Deploy」ボタンをクリックしてデプロイを行います。

Deploy Cloud Data Fusion Pipeline

デプロイ完了後、Runボタンが表示されますので、クリックします。

デプロイしたパイプラインを実行

このようにCData JDBC ドライバをアップロードすることで、簡単にGoogle Cloud Data Fusion でKafka のデータ データをノーコードで連携し、BigQuery などへのパイプラインを作成することができます。

是非、CData JDBC Driver for ApacheKafka 30日の無償評価版 をダウンロードして、お試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。