Kafka データをHeroku Postgres にレプリケーションして、Salesforce Connect から利用する方法

ETL / ELT ツールのCData Sync を使って、Kafka のデータのHeroku Postgres へのETL パイプラインをノーコードで作成する方法を解説します。

宮本航太
プロダクトスペシャリスト

最終更新日:2024-01-01

この記事で実現できるKafka 連携のシナリオ

こんにちは!プロダクトスペシャリストの宮本です。

CData Sync は、いろいろなシナリオのデータレプリケーション(同期)を行うことができるスタンドアロンのアプリケーションです。例えば、sandbox および本番インスタンスのデータをデータベースに同期することができます。Kafka のデータ をHeroku 上のPostgreSQL に同期することで、Salesforce の通常オブジェクトに加えて、Salesforce 外部オブジェクト(Salesforce Connect)としてKafka のデータへのアクセスが可能になります。

要件

本レプリケーション例では、次が必要です:

  1. CData Sync (試用版もしくは商用版)、およびKafka のレプリケーションに必要なライセンス。
  2. Heroku Postgress を含むHeroku app および、Heroku Connect アドオン許可。
  3. Salesforce アカウント。

レプリケーション同期先の設定

CData Sync を使って、Kafka のデータ をHeroku 上のPostgreSQL データベースにレプリケーションできます。本記事では、Heroku 上の既存のPostgreSQL を使用します。PostgreSQL データベースをレプリケーション先に指定するには、[接続]タブから進みます。

  1. [同期先]タブをクリックします。
  2. PostgreSQL を同期先として選択します。

    PostgreSQL への接続には、Port(デフォルトでは5432)、およびデータベース接続プロパティを設定し、サーバーに認証するuser およびpassword を設定します。データベースプロパティが指定されていない場合には、ユーザーのデフォルトデータベースに接続します。

  3. 接続のテスト]をクリックして、正しく接続できているかをテストします。 PostgreSQL データベースの接続設定。
  4. [変更を保存]をクリックします。

Kafka 接続の設定

データソース側にKafka を設定します。[接続]タブをクリックします。

  1. [接続の追加]セクションで[データソース]タブを選択します。
  2. Kafka アイコンをデータソースとして選択します。プリインストールされたソースにKafka がない場合には、追加データソースとしてダウンロードします。
  3. 接続プロパティに入力をします。

    Apache Kafka 接続プロパティの取得・設定方法

    .NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

    1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

    Apache Kafka への認証

    Apache Kafka データソースは、次の認証メソッドをサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、このプロパティを設定します。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントを参照してください。 データソースの接続設定(NetSuite の例)。

  4. [接続のテスト]をクリックして、正しく接続できているかをテストします。
  5. [変更を保存]をクリックします。

Kafka インスタンス毎のクエリの設定

Data Sync はレプリケーションをコントロールするSQL クエリを簡単なGUI 操作で設定できます。 レプリケーションジョブ設定には、[ジョブ]タブに進み、[ジョブを追加]ボタンをクリックします。 次にデータソースおよび同期先をそれぞれドロップダウンから選択します。 レプリケーションジョブのソースおよび同期先を選択。

テーブル全体をレプリケーションする

テーブル全体をレプリケーションするには、[テーブル]セクションで[テーブルを追加]をクリックします。表示されたテーブルリストからレプリケーションするテーブルをチェックします。

テーブル全体をレプリケーション(NetSuite の例)。

テーブルをカスタマイズしてレプリケーションする

レプリケーションはテーブル全体ではなく、カスタマイズが可能です。[変更]機能を使えば、レプリケーションするカラムの指定、同期先でのカラム名を変更しての保存、ソースデータの各種加工が可能です。レプリケーションのカスタマイズには、ジョブの[変更]ボタンをクリックしてカスタマイズウィンドウを開いて操作を行います:

  • チェックボックスでフィールドを追加もしくは削除 Choose which fields to replicate.
  • カラムリストの下に新しく計算されたフィールドを追加する Add a computed field.
  • フィルタセクションを利用してフィルタを追加する Add a replication filter.

インターフェースを使って変更を行うと、レプリケーションのSQL クエリは以下のようなシンプルなものから:

REPLICATE [SampleTable_1]

次のような複雑なものになります:

REPLICATE [SampleTable_1] SELECT [Id], [Column1] FROM [SampleTable_1] WHERE [Column2] = 100

レプリケーションのスケジュール起動設定

[スケジュール]セクションでは、レプリケーションジョブの自動起動スケジュール設定が可能です。反復同期間隔は、15分おきから毎月1回までの間で設定が可能です。

スケジュール起動設定。

レプリケーションジョブを設定したら、[変更を保存]ボタンを押して保存します。複数のKafka のデータ のジョブを作成して、Salesforce の外部オブジェクトとして利用可能です。

外部オブジェクトとしてKafka データレプリケーションにアクセス

Kafka のデータ がHeroku 上のPostgreSQL データベースとしてレプリケーションされたので、Heroku のOData インターフェースを設定し、Salesforce Connect から外部オブジェクトとしてデータ連携できるようにします。

Heroku のOData Service を設定します

まずは、Heroku 上のPostgreSQL データベースに複製されたKafka のデータ への接続のために、データベースに対しHeroku External Object を設定します。

  1. Heroku ダッシュボードで、[Connect Add-on] をクリックします。
  2. [External Objects]を指定します。はじめてHeroku External Object を使用する場合には、OData Server のログインクレデンシャルを作成するようにナビゲートされます。
  3. OData service URL およびクレデンシャルを確認します。このクレデンシャルをSalesforce Connect 接続時に利用します。
  4. [Data Sources]において、前のプロセスで作成したレプリケーション済みデータベースを設定します。
Heroku Connect でのOData サービスの設定。

詳しくは、こちらのHeroku documentation を参照してください。

Salesforce の外部データソースの設定

Heroku のOData サービスの設定が終わったら、Salesforce Connect を使って、複製されたKafka のデータ のデータに外部データソースとして連携します。

  1. Salesforce で設定をクリックします。
  2. Administration (管理)セクションで、[データ]→[外部データソース]をクリック。
  3. データソースパラメータプロパティを設定します:
    • External Data Source: Salesforce UI に表示される名前
    • Name: API の一位の識別子
    • Type: Salesforce Connect: OData 4.0
    • URL: Heroku Connect のOData エンドポイント
    • Format: JSON
  4. 認証の設定:
    • Identity Type: Named Principal
    • Authentication Protocol: Password Authentication
    • Username: Heroku Connect username
    • Password: Heroku Connect password
  5. [保存]をクリック。
Salesforce 外部オブジェクトの設定。

Kafka オブジェクトの同期

Salesforce の外部データソース登録が終わったら、次の方法でKafka 外部データソースに変更を反映させます。Kafka テーブルの定義とKafka 外部オブジェクトの定義を同期します。

  1. 作成した外部データソースのリンクをクリック。
  2. [Validate and Sync]をクリック
  3. Kafka テーブルを選択して、[同期]をクリックします。
外部データソースの同期。

Salesforce オブジェクトとしてKafka データにアクセス

これで、レプリケーションされたKafka エンティティに対して、Salesforce の通常オブジェクトと同じように外部オブジェクトとしてアクセスが可能になりました。

是非、CData Sync の30日の無償評価版 をダウンロードして、Salesforce との連携をお試しください!

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。