製品をチェック

製品の詳細・30日間の無償トライアルはコチラ

CData Sync

Apache Cassandra へのSpark データのETL パイプラインを作ってデータを統合する方法

データパイプラインツールのCData Sync を使って、Spark データのApache Cassandra へのETL パイプラインをノーコードで作成してデータを統合する方法を解説します。

宮本航太
プロダクトスペシャリスト

最終更新日:2024-01-19
spark ロゴ

CData

sync ロゴ画像
Cassandra ロゴ

こんにちは!プロダクトスペシャリストの宮本です。

CData Sync は、数百のSaaS / DB のデータをCassandra をはじめとする各種DB / データウェアハウスにノーコードで統合・レプリケーション(複製)が可能なデータパイプラインツールです。本記事では、Spark データをCData Sync を使ってCassandra に統合するデータパイプラインを作っていきます。

CData Sync とは?

CData Sync は、レポーティング、アナリティクス、機械学習、AI などで使えるよう、社内のデータを一か所に統合して管理できるデータ基盤をノーコードで構築できるETL ツールで、以下の特徴を持っています。

  1. Spark をはじめとする数百種類のSaaS / DB データに対応
  2. Cassandra など多くのRDB、データレイク、データストア、データウェアハウスに同期可能
  3. 業務データのデータ分析基盤へのETL / ELT 機能に特化し、極限まで設定操作をシンプルに
  4. 主要なSaaS データの差分更新やCDC(Change Data Capture、変更データキャプチャ)のサポート
  5. フレキシブルなSQL での取得データの操作

CData Sync では、1.データソースとしてSpark の接続を設定、2.同期先としてCassandra の接続を設定、3.Spark からCassandra へのレプリケーションジョブの作成、という3つのステップだけでレプリケーション処理を作成可能です。以下に具体的な設定手順を説明します。

1.データソースとしてSpark の接続を設定

まずはじめに、CData Sync のブラウザ管理コンソールにログインします。CData Sync のインストールをまだ行っていない方は本記事の製品リンクからCData Sync をクリックして、30日の無償トライアルとしてCData Sync をインストールしてください。インストール後にCData Sync が起動して、ブラウザ設定画面が開きます。

それでは、データソース側にSpark を設定していきましょう。左の[接続]タブをクリックします。

  1. [+接続の追加]ボタンをクリックします。 コネクションの追加。
  2. [データソース]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、Spark を見つけます。
  3. Spark の右側の[→]をクリックして、Spark アカウントへの接続画面を開きます。もし、Spark のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、[ダウンロード]をクリックすると、CData Sync にコネクタがインストールされます。 データソースの追加。
  4. 接続プロパティにSpark に接続するアカウント情報を入力をします。

    SparkSQL への接続

    SparkSQL への接続を確立するには以下を指定します。

    • Server:SparkSQL をホストするサーバーのホスト名またはIP アドレスに設定。
    • Port:SparkSQL インスタンスへの接続用のポートに設定。
    • TransportMode:SparkSQL サーバーとの通信に使用するトランスポートモード。有効な入力値は、BINARY およびHTTP です。デフォルトではBINARY が選択されます。
    • AuthScheme:使用される認証スキーム。有効な入力値はPLAIN、LDAP、NOSASL、およびKERBEROS です。デフォルトではPLAIN が選択されます。

    Databricks への接続

    Databricks クラスターに接続するには、以下の説明に従ってプロパティを設定します。Note:必要な値は、「クラスター」に移動して目的のクラスターを選択し、 「Advanced Options」の下にある「JDBC/ODBC」タブを選択することで、Databricks インスタンスで見つけることができます。

    • Server:Databricks クラスターのサーバーのホスト名に設定。
    • Port:443
    • TransportMode:HTTP
    • HTTPPath:Databricks クラスターのHTTP パスに設定。
    • UseSSL:True
    • AuthScheme:PLAIN
    • User:'token' に設定。
    • Password:パーソナルアクセストークンに設定(値は、Databricks インスタンスの「ユーザー設定」ページに移動して「アクセストークン」タブを選択することで取得できます)。

    データソースの追加。
  5. [作成およびテスト]をクリックして、正しくSpark に接続できているかをテストして保存します。これでレプリケーションのデータソースとしてSpark への接続が設定されました。

