こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Oracle Data Integrator(ODI)はOracle エコシステムのハイパフォーマンスなデータ統合プラットフォームです。CData JDBC Driver for ApacheKafka を使えば、OCI をはじめとするETL ツールからKafka データにJDBC 経由で簡単に読み取りと書き込みを実現できます。リアルタイムKafka データをデータウェアハウス、BI・帳票ツール、CRM、基幹システムなどに統合すれば、データ活用もぐっと楽に。
CData のコネクタを使えば、Kafka API にリアルタイムで直接接続して、ODI 上で通常のデータベースと同じようにKafka データを操作できます。Kafka エンティティのデータモデルを構築、マッピングを作成し、データの読み込み方法を選択するだけの簡単なステップでKafka データのETL が実現できます。
ドライバーのインストール
ドライバーをインストールするには、インストールフォルダにあるドライバーのJAR ファイルと.lic ファイルをODI の適切なディレクトリにコピーします。
- UNIX/Linux(Agent なし):~/.odi/oracledi/userlib
- UNIX/Linux(Agent):$ODI_HOME/odi/agent/lib
- Windows(Agent なし):%APPDATA%\Roaming\odi\oracledi\userlib
- Windows(Agent):%APPDATA%\Roaming\odi\agent\lib
ODI を再起動してインストールを完了します。
モデルのリバースエンジニアリング
ODI の機能を使ってモデルをリバースエンジニアリングすることで、ドライバー側で取得したKafka データのリレーショナルビューに関するメタデータが取得できます。リバースエンジニアリング後、リアルタイムKafka データにクエリを実行してKafka テーブルのマッピングを作成できます。
-
ODI でリポジトリに接続し、「New」->「Model and Topology Objects」をクリックします。
- 表示されるダイアログの「Model」画面で、以下の情報を入力します。
- Name:ApacheKafka と入力します。
- Technology:Technology:Generic SQL(ODI がVersion 12.2+ の場合はMicrosoft SQL Server)を選択します。
- Logical Schema:ApacheKafka と入力します。
- Context:Global を選択します。
- 表示されるダイアログの「Data Server」画面で、以下の情報を入力します。
- Name:ApacheKafka と入力します。
- Driver List:Oracle JDBC Driver を選択します。
- Driver:cdata.jdbc.apachekafka.ApacheKafkaDriver と入力します。
- URL:接続文字列を含むJDBC URL を入力します。
BootstrapServers およびTopic プロパティを設定して、Apache Kafka サーバーのアドレスと、対話するトピックを指定します。
認可メカニズム
- SASL Plain:User およびPassword プロパティを指定する必要があります。AuthScheme は'Plain' に設定します。
- SASL SSL:User およびPassword プロパティを指定する必要があります。AuthScheme は'Scram'
に、UseSSL はtrue に設定します。
- SSL:SSLCert およびSSLCertPassword プロパティを指定する必要があります。UseSSL はtrue に設定します。
- Kerberos:User およびPassword プロパティを指定する必要があります。AuthScheme は'Kerberos' に設定します。
サーバー証明書を信頼する必要がある場合があります。そのような場合は、必要に応じてTrustStorePath およびTrustStorePassword を指定してください。
組み込みの接続文字列デザイナー
JDBC URL の作成の補助として、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
一般的な接続文字列は次のとおりです。
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
- Physical Schema 画面で、以下の情報を入力します。
- Name:ドロップダウンメニューから選択します。
- Database (Catalog):CData と入力します。
- Owner (Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
- Database (Work Catalog):CData と入力します。
- Owner (Work Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
- 開いたモデルで「Reverse Engineer」をクリックしてKafka テーブルのメタデータを取得します。
Kafka データの編集と保存
リバースエンジニアリング後、ODI でKafka データを操作できるようになります。
Kafka データを編集し保存するには、Designer ナビゲーターでモデルアコーディオンを展開し、テーブルを右クリックして「Data」をクリックします。「Refresh」をクリックしてデータの変更を取得します。変更が完了したら「Save Changes」をクリックします。
ETL プロジェクトの作成
次の手順に従って、Kafka からETL を作成します。SampleTable_1 エンティティをODI Getting Started VM に含まれているサンプルデータウェアハウスにロードします。
SQL Developer を開き、Oracle データベースに接続します。Connections ぺインでデータベースのノードを右クリックし、「New SQL Worksheet」をクリックします。
もしくは、SQLPlus を使用することもできます。コマンドプロンプトから、以下のように入力します。
sqlplus / as sysdba
- 以下のクエリを入力し、ODI_DEMO スキーマにあるサンプルデータウェアハウスに新しいターゲットテーブルを作成します。以下のクエリは、Kafka のSampleTable_1 テーブルに一致するいくつかのカラムを定義します。
CREATE TABLE ODI_DEMO.TRG_SAMPLETABLE_1 (COLUMN1 NUMBER(20,0),Id VARCHAR2(255));
- ODI でDesigner ナビゲーターのModels アコーディオンを展開し、ODI_DEMO フォルダの「Sales Administration」ノードをダブルクリックします。Model Editor でモデルが開きます。
- 「Reverse Engineer」をクリックします。TRG_SAMPLETABLE_1 テーブルがモデルに追加されます。
- プロジェクトの「Mappings」ノードを右クリックし、「New Mapping」をクリックします。マッピングの名前を入力し、「Create Empty Dataset」オプションを無効にします。Mapping Editor が表示されます。
- TRG_SAMPLETABLE_1 テーブルをSales Administration モデルからマッピングにドラッグします。
- SampleTable_1 テーブルをKafka モデルからマッピングにドラッグします。
- ソースコネクタポイントをクリックしてターゲットコネクタポイントにドラッグします。Attribute Matching ダイアログが表示されます。ここでは、デフォルトオプションを使用します。その場合、目的の動作はターゲットカラムのプロパティに表示されます。
- Mapping Editor のPhysical タブを開き、TARGET_GROUP の「SAMPLETABLE_1_AP」をクリックします。
- SAMPLETABLE_1_AP プロパティで、Loading Knowledge Module タブの「LKM SQL to SQL (Built-In)」を選択します。
これで、マッピングを実行してKafka データをOracle にロードできます。