
こんにちは、プロダクトマネージメント@for Apps の宮本です!
CData Sync に変更データキャプチャ(CDC)の新モード「拡張型CDC 機能」がV24.3で追加されました!従来のバッチ型に代わり、トランザクションログをリアルタイムで監視し、即座に変更データをキャプチャするリアルタイムデータストリーミングを実現できるようになります。また、同期先DB への連携も任意のタイミングで設定できるため、リアルタイム連携のほか1時間ごとのスケジュール実行など、柔軟な運用を実現することができます。現状で使用可能なデータソースは PostgreSQL と Oracle だけですが、今後は他のデータベースコネクタも対象となる予定です。
拡張型CDC 機能について
拡張型CDC 機能はリアルタイムでデータソースとなるデータベースのログを監視し、変更が行われたタイミングでSync 内部に取り込まれ、スケジュールのタイミングで同期先DB へ送信されます。この機能でのデータの流れは以下のパートで分けることができます。
①変更データのキャプチャ&ステージファイルへの保存
②時間起動でステージファイルを同期先DBに連携
それでは各パートについてどのような仕組になっているのか見ていきましょう。
①変更データのキャプチャ&ステージファイルへの保存
PostgreSQL を例にすると、CData Sync はPostgreSQL のトランザクションログのWAL ファイルを読み続け、変更があればすぐにSync 内部(ステージエリア)にレコードが転送され保持します。

Sync 内部ではjson ファイル(ステージファイル)でのレコード管理しています。

中身は以下のように保存されています。

ジョブの追加オプションにて「stage.file.max.rows=150000 (※件数指定)」のようにすることでステージファイルの最大容量を調整することができます。
なお、ステージファイルは同期先DB へのレプリケーションが完了すると中身を削除したままファイルは保持され続け、次回以降取得データがあれば同じファイルに書き込まれていきます。
②時間起動でステージファイルを同期先DBに連携
同期先DB へのレプリケーションはこれまでのジョブと同じように設定した時間で行われます。リアルタイム連携を行いたい場合は、1分間隔やCRON 式でさらに短い間隔で行うよう設定することで実現できます。
同期先DB へのレプリケーションが成功すると、最後にステージファイルの中身を削除します。(変更データがあれば再度同ファイルに書き込まれる)
使い方
ではPosgtreSQL での拡張型CDC 機能を試していきましょう。
PostgreSQL での設定
本機能が使えるPostgreSQL のバージョンは12 以降となります。さらにこれまでのCDC 機能とは別のCDC プラグイン(PostgreSQL 上で使用するもの)も違います。そのため、既存のCDC ジョブを引き継いで使用することはできません。また、UTF-8 でのデータベースのみサポートしており、それ以外の場合は正しくレプリケーションされない可能性がありますので併せてご確認ください。
論理レプリケーションの有効化
まずは論理レプリケーションを有効にするために、wal_level パラメータがlogical かを確認し、違う場合は設定します。
1.SHOW wal_level; で設定値確認
2.logical ではない場合は設定
「ALTER SYSTEM SET wal_level = logical;」
3.PostgreSQL 再起動
パブリケーションの設定
次はパブリケーションを設定します。これまでのPostgreSQL でのCDC プラグインではパブリケーションの設定は不要でしたが、拡張型CDC 機能ではpgoutput というPostgreSQL 標準のCDC プラグインを使用するようになり、パブリケーションの設定が必要となります。これを行うことで、履歴管理されるようになります。
設定方法は以下のとおりです。
- CREATE PUBLICATION cdatasync_pub1 FOR ALL TABLES;
個々のテーブルで管理したい場合は "FOR TABLE table1, table2, ...;" と指定してください。
論理レプリケーションスロットの作成
既存のCDC 機能と同じように論理レプリケーションスロットを作成します。ただし、先ほどお伝えしたようにプラグインが変わっています。拡張型CDC ではpgoutput プラグインを使用しますので以下のように指定します。
- SELECT pg_create_logical_replication_slot('SlotName', 'pgoutput');
これでPostgreSQL 側の設定は完了です。
CData Sync での設定~実行
コネクション作成
拡張型CDC 機能を使う場合は、既存のPostgreSQL コネクタではなく各DBのNative Driver を使用します。

Oracle であれば ojdbc.jar が、PostgreSQL であればpostgresql-XXXX.jar を使用します。これらはデフォルトで格納されていますのですぐに接続設定可能です。
それでは、PostgreSQL (Native) を選択して以下のように接続設定を行います。

ジョブ作成
ジョブ一覧画面で「ジョブを追加」ボタンをクリックして拡張型CDC のジョブを作成します。その際、先ほど作成したパブリケーション名と論理レプリケーションスロットを設定します。

連携対象とするテーブルを選択して実行します。初回の場合は全件レプリケーションになります。

そうすると、CDC エンジンも自動的に起動される形になります。(概要タブ)

CDC エンジンとは、データソース(PostgreSQL)から変更情報を取得するための機能で、手動での起動停止が可能です。
スケジュール設定
スケジューラーを設定します。今回は1分間隔での連携を行いたいので、以下のように選択して保存します。

これで毎分レプリケーションが行われる設定になりました。ちなみに変更情報が無ければ同期先への更新処理は行われません。

変更データの反映確認
ではPostgreSQL を更新してしみます。Name カラムの値を1件だけ更新してみます。

更新後、CDC エンジン側では即座に1件の変更データを取得することができ、

同期先DB へのレプリケーション処理も1分間隔ですので、すぐに実行されました。

SQLServer の連携テーブルをみると正しく更新されていました。

終わりに
いかがでしたでしょうか。新しく追加された「拡張型CDC」機能を使うことで、よりリアルタイムでの変更データキャプチャが可能となり、CDC ユースケースでの利用方法の幅が広がりました。CData Sync は30日間の無償トライアルが可能ですので、ぜひお試しください!
https://www.cdata.com/jp/sync/trial
関連コンテンツ