2.同期先としてCassandra の接続を設定

次に、Spark データを書き込む先(=同期先)として、Cassandra を設定します。同じく[接続]タブを開きます。

  1. [+接続の追加]ボタンをクリックします。
  2. [同期先]タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、Cassandra を見つけます。
  3. Cassandra の右側の[→]をクリックして、Cassandra データベースへの接続画面を開きます。もし、Cassandra のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、[ダウンロード]をクリックすると、CData Sync にコネクタがインストールされます。 Apache Cassandra を同期先に設定。
  4. 必要な接続プロパティを入力します。CData Sync は、ログインクレデンシャルを使ったベーシック認証および、DataStax Enterprise(DSE)Cassandra の認証をサポートしています。以下が、認証メソッドで要求される接続プロパティです。

    AuthScheme を対応するシステムの認証に設定します。cassandra.yaml ファイルの認証方法にauthenticator プロパティを設定します。ファイルは通常/etc/dse/cassandra か、DSN Cassandra では、DSE Unified Authonticator にあります。

    Basic Authentication

    ベーシック認証は、Cassandra のビルトインのデフォルトPasswordAuthenticator でサポートされています。

    • AuthScheme プロパティを 'BASIC' に設定し、Userおよび Password プロパティを設定します。
    • cassandra.yaml ファイルでauthenticator プロパティを 'PasswordAuthenticator' に設定します。

    Kerberos 認証

    Kerberos 認証は、DataStax Enterprise Unified Authentication でサポートされています。

    • AuthScheme プロパティを 'KERBEROS' に設定し、Userおよび Password プロパティを設定します。
    • SKerberosKDCKerberosRealmKerberosSPN プロパティを設定します。
    • cassandra.yaml ファイルでauthenticator プロパティを "com.datastax.bdp.cassandra.auth.DseAuthenticator" に設定します。
    • dse.yaml ファイルのauthentication_options セクションを変更し、keytab、service_principle、http_principle、qop プロパティを'kerberos' に設定します。
    • dse.yaml ファイルの セクションを変更し、keytab、service_principle、http_principle、qop プロパティを設定します。

    LDAP 認証

    LDAP 認証は、DataStax Enterprise Unified Authentication でサポートされています。

    • AuthScheme プロパティを 'LDAP' に設定し、Userおよび Password プロパティを設定します。
    • cassandra.yaml ファイルでauthenticator プロパティを "com.datastax.bdp.cassandra.auth.DseAuthenticator" に設定します。
    • dse.yaml ファイルのauthentication_options セクションを変更し、keytab、service_principle、http_principle、qop プロパティを'ldap' に設定します。
    • dse.yaml ファイルのldap_options セクションを変更し、server_host、server_port、search_dn、search_password、user_search_base、user_search_filter プロパティを設定します。

    PKI の使用

    CData Sync でクライアント証明書をSSLClientCertSSLClientCertTypeSSLClientCertSubjectSSLClientCertPassword で指定できます。

  5. [作成およびテスト]をクリックして、正しく接続できているかをテストします。
  6. 同期先の設定。
  7. これで同期先としてCassandra を設定できました。CData Sync では、Cassandra のデータベース名を指定するだけで、同期するSpark に併せたテーブルスキーマを自動的にCREATE TABLE してくれます。同期データに合わせたテーブルを事前に作成するなどの面倒な手順は必要ありません。もちろん、既存テーブルにマッピングを行いデータ同期を行うことも可能です。

3.Spark からCassandra へのレプリケーションジョブの作成

CData Sync では、レプリケーションをジョブ単位で設定します。ジョブは、Spark からCassandra という単位で設定し、複数のテーブルを含むことができます。レプリケーションジョブ設定には、[ジョブ]タブに進み、[+ジョブを追加]ボタンをクリックします。 ジョブの追加Salesforce の例)。

