こんにちは富士通研究所のymokです。
我々の部署ではストリームデータ処理基盤技術Dracenaを開発しています。 今回は、DracenaのベースとなっているFlinkを題材として、リアルタイム交通データをFlinkに流し込んで可視化する一連のシステム構築の様子を end-to-end でチュートリアルとして示したいと思います。
バス走行位置のリアルタイムデータ(GTFS-RT)
2020年8月に、東京都交通局による都営バスのリアルタイムロケーション情報の提供が始まりました(詳細はこちら)。今回はこのデータを対象として、チュートリアルデモを作成します。
このデータは、公共交通オープンデータ開発者サイトにアカウント登録することで取得できます。データの形式はGTFS-RTで、Protocol Buffers
によって提供されます。当然ながら、このデータを利用したアプリケーションやサービスを公開する場合は、公共交通オープンデータセンターの定める利用規約・ガイドラインを遵守してください。
システムの構成
今回開発するデモシステムの全体構成を下に示します。Flinkにリアルタイムバスロケーションデータを投入し、ストリーム処理した結果をElasticsearchにストアしてKibanaで可視化します。スケーラビリティを考慮すると、Flinkへのデータ投入はKafkaといった分散メッセージング基盤を介して行うべきですが、今回は構成をシンプルに保つために、公共交通オープンデータセンターからデータを取得し、SocketによってFlinkに送信する簡易的な中継サーバを開発し、そこを介する構成にしています。
なお、本チュートリアルでは、下のソフトウェアがすでにインストールされ、利用できる状態になっているものとします。また、全てサービスのは同じホスト上で動作しているものとします。
- Apache Flink 1.10.x
- Elasticsearch 5.6.x
- Kibana 5.6.x
なお、ElasticsearchとKibanaのライセンスはそれまでのApache License 2.0から「Server Side Public License」(SSPL)と「Elastic License」のデュアルライセンスへと変更が予定されています。
開発手順
それではデモシステムを作るための手順を示していきます。主な開発言語はJavaです。また、ビルドツールといてmavenを用います。
データ中継サーバの開発
まず、公共交通オープンデータセンターからデータを定期的に取得し、Socketで接続してきたクライアントに送信する中継サーバを開発します。
次のコマンドで空のプロジェクトを作りましょう。gruopID
やartifaciId
は適宜変えてください。
$mvn archetype:generate \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false \ -DgroupId=dev.fltech \ -DartifactId=relay
GTFS-RT形式のデータをProtocol Bufferで取得するライブラリを利用するため、生成されたpom.xmlの<dependencies>
内に次を追記します。
<dependency> <groupId>io.mobilitydata.transit</groupId> <artifactId>gtfs-realtime-bindings</artifactId> <version>0.0.5</version> </dependency>
このライブラリを用いて、コード内で
FeedMessage feed = FeedMessage.parseFrom(url.openStream());
とすれば FeedEntity
のリストが含まれた FeedMessage
が得られるので、次のようにして取り出せば各バスのIDと、位置情報(緯度経度)を取得できます。
for (FeedEntity entity : feed.getEntityList()) { String id = entity.getId(); VehiclePosition vpos = entity.getVehicle(); Position pos = vpos.getPosition(); float lat = pos.getLatitude(); float lon = pos.getLongitude(); }
私が日中に試した時は600件前後のデータを取得できました。この件数は時間帯によって異なり、深夜だと十数件にまで減るようです。
データの取得はこれでできたので、あとはSocketサーバとして待ち受け、接続してきたクライアントに対して、定期的に取得したデータを送信する処理を実装しましょう。今回はmavenによって生成されたApp.java
ファイルに直接コードを書いていきます。全てのソースを次に示します。
データ中継サーバの全コードを表示
package dev.fltech; import com.google.transit.realtime.GtfsRealtime.FeedEntity; import com.google.transit.realtime.GtfsRealtime.FeedMessage; import com.google.transit.realtime.GtfsRealtime.Position; import com.google.transit.realtime.GtfsRealtime.VehiclePosition; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.URL; public class App { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress("127.0.0.1", 1234)); Socket socket = serverSocket.accept(); PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); URL url = new URL( "https://api.odpt.org/api/v4/gtfs/realtime/ToeiBus?acl:consumerKey=[アクセストークン]" ); while (true) { //交通オープンデータを取得 FeedMessage feed = FeedMessage.parseFrom(url.openStream()); for (FeedEntity entity : feed.getEntityList()) { //取得したデータからIDと緯度、経度を取得しカンマ区切りにしてSocket送信 String id = entity.getId(); VehiclePosition vpos = entity.getVehicle(); Position pos = vpos.getPosition(); float lat = pos.getLatitude(); float lon = pos.getLongitude(); long timestamp = pos.getTimestamp(); writer.println( id + "," + String.valueOf(lat) + "," + String.valueOf(lon) + "," + String.valueOf(timestamp) ); } //30秒待機 Thread.sleep(30000); } } }
[アクセストークン]
の部分には公共交通オープンデータ開発者サイトにて発行されるものを指定します。Socket接続は1234番ポートで待ち受けています。公共交通オープンデータセンターへのデータ取得周期は、odpt:frequency
に合わせて30秒としています。Socketを介してFlinkに送信されるデータの内容は次のようになります。
E451,35.686702,139.826675,1610510580\n E132,35.666728,139.822672,1610510582\n T226,35.681041,139.764694,1610510581\n . .
コードを書いたらビルドします。Jar形式で実行できるように、pom.xmlの中に次を追記しておきます。
POMへの追記内容を全て表示
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>dev.fltech.App</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
これで次のコマンドを実行すれば、Jarとして実行可能なデータ中継サーバが、プロジェクトフォルダ内にtarget/relay-1.0-SNAPSHOT-jar-with-dependencies.jar
として出来上がります。
$mvn packages -DskipTests
FlinkのJobの開発
次に、中継サーバからデータを取得し、それを車両ごとにウィンドウ処理して、その結果をElasticsearchに出力するFlinkのJobを開発します。
次のようにしてFlinkのJobの雛形プロジェクトを作りましょう。groupId
とartifactId
は適宜変えてください。
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.11.0 \ -DinteractiveMode=false \ -DgroupId=dev.fltech \ -DartifactId=busview
生成されたプロジェクト内のpom.xmlの <dependencies>
内に、データ出力のために用いるElasticsearchコネクタの定義を次の通りに追記します。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
これで準備は整ったので、ストリーム処理を実装していきましょう。コードmavenによって生成されたStreamingJob.java
に書いていきます。
今回は、Socketで受信した文字列をコンマ[,]
で分解し、車両IDごとに振り分けた上で一定時間ごとにデータをバッファリングし、そのデータ群に対して特定の処理を行った結果をElasticsearchに出力するという流れを作ります。
Flinkのストリーム処理は、ストリームとしてsource
に入力されたデータに対して連鎖的に演算を適用し、その結果をsink
として出力する形で作っていきます。今回はその流れを source -> flatMap -> keyBy -> window -> apply -> sink
というフローで定義します。
sourceの定義
まずはSocketで1234番ポートで待ち受けている中継サーバをsource
として設定します。StreamExecutionEnvironment#socketTextStream
を用いれば、次のように記述するだけで中継サーバに対してSocketで自動的に接続してデータを受信し、DataStream<String>
型のデータストリームを生成してくれます。
DataStream<String> busStream = env.socketTextStream("127.0.0.1", 1234, "\n");
flatMapの定義
flatMapはストリーム内のデータに対して一律に共通の処理を施してくれるオペレータです。今回は DataStream<String>
型のデータを[,]
で分解し、車両IDと緯度、経度、タイムスタンプを保持するBus
オブジェクトに変換する処理を実装します。
busStream.flatMap(new FlatMapFunction<String,Bus>(){ @Override public void flatMap(String value, Collector<Bus> out) { String[] t = value.split(","); out.collect(new Bus( t[0], Float.parseFloat(t[1]), Float.parseFloat(t[2]), Float.parseLong(t[3]) ) ); } })
flatMapメソッドの第1引数であるvalue
がSource
から入ってくるテキストデータであり、第2引数であるout
のcollectメソッドを実行することで、処理結果を後段に渡します。今回はconnectにBus
オブジェクトとしてデータを指定しています。
Bus
オブジェクトは独自に定義したクラスであり、次の通りです。保持する内容はバスのIDと位置、タイムスタンプ、移動中かどうかのフラグという単純なものです。Elasticsearchコネクタに渡すために保持内容をHashMapで返すメソッドも実装しておきます。なお、HashMapに含める緯度経度はElasticsearchとKibanaで扱うために都合が良いため、Elasticsearchの提供するorg.elasticsearch.common.geo.GeoPoint
型にしています。
public static class Bus { public String id; // バスのID public double lat; // バスの現在位置の緯度 public double lng; // バス現在位置の経度 public boolean moving; // 移動中かどうか public long timestamp; // タイムスタンプ(UNIX_TIME) public Bus(String id, double lat, double lng, long timestamp) { this.id = id; this.lat = lat; this.lng = lng; this.timestamp = timestamp; } public Bus(String id, double lat, double lng, long timestamp, boolean moving) { this(id, lat, lng, timestamp); this.moving = moving; } public HashMap getHashMap() { HashMap<String, Object> map = new HashMap(); map.put("id", id); map.put("location", new GeoPoint(lat, lng)); map.put("moving", moving); return map; } }
keyByの定義
keyBy
はストリーム内のデータを、指定したキーごとに仕分けするオペレータです。引数には前段のflatMap
で出力したBus
型のデータが入ってくるので、ここでバスのIDをキーとして指定し、バスごとにストリームを仕分けします。keyByを作用させるとストリームの型はDataStream
型からKeyedStream
型に変わります。
.keyBy( value-> value.id )
windowの定義
window
は、データを一定期間ごとにバッファリングするオペレータです。今回は単純に時間ベースの重なりの無いTumbling Windowを作用させます。ウィンドウ幅は3分とします。window
を作用させるとストリームの型はKeyedStream
型からWindowedStream
型に変わります。
.timeWindow(Time.seconds(180))
window applyの定義
window apply
は、バッファリングしたデータに対して一括して特定の処理を行うオペレータです。applyメソッドの引数にはWindowFunction
を指定し、その中のapplyメソッドの中に処理を実装します。処理結果は第4引数のout
のcollectメソッドで後段に渡します。
.apply( new WindowFunction<Bus, Bus, String, TimeWindow>() { @Override public void apply( String key, TimeWindow window, Iterable<Bus> input, Collector<Bus> out ) { double sum_lng = 0f, sum_lat = 0f, d_lng = 0f, d_lat = 0f; long timestamp = 0l; int cnt = 0; boolean moving = false; for (Bus bus : input) { sum_lat += bus.lat; sum_lng += bus.lng; if (cnt == 0) { d_lat = bus.lat; d_lng = bus.lng; // とりあえずタイムスタンプはバッファの初めに入っているデータのものを利用 timestamp = bus.timestamp; } else { if (d_lat != bus.lat || d_lng != bus.lng) { moving = true; } } cnt++; } out.collect(new Bus(key, sum_lat / cnt, sum_lng / cnt, timestamp, moving)); } } )
今回は単純にバッファしたデータに対して、緯度、経度の平均値を求めるとともに、その間に緯度と経度が変化したかどうかで移動中フラグを立てるようにしました。
もちろん、バッファする時間を短くしたり、処理を行うタイミングを変えて平均値の計算をもっと効率的に行ったり、あるいはもっと複雑な処理を実装することもできます。例えば移動の軌跡を分析し、バスが「直進しているか」、「右左折したか」といった走行状況を時々刻々と判定することもできるかもしれません。
sinkの定義
sinkはストリーム処理の結果をFlinkの外へ出力するためのものです。今回はElasticsearchにデータを出力するので、ElasticsearchSink
を用います。この実装は少し長くなりますが、インデックスにデータを追加/更新するためのIndexRequest/UpdateRequest
を返すメソッドを、前段からデータが流れてくる度に呼び出されるprocess
メソッドの中から呼び出しています。
今回はElasticsearch側のインデックス名、タイプ名ともにbus
にするのでそれに合わせます。また、Elasticsearchに書き込むデータはBus#getHashMap
から得られる、バスのID、緯度経度、タイムスタンプ、移動状況フラグになります。
.addSink( new ElasticsearchSink<Bus>( config, tranportAddresses, new ElasticsearchSinkFunction<Bus>() { // データ追加時のリクエストを生成 public IndexRequest createIndexRequest(Bus bus) { return Requests .indexRequest() .index("bus") .type("bus") .id(bus.id) .source(bus.getHashMap()); } // データ更新時のリクエストを生成 public UpdateRequest createUpdateRequest(Bus bus) { return new UpdateRequest() .index("bus") .type("bus") .id(bus.id) .doc(bus.getHashMap()) .upsert(createIndexRequest(bus)); //upsertにより既存データが存在しない場合には自動的に追加がなされる } @Override public void process( Bus bus, RuntimeContext runtimeContext, RequestIndexer requestIndexer ) { requestIndexer.add(createUpdateRequest(bus)); } } ) )
これで一連のストリーム処理を実装できました。これまで説明したオペレータを含めた全コードを示しておきます。
Flinkジョブの全コードを表示
package dev.fltech; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; import org.apache.flink.util.Collector; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.common.geo.GeoPoint; public class StreamingJob { public static void main(String[] args) { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Map<String, String> config = new HashMap<>(); config.put("cluster.name", "elasticsearch"); config.put("bulk.flush.max.actions", "1"); List<InetSocketAddress> tranportAddresses = new ArrayList<>(); tranportAddresses.add( new InetSocketAddress(InetAddress.getByName("localhost"), 9300) ); // Sinkにデータ中継サーバを設定 DataStream<String> busStream = env.socketTextStream( "127.0.0.1", 1234, "\n" ); busStream .flatMap( // データを解釈してBusオブジェクトに変換 new FlatMapFunction<String, Bus>() { @Override public void flatMap(String value, Collector<Bus> out) { System.out.println(value); String[] t = value.split(","); out.collect( new Bus(t[0], Float.parseFloat(t[1]), Float.parseFloat(t[2])) ); } } ) .keyBy(value -> value.id) // バスのIDごとに仕分け .timeWindow(Time.seconds(60)) // 60秒間蓄積 .apply( // 平均値算出と移動判定 new WindowFunction<Bus, Bus, String, TimeWindow>() { @Override public void apply( String key, TimeWindow window, Iterable<Bus> input, Collector<Bus> out ) { double sum_lng = 0f, sum_lat = 0f, d_lng = 0f, d_lat = 0f; int cnt = 0; boolean moving = false; for (Bus bus : input) { sum_lat += bus.lat; sum_lng += bus.lng; if (cnt == 0) { d_lat = bus.lat; d_lng = bus.lng; // とりあえずタイムスタンプはバッファの初めに入っているデータのものを利用 timestamp = bus.timestamp; } else { // 一度でも緯度経度が変化していたら移動中と判定 if (d_lat != bus.lat || d_lng != bus.lng) { moving = true; } } cnt++; } out.collect(new Bus(key, sum_lat / cnt, sum_lng / cnt, moving)); } } ) .addSink( new ElasticsearchSink<Bus>( config, tranportAddresses, new ElasticsearchSinkFunction<Bus>() { // Elasticsearchへのデータ追加時のリクエストを生成 public IndexRequest createIndexRequest(Bus bus) { return Requests .indexRequest() .index("bus") .type("bus") .id(bus.id) .source(bus.getHashMap()); } // データ更新時のリクエストを生成 public UpdateRequest createUpdateRequest(Bus bus) { return new UpdateRequest() .index("bus") .type("bus") .id(bus.id) .doc(bus.getHashMap()) .upsert(createIndexRequest(bus)); //upsertしておくと既存データが存在しない場合には自動的に追加がなされる } @Override public void process( Bus bus, RuntimeContext runtimeContext, RequestIndexer requestIndexer ) { requestIndexer.add(createUpdateRequest(bus)); } } ) ); // 定義した処理フローを実行 env.execute("BusView"); } catch (Exception e) { e.printStackTrace(); } } public static class Bus { public String id; public double lat; public double lng; public long timestamp; // タイムスタンプ(UNIX_TIME) public Bus(String id, double lat, double lng, long timestamp) { this.id = id; this.lat = lat; this.lng = lng; this.timestamp = timestamp; } public Bus(String id, double lat, double lng, long timestamp, boolean moving) { this(id, lat, lng, timestamp); this.moving = moving; } public HashMap getHashMap() { HashMap<String, Object> map = new HashMap(); map.put("id", id); map.put("location", new GeoPoint(lat, lng)); map.put("moving", moving); map.put("timestamp", timestamp); return map; } } }
ビルドは次のコマンドでOKです。必要なライブラリが含まれたfat-jarがtarget/busview-1.0-SNAPSHOT.jar
として生成されるはずです。
$mvn packages -DskipTests
Elasticseachのインデックスとマッピングの作成
開発したFlinkのjobを実行する前に、Elasticsearchのインデックスとマッピングを作成しておきます。インデックスとはRDBでいうテーブルで、マッピングはスキーマのようなものです。
まずインデックスを次のようにして作成します。今回のインデックス名はbus
にします。
$curl -XPUT "http://localhost:9200/bus"
下のレスポンスが得られればOKです。
{"acknowledged":true,"shards_acknowledged":true,"index":"bus"}
続いてマッピングも定義しておきましょう。次の内容を記載したmap.json
を用意します。今回、位置情報はgeo_pont
型として、またタイムスタンプはFlink側からの出力形式に合わせてformat:epoc_second
を指定したdate
型としています。このように位置やタイムスタンプのデータ型を明示しておくことで、Kibanaで可視化する際に型を考慮した表示がなされて便利です。
{ "bus": { "properties": { "id": { "type": "string" }, "moving": { "type": "boolean" }, "location": { "type": "geo_point" }, "timestamp": { "type": "date", "format": "epoc_second" } } } }
このJSONファイルを指定して下のようにコマンドを実行し、結果が返ってくればOKです。
$curl -H "Content-Type: application/json" -XPUT 'http://localhost:9200/bus/_mapping/bus' -d @map.json
結果
{"acknowledged":true}
デモの実行
それでは開発したデモを動かします。
中継サーバの起動
まず、データ中継サーバを起動します。
$java -jar target/relay-1.0-SNAPSHOT-jar-with-dependencies.jar
Flinkジョブの起動
次にFlinkのjobを投入します。[FLINK_HOME]
の部分はFlinkをインストールした場所に合わせて変えてください。
[FLINK_HOME]/bin/flink run busview/target/busview-1.0-SNAPSHOT.jar
ブラウザからhttp://サーバのIPアドレス:8081
を開き、Running Job List
に投入したジョブが表示されていればOKです。なお、ここに表示されているJob Name
は、ソースコード内からenv.execute("BusView");
等として指定した文字列となります。下の例ではBusView
となっています。
これで、Elasticsearchにバス情報データが定期的に書き込まれているはずです。
結果の可視化
それではいよいよ結果を可視化してみましょう。
ブラウザからhttp://サーバのIPアドレス:5601
にアクセスし、Kibanaを開きます。
初めにインデックスパターンを定義する必要があるので、Kibanaのダッシュボードの左メニューからManagement
を選択し、Index Petterns -> Configure an index pattern -> index pettern
にbus*
と入力し、Create
を押します。
インデックスパターンを生成すると、下のようにFlink側から投入されたデータを確認できるようになります。
それでは、このデータを地図上にプロットしてみましょう。ダッシュボードの左のメニューからVisualize -> Create a visualization -> Coodinate Map
を選択します。
そして、buckets -> Geo Coordinates -> Field
にlocation
を設定して反映すると、次のように車両の位置がプロットされます。
上部のクエリ入力バーにmoving:true
と入力すると、移動中と判定されたバスのみ表示することもできます。
ヒートマップ形式でも表示できます。
なお、Kibanaのデフォルトの地図だと拡大できるレベルが限定されており、バスの位置を詳細に確認できません。そこで、次をkibana.xml
に記入し、利用する地図をOpenStreetMapに切り替え、さらに最大ズームレベルを高めに指定し、kibanaを再起動します。
tilemap.url: "http://tile.openstreetmap.jp/{z}/{x}/{y}.png" tilemap.options.maxZoom: 18
これによってバスが走行する道路まで判別できるレベルまでズームできるようになります。
最後に、都内広域を走るバスの様子をアニメーションにしてみました。 よく見ると、渋谷区の六本木通り、豊島区の明治通り、新宿区の青梅街道といった幹線道路に沢山走っている様子がわかりますね。
- このアニメーションは、20分程度録画したもの5秒程度に縮めてタイムラプス再生しています。
- また、ウィンドウ処理は無効にし、車両位置をそのまま出力しています。
まとめ
今回はOSSのストリーム処理基盤であるApache Flinkを利用し、リアルタイム交通データを可視化するデモの開発チュートリアルを示しました。今回はあくまでチュートリアルとして、単にバスの位置をバッファリングして平均を出すという、あまり意味のない?処理の例を示しましたが、リアルタイムなデータとストリーム処理を掛け合わせて、これまでにない便利なサービスを実現できる可能性を感じられたのではないでしょうか?
もちろんここで示した内容は、とりあえ疎通するレベルの品質です。我々はFlinkをベースとしたストリーム処理基盤であるDracenaの研究開発を行っており、研究開発の現場では、実際のサービス運用で重要となる、スケーラビリティや耐障害性など、ここには書いていない様々なことを考慮しながら基盤の性能向上に取り組んでいます。