by 庄司 里菜 | May 23, 2018

Lambdaで実現するデータ連携FaaS(Twitter to kintone編)

はじめに

本記事では、AWS LambdaCData Driversを使って、Twitterの特定ハッシュタグのツイート情報をサイボウズ社のkintoneに蓄積するアプリケーションを例に、サーバーレスなFaas(Function as a service)でマルチクラウド間のデータ連携を実現する方法をご紹介します。

アーキテクチャイメージ


本方式を活用した利用実績

イベントのハッシュタグをつけてツイートするだけで応募できる抽選会アプリを作成して、これまで下記のIT系のイベントにて、SNSを盛り上げイベントを楽しんでもらう企画として抽選会を実施してきました。

仙台IT文化祭では、2日間で累計約700名が来場したイベントで、景品が超豪華だったということもあり、ハッシュタグ「#sendaiitfes」でなんと約5400ツイートを集めました。 イベントブログ記事: ( http://www.cdata.com/jp/blog/News/20171031-sendaiitfes )

kintone上で作成した抽選会アプリ

実現方法

各クラウドサービスへの接続

TwitterおよびkintoneはREST APIでのデータ連携インタフェースを持っています。ただし、これらのAPIを通じてデータを連携させるには、それぞれのAPI仕様を理解する必要があります。
- kintoneのAPI仕様書
- TwitterのAPI仕様書
本方式では、各クラウドサービスが公開するAPIを標準化してJDBCの規格でSQLとしてアクセスできるCData JDBC Drivers製品を利用しています。それによりTwitterのツイート情報をSQLのSelect構文で取得して、kintoneにInsert構文でデータを取り込むといった処理をSQLという共通言語で行うことでシンプルなデータ連携Faasとして作成することが出来ます。

サンプルコード

JavaのプログラムからJDBC経由でTwitterおよびkintoneへアクセスしています。サンプルのJavaコードは以下のGitHubからもダウンロードできます。
Twitter2Kintone.java

package cdataj;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;

public class Twitter2Kintone{


	public static String UrlKintone = "jdbc:kintone:URL=https://****.cybozu.com;User=****;Password=****;RTK=****;";
	public static String UrlTwitter = "jdbc:twitter:OAuth Access Token Secret=****;OAuth Access Token=****;OAuth Client Id=****;OAuth Client Secret=****;RTK=****;";
	public static String TableNameKintone = "Twitter2Kintone";
	public static String HashtagKintone = "#qiita";

	public static String[] ListMember = { "" };  // Except From_User_Name

