ビッグデータをApache Kafka にフィードする方法

by Murat Cengiz | 2019年10月03日

ビッグデータをApache Kafka にフィードする方法

Apache Kafka はさまざまなシステムやソリューションへと統合されています。膨大なリアルタイムデータのフィルタリングや処理、異なるソースから1 箇所に集約されたハンドラーへメトリックデータをログし作成したりします。CData Sync を使うことで、そのようなソリューションを、あらゆるCRM、ERP、Analytics (アナリティクス) ソフトウェアへ適応できます。

Apache Kafka をCData Sync の同期先に設定

CData Sync にKafka を同期先として設定するのは容易です。シンプルにServer とPort を設定するだけで、あとはレプリケーションコマンドを作成するだけです。追加の構成は不要です。接続の設定には、Connections ページへ移動し、Destinations タブをクリック、Kafka を選択します。

Server とPort プロパティを指定します。認証が有効になっている場合、User と Password プロパティも同様に指定します。Save Changes とText Connection をクリックし、変更を保存。CData Sync がKafka Server へ接続できるか確かめます。

その他のプロパティも利用可能で、Advnced タブで設定ができます。

  • Enable Idempotence : メッセージが一度だけ送信されるようにします。ある特定のシナリオでメッセージの重複生成がありえます。確かめるために、ジョブを実行したあとにクライアント側で、消費された結果数を Records Affected(影響を受けるレコード)ステータスと比較できます。
  • Serialization Format : メッセージを生成するフォーマットを指定します。JSON、XML、CSV が利用できます。
  • Topic: 指定した場合、ターゲットトピックのレプリケーションとして、プロパティがテーブル名をオーバライドします。

差分更新はどのように機能するか

CData Sync はシームレスに差分更新します。サーバー側で構成は必要ありません。Source とそのソーステーブルによって、必要なプロパティは事前に構成されます。

SQL Server や他のデータベースツールと異なり、Kafka はステートを保存する方法がありません。CData Sync はこの課題を回避するためにローカルのSQLite Database を使います。テーブルが複製された前回の時間を保存し、最新レコードをフィルターするためにそのタイムスタンプを利用します。ほとんどのエンタープライズシステムが、レコード最新更新日を指定するためのシステム列を提供していますので、この目的には十分です。

たとえば、QuickBooks Online 内のAccounts テーブルにそのような列があります。そのテーブルを複製します。

3 つのレコードを修正したあとに、もう1 つ複製します。

一部のテーブルはレコード最終更新日が入るための自動更新列がありません。そのようなケースでは最初からすべての結果を複製するしか方法がありません。Kafka は新しい結果を区別するために、利用されるメッセージに付いているタイムスタンプ列を提供しています。

QuickBooks Online の部テーブルには、最新更新日を指定する列がありません。そのため、まずこのテーブルを複製します。

2 つの新しいレコードを追加しつつ、レプリケーションを再度実行すると、以下が生成されます。

クエリの最適化

CData Sync により生成されるメッセージサイズを管理するにはさまざまな方法があります。Kafka サーバーの構成により、最適化が必要になるかもしれません。または、単純にレプリケーションパフォーマンスを少し向上させるためにも、最適化を考慮する価値があります。

** Compression Type (圧縮タイプ):** 生成データの圧縮方法を指定します。gzip、lz4、snappy、もしくはnone (指定しない)オプションがあります。none 以外のオプションを指定するとメッセージのペイロードを削減できます。

Maximum Batch Size (最大バッチサイズ): 1 回のリクエストで送信される最大バッチサイズを指定します。バッチはメッセージでいっぱいです。バッチが待機中のときに、メッセージであふれないよう事前に送信されます。この値を低くするとパフォーマンスに影響がでるかもしれませんが、生成されるメッセージがサーバーの最大許容サイズを超えている場合には必要になるかもしれません。

Exclude Columns(列を除外する): 1 つの列が大きすぎる場合、変換機能により特定の列を出力メッセージから省くことができます。これは集計列で最も広く使われています。列を除外するためには、対象のジョブを選択し、目的のテーブル横の変換ボタンをクリックします。

次に、集計列の選択を外します。

最後に、OK をクリックし保存します。

データフィード管理にCData Sync を設定する

Scheduled Jobs (ジョブのスケジュール化)を使うことで、全自動でレコードの取り込みを設定できます。これはKafka Consumer が新しい入力を常に最新に保つために使用できます。特定のデータセットに合わせ時間を調整することができます。

ジョブをスケジュールするために、目的のジョブを選択します。Schedule セクションより、"Schedule this job to run automatically" にチェックを入れます。最後に、適切なインターバルを選択します。

まとめ

CData Sync とApache Kafka の組み合わせは強力です。あらゆるデータソースからKafka Consumer へレプリケーションを可能にし、アナリティクスからログまで広範囲な要件をサポートします。新しいレコードを自動的に検出しジョブを予定することにより、そのSubscriber に対し新しいデータの安定したフローが保証されます。圧縮、変換、およびその他最適化により、データフォーマットやサイズ、頻度を超えてよりコントロール可能になります。CData Sync の 30 日の無償評価版をダウンロードし、ビッグデータをApache Kafka で整理しましょう!

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。