[ジョブを追加]画面が開き、以下を入力します:

  1. 名前:ジョブの名前
  2. データソース:ドロップダウンリストから先に設定したSpark を選択
  3. 同期先:先に設定したCassandra を選択
データソースの設定Salesforce の例)。

すべてのオブジェクトをレプリケーションする場合

Spark のすべてのオブジェクト / テーブルをレプリケーションするには、[種類]セクションで[すべて同期]を選択して、[ジョブを追加]ボタンで確定します。

作成したジョブ画面で、右上の[▷実行]ボタンをクリックするだけで、全Spark テーブルのCassandra への同期を行うことができます。

オブジェクトを選択してレプリケーションする場合

Spark から特定のオブジェクト / テーブルを選択してレプリケーションを行うことが可能です。[種類]セクションでは、[標準(個別設定)]を選んでください。

次に[ジョブ]画面で、[タスク]タブをクリックし、[タスクを追加]ボタンをクリックします。 ジョブへのタスク追加Salesforce の例)。

するとCData Sync で利用可能なオブジェクト / テーブルのリストが表示されるので、レプリケーションを行うオブジェクトにチェックを付けます(複数選択可)。[ジョブを追加]ボタンで確定します。

タスク選択(Salesforce の例)。

作成したジョブ画面で、[▷実行]ボタンをクリックして(もしくは各タスク毎の実行ボタンを押して)、レプリケーションジョブを実行します。 作成したジョブの実行(Salesforce の例)。

このようにとても簡単にSpark からCassandra への同期を行うことができました。

CData Sync の主要な機能を試してみる:スケジューリング・差分更新・ETL

ジョブのスケジュール起動設定

CData Sync では、同期ジョブを1日に1回や15分に1回などのスケジュール起動をすることができます。ジョブ画面の[概要]タブから[スケジュール]パネルを選び、[⚙設定]ボタンをクリックします。[間隔]と同期時間の[毎時何分]を設定し、[保存]を押して設定を完了します。これでCData Sync が同期ジョブをスケジュール実行してくれます。ユーザーはダッシュボードで同期ジョブの状態をチェックするだけです。 スケジュール実行設定。

差分更新

CData Sync では、主要なデータソースでは、差分更新が可能です。差分更新では、最後のジョブ実行時からデータソース側でデータの追加・変更があったデータだけを同期するので、レプリケーションのクエリ・通信のコストを圧倒的に抑えることが可能です。

差分更新を有効化するには、ジョブの[概要]タブから「差分更新」パネルを選び、[⚙設定]ボタンをクリックします。[開始日]と[レプリケーション間隔]を設定して、[保存]します。

SQL での取得データのカスタマイズ

CData Sync は、デフォルトではSpark のオブジェクト / テーブルをそのままCassandra に複製しますが、ここにSQL、またはdbt 連携でのETL 処理を組み込むことができます。テーブルカラムが多すぎる場合や、データ管理の観点から一部のカラムだけをレプリケーションしたり、さらにデータの絞り込み(フィルタリング)をしたデータだけをレプリケーションすることが可能です。

ジョブの[概要]タブ、[タスク]タブへと進みます。選択されたタスク(テーブル)の[▶]の左側のメニューをクリックし、[編集]を選びます。タスクの編集画面が開きます。

UI からカラムを選択する場合には、[カラム]タブから[マッピング編集]をクリックします。レプリケーションで使用しないカラムからチェックを外します。

SQL を記述して、フィルタリングなどのカスタマイズを行うには、[クエリ]タブをクリックし、REPLICATE [テーブル名]の後に標準SQL でフィルタリングを行います。 レプリケーションのカスタマイズ設定。

Spark からCassandra へのデータ同期には、ぜひCData Sync をご利用ください

このようにノーコードで簡単にSpark データをCassandra にレプリケーションできます。データ分析、AI やノーコードツールからのデータ利用などさまざまな用途でCData Sync をご利用いただけます。30日の無償トライアルで、シンプルでパワフルなデータパイプラインを体感してください。

日本のユーザー向けにCData Sync は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

CData Sync の 導入事例を併せてご覧ください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。