	public static void main(String[] args) {
		try {
			long start = System.currentTimeMillis();

			//Connection for kintone
			Class.forName("cdata.jdbc.kintone.KintoneDriver");
			//System.out.println(cdata.jdbc.kintone.KintoneDriver.getRTK());
			Connection connk = DriverManager.getConnection(UrlKintone);
			Statement statk = connk.createStatement();

			//Connection for Twitter
			Class.forName("cdata.jdbc.twitter.TwitterDriver");
			//System.out.println(cdata.jdbc.twitter.TwitterDriver.getRTK());
			Connection connt = DriverManager.getConnection(UrlTwitter);
			Statement statt = connt.createStatement();

			//Select max Twitter ID from kintone
			statk.execute("SELECT max(ID) AS MAX_ID FROM '" + TableNameKintone + "'");
			ResultSet rsk=statk.getResultSet();
			String maxid = "";
			while(rsk.next()){
				maxid = rsk.getString(1);
			}

			//Select Tweets from Twitter
			statt.execute("SELECT ID, CONCAT(FORMAT(Created_At,'yyyy-MM-dd'), 'T', FORMAT(Created_At,'HH:mm:ssZ')) AS Created_At, text, From_User_Id, From_User_Screen_Name, From_User_Name FROM Tweets WHERE SEARCHTERMS = '" + HashtagKintone + "' AND MIN_ID = '" + maxid + "';");
			ResultSet rst=statt.getResultSet();
			String insertstring = "";
			int affectedcount = 0;
			while(rst.next()){
				//Insert into kintone
				String cmd = "INSERT INTO '" + TableNameKintone + "' (ID, Created_At, text, From_User_Id, From_User_Screen_Name, From_User_Name) VALUES (?,?,?,?,?,?);";
				PreparedStatement pstmt = connk.prepareStatement(cmd,Statement.RETURN_GENERATED_KEYS);
				for(int i=1;i<=rst.getMetaData().getColumnCount();i++)
				{
					insertstring = rst.getString(i);
					if (insertstring == null){
						insertstring = "";
					}
					pstmt.setString(i, insertstring);
					System.out.println(rst.getMetaData().getColumnName(i) +"="+insertstring);
				}

				if(Arrays.asList(ListMember).contains(rst.getString(6))){
					//Skip
				}
				else{
					affectedcount += pstmt.executeUpdate();
				}
			}
			System.out.println(affectedcount+" rows are affected");
			long end = System.currentTimeMillis();
			System.out.println((end - start)  + "ms");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

コード内の主要パートについて説明します。まず初めに、kintoneとTwitterに接続するためのJDBCの接続URL、および、蓄積するkintoneのアプリ名、Twitterのハッシュタグを設定します。今回は例として「#qiita」をつけてツイートしている情報としました。

	public static String UrlKintone = "jdbc:kintone:URL=https://****.cybozu.com;User=****;Password=****;RTK=****;";
	public static String UrlTwitter = "jdbc:twitter:OAuth Access Token Secret=****;OAuth Access Token=****;OAuth Client Id=****;OAuth Client Secret=****;RTK=****;";
	public static String TableNameKintone = "Twitter2Kintone";
	public static String HashtagKintone = "#qiita";

kintoneおよびTwitterへの接続URLの設定例は以下のCData社の製品マニュアルをご覧ください。

※Lambdaなどのサーバーレス環境で動作させるにはJDBCのURLにRTK(RunTimeKey)が必要となります。取得方法はCDataサポートよりお問い合わせください。

KintoneDriverとTwitterDriverクラスを呼び出し、それぞれのSaasに接続するコネクションを作成します。

			//Connection for kintone
			Class.forName("cdata.jdbc.kintone.KintoneDriver");
			//System.out.println(cdata.jdbc.kintone.KintoneDriver.getRTK());
			Connection connk = DriverManager.getConnection(UrlKintone);
			Statement statk = connk.createStatement();

			//Connection for Twitter
			Class.forName("cdata.jdbc.twitter.TwitterDriver");
			//System.out.println(cdata.jdbc.twitter.TwitterDriver.getRTK());
			Connection connt = DriverManager.getConnection(UrlTwitter);
			Statement statt = connt.createStatement();

既にkintoneに取り込まれたツイート情報の最大IDを取得します。

			//Select max Twitter ID from kintone
			statk.execute("SELECT max(ID) AS MAX_ID FROM '" + TableNameKintone + "'");
			ResultSet rsk=statk.getResultSet();
			String maxid = "";
			while(rsk.next()){
				maxid = rsk.getString(1);
			}

Twitterから特定のハッシュタグで、既にkintoneに取り込んだID以降のツイートをSelectして、KintoneにInsertします。

			//Select Tweets from Twitter
			statt.execute("SELECT ID, CONCAT(FORMAT(Created_At,'yyyy-MM-dd'), 'T', FORMAT(Created_At,'HH:mm:ssZ')) AS Created_At, text, From_User_Id, From_User_Screen_Name, From_User_Name FROM Tweets WHERE SEARCHTERMS = '" + HashtagKintone + "' AND MIN_ID = '" + maxid + "';");
			ResultSet rst=statt.getResultSet();
			String insertstring = "";
			int affectedcount = 0;
			while(rst.next()){
				//Insert into kintone
				String cmd = "INSERT INTO '" + TableNameKintone + "' (ID, Created_At, text, From_User_Id, From_User_Screen_Name, From_User_Name) VALUES (?,?,?,?,?,?);";
				PreparedStatement pstmt = connk.prepareStatement(cmd,Statement.RETURN_GENERATED_KEYS);
				for(int i=1;i<=rst.getMetaData().getColumnCount();i++)
				{
					insertstring = rst.getString(i);
					if (insertstring == null){
						insertstring = "";
					}
					pstmt.setString(i, insertstring);
					System.out.println(rst.getMetaData().getColumnName(i) +"="+insertstring);
				}

				if(Arrays.asList(ListMember).contains(rst.getString(6))){
					//Skip
				}
				else{
					affectedcount += pstmt.executeUpdate();
				}
			}

AWS Lambdaへのデプロイ

AWS Lambdaは、Javaコードおよび、JDBCのJarファイルを含めた形でZIPファイル化してアップロードすることが出来ます。
※ハンドラ名は、Javaコードのパッケージ名.クラス名.(例:cdataj.Twitter2Kintone::main)を指定

アップロードするファイルは、Eclipse等のIDEなどからエクスポートする際に、JDBCのJarを含める形で生成します。

kintoneアプリの準備

ツイート情報を蓄積するためのkintoneアプリを作成します。

アプリテンプレートをこちらのパスに格納しておりますのでインポートしてご利用ください。

本アプリでは、2つのJavascriptライブラリとCSSを利用しています。下記のページよりダウンロードして「JavaScript / CSSでカスタマイズ」に追加してください。


AWS Lambdaでのジョブの実行確認

それでは、AWS Lambda上でジョブのテスト実行を行ってみましょう。作成した関数の左上の「テスト」ボタンをクリックします。しばらくすると、実行結果、および、ログを確認できます。ログの内容を見ると、指定したハッシュタグのツイート情報が出力されていることがわかります。
あとは、このジョブをスケジュールなどを定義して定期的に実行できるように設定します。本記事ではこの手順は割愛します。

それでは、kintoneのアプリを開いてみましょう。ツイート内容が登録されていれば成功です。
「抽選タイム!」ボタンをクリックすると、当選者がダイアログとして表示されるようになります。これは、全ツイート内容の中からランダム関数を使って抽出しているのでツイート数が多い人が抽出される確率が上がります。ですので、「ツイートすればするほど当選確率があがる」とイベント企画のなかで事前に言っておくと、SNS上も盛り上がりますね!!

まとめ

AWS Lambdaで実現するデータ連携FaaS(Function As A Service)の実現手順でした。本記事では、Twitterからkintoneへの連携でしたが、Saas連携のところで利用しているCData JDBC Driverは約90を超える様々なデータソースへJDBCなどの規格のもとSQLでアクセスできますので、本方式でサーバーレスアーキテクチャで様々なSaas間連携ができるようになります。
本手順で使用しているCData JDBC Driverは30日間の無償評価版もございますので是非お試してください。



 
 
ダウンロード