Please enable JavaScript in your browser.

fltech - 富士通研究所の技術ブログ

富士通研究所の研究員がさまざまなテーマで語る技術ブログ

Flink+Elasticsearch+Kibanaで作るリアルタイム交通情報ダッシュボード

f:id:fltech:20210113153010p:plain

こんにちは富士通研究所のymokです。

我々の部署ではストリームデータ処理基盤技術Dracenaを開発しています。 今回は、DracenaのベースとなっているFlinkを題材として、リアルタイム交通データをFlinkに流し込んで可視化する一連のシステム構築の様子を end-to-end でチュートリアルとして示したいと思います。

バス走行位置のリアルタイムデータ(GTFS-RT)

2020年8月に、東京都交通局による都営バスのリアルタイムロケーション情報の提供が始まりました(詳細はこちら)。今回はこのデータを対象として、チュートリアルデモを作成します。

このデータは、公共交通オープンデータ開発者サイトにアカウント登録することで取得できます。データの形式はGTFS-RTで、Protocol Buffersによって提供されます。当然ながら、このデータを利用したアプリケーションやサービスを公開する場合は、公共交通オープンデータセンターの定める利用規約・ガイドラインを遵守してください。

システムの構成

今回開発するデモシステムの全体構成を下に示します。Flinkにリアルタイムバスロケーションデータを投入し、ストリーム処理した結果をElasticsearchにストアしてKibanaで可視化します。スケーラビリティを考慮すると、Flinkへのデータ投入はKafkaといった分散メッセージング基盤を介して行うべきですが、今回は構成をシンプルに保つために、公共交通オープンデータセンターからデータを取得し、SocketによってFlinkに送信する簡易的な中継サーバを開発し、そこを介する構成にしています。

f:id:fltech:20210113134550p:plain:w600

なお、本チュートリアルでは、下のソフトウェアがすでにインストールされ、利用できる状態になっているものとします。また、全てサービスのは同じホスト上で動作しているものとします。

  • Apache Flink 1.10.x
  • Elasticsearch 5.6.x
  • Kibana 5.6.x

なお、ElasticsearchとKibanaのライセンスはそれまでのApache License 2.0から「Server Side Public License」(SSPL)と「Elastic License」のデュアルライセンスへと変更が予定されています。

開発手順

それではデモシステムを作るための手順を示していきます。主な開発言語はJavaです。また、ビルドツールといてmavenを用います。

データ中継サーバの開発

まず、公共交通オープンデータセンターからデータを定期的に取得し、Socketで接続してきたクライアントに送信する中継サーバを開発します。

次のコマンドで空のプロジェクトを作りましょう。gruopIDartifaciIdは適宜変えてください。

$mvn archetype:generate \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false \
  -DgroupId=dev.fltech \
  -DartifactId=relay

GTFS-RT形式のデータをProtocol Bufferで取得するライブラリを利用するため、生成されたpom.xmlの<dependencies>内に次を追記します。

<dependency>
    <groupId>io.mobilitydata.transit</groupId>
    <artifactId>gtfs-realtime-bindings</artifactId>
    <version>0.0.5</version>
</dependency>

このライブラリを用いて、コード内で

FeedMessage feed = FeedMessage.parseFrom(url.openStream());

とすれば FeedEntity のリストが含まれた FeedMessage が得られるので、次のようにして取り出せば各バスのIDと、位置情報(緯度経度)を取得できます。

for (FeedEntity entity : feed.getEntityList()) {
    String id = entity.getId();
    VehiclePosition vpos = entity.getVehicle();
    Position pos = vpos.getPosition();
    float lat = pos.getLatitude();
    float lon = pos.getLongitude();
}

私が日中に試した時は600件前後のデータを取得できました。この件数は時間帯によって異なり、深夜だと十数件にまで減るようです。

データの取得はこれでできたので、あとはSocketサーバとして待ち受け、接続してきたクライアントに対して、定期的に取得したデータを送信する処理を実装しましょう。今回はmavenによって生成されたApp.javaファイルに直接コードを書いていきます。全てのソースを次に示します。

データ中継サーバの全コードを表示

package dev.fltech;

import com.google.transit.realtime.GtfsRealtime.FeedEntity;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import com.google.transit.realtime.GtfsRealtime.Position;
import com.google.transit.realtime.GtfsRealtime.VehiclePosition;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;

public class App {

  public static void main(String[] args) throws Exception {
    ServerSocket serverSocket = new ServerSocket();
    serverSocket.bind(new InetSocketAddress("127.0.0.1", 1234));
    Socket socket = serverSocket.accept();

    PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);

    URL url = new URL(
      "https://api.odpt.org/api/v4/gtfs/realtime/ToeiBus?acl:consumerKey=[アクセストークン]"
    );

    while (true) {
      //交通オープンデータを取得
      FeedMessage feed = FeedMessage.parseFrom(url.openStream());

      for (FeedEntity entity : feed.getEntityList()) {
        //取得したデータからIDと緯度、経度を取得しカンマ区切りにしてSocket送信
        String id = entity.getId();
        VehiclePosition vpos = entity.getVehicle();
        Position pos = vpos.getPosition();
        float lat = pos.getLatitude();
        float lon = pos.getLongitude();
        long timestamp = pos.getTimestamp();
        writer.println(
          id + "," + String.valueOf(lat) + "," + String.valueOf(lon) + "," + String.valueOf(timestamp)
        );
      }
      //30秒待機
      Thread.sleep(30000);
    }
  }
}

[アクセストークン]の部分には公共交通オープンデータ開発者サイトにて発行されるものを指定します。Socket接続は1234番ポートで待ち受けています。公共交通オープンデータセンターへのデータ取得周期は、odpt:frequencyに合わせて30秒としています。Socketを介してFlinkに送信されるデータの内容は次のようになります。

E451,35.686702,139.826675,1610510580\n
E132,35.666728,139.822672,1610510582\n
T226,35.681041,139.764694,1610510581\n
  .
  .

コードを書いたらビルドします。Jar形式で実行できるように、pom.xmlの中に次を追記しておきます。

POMへの追記内容を全て表示

<build>
  <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
           <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
           </descriptorRefs>
           <archive>
              <manifest>
                <mainClass>dev.fltech.App</mainClass>
              </manifest>
           </archive>
        </configuration>
        <executions>
           <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
           </execution>
        </executions>
      </plugin>
   </plugins>
</build>

これで次のコマンドを実行すれば、Jarとして実行可能なデータ中継サーバが、プロジェクトフォルダ内にtarget/relay-1.0-SNAPSHOT-jar-with-dependencies.jarとして出来上がります。

$mvn packages -DskipTests

FlinkのJobの開発

次に、中継サーバからデータを取得し、それを車両ごとにウィンドウ処理して、その結果をElasticsearchに出力するFlinkのJobを開発します。

次のようにしてFlinkのJobの雛形プロジェクトを作りましょう。groupIdartifactIdは適宜変えてください。

$ mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.11.0 \
  -DinteractiveMode=false \
  -DgroupId=dev.fltech \
  -DartifactId=busview 

生成されたプロジェクト内のpom.xmlの <dependencies> 内に、データ出力のために用いるElasticsearchコネクタの定義を次の通りに追記します。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

これで準備は整ったので、ストリーム処理を実装していきましょう。コードmavenによって生成されたStreamingJob.javaに書いていきます。

今回は、Socketで受信した文字列をコンマ[,]で分解し、車両IDごとに振り分けた上で一定時間ごとにデータをバッファリングし、そのデータ群に対して特定の処理を行った結果をElasticsearchに出力するという流れを作ります。

Flinkのストリーム処理は、ストリームとしてsourceに入力されたデータに対して連鎖的に演算を適用し、その結果をsinkとして出力する形で作っていきます。今回はその流れを source -> flatMap -> keyBy -> window -> apply -> sink というフローで定義します。

f:id:fltech:20210113135119p:plain:w600

sourceの定義

まずはSocketで1234番ポートで待ち受けている中継サーバをsourceとして設定します。StreamExecutionEnvironment#socketTextStream を用いれば、次のように記述するだけで中継サーバに対してSocketで自動的に接続してデータを受信し、DataStream<String>型のデータストリームを生成してくれます。

DataStream<String> busStream = env.socketTextStream("127.0.0.1", 1234, "\n");

flatMapの定義

flatMapはストリーム内のデータに対して一律に共通の処理を施してくれるオペレータです。今回は DataStream<String>型のデータを[,]で分解し、車両IDと緯度、経度、タイムスタンプを保持するBusオブジェクトに変換する処理を実装します。

busStream.flatMap(new FlatMapFunction<String,Bus>(){
    @Override
    public void flatMap(String value, Collector<Bus> out) {
        String[] t = value.split(",");
        out.collect(new Bus( t[0], Float.parseFloat(t[1]), Float.parseFloat(t[2]), Float.parseLong(t[3]) ) );
    }
})

flatMapメソッドの第1引数であるvalueSourceから入ってくるテキストデータであり、第2引数であるoutのcollectメソッドを実行することで、処理結果を後段に渡します。今回はconnectにBusオブジェクトとしてデータを指定しています。

Busオブジェクトは独自に定義したクラスであり、次の通りです。保持する内容はバスのIDと位置、タイムスタンプ、移動中かどうかのフラグという単純なものです。Elasticsearchコネクタに渡すために保持内容をHashMapで返すメソッドも実装しておきます。なお、HashMapに含める緯度経度はElasticsearchとKibanaで扱うために都合が良いため、Elasticsearchの提供するorg.elasticsearch.common.geo.GeoPoint型にしています。

public static class Bus {

  public String id; // バスのID
  public double lat; // バスの現在位置の緯度
  public double lng; // バス現在位置の経度
  public boolean moving; // 移動中かどうか
  public long timestamp; // タイムスタンプ(UNIX_TIME)

  public Bus(String id, double lat, double lng, long timestamp) {
    this.id = id;
    this.lat = lat;
    this.lng = lng;
    this.timestamp = timestamp;
  }

  public Bus(String id, double lat, double lng, long timestamp, boolean moving) {
    this(id, lat, lng, timestamp);
    this.moving = moving;
  }

  public HashMap getHashMap() {
    HashMap<String, Object> map = new HashMap();
    map.put("id", id);
    map.put("location", new GeoPoint(lat, lng));
    map.put("moving", moving);
    return map;
  }
}

keyByの定義

keyByはストリーム内のデータを、指定したキーごとに仕分けするオペレータです。引数には前段のflatMapで出力したBus型のデータが入ってくるので、ここでバスのIDをキーとして指定し、バスごとにストリームを仕分けします。keyByを作用させるとストリームの型はDataStream型からKeyedStream型に変わります。

.keyBy( value-> value.id )

windowの定義

windowは、データを一定期間ごとにバッファリングするオペレータです。今回は単純に時間ベースの重なりの無いTumbling Windowを作用させます。ウィンドウ幅は3分とします。windowを作用させるとストリームの型はKeyedStream型からWindowedStream型に変わります。

.timeWindow(Time.seconds(180))

window applyの定義

window applyは、バッファリングしたデータに対して一括して特定の処理を行うオペレータです。applyメソッドの引数にはWindowFunctionを指定し、その中のapplyメソッドの中に処理を実装します。処理結果は第4引数のoutのcollectメソッドで後段に渡します。

.apply(
    new WindowFunction<Bus, Bus, String, TimeWindow>() {
    @Override
    public void apply(
        String key,
        TimeWindow window,
        Iterable<Bus> input,
        Collector<Bus> out
    ) {
        double sum_lng = 0f, sum_lat = 0f, d_lng = 0f, d_lat = 0f;
        long timestamp = 0l;
        int cnt = 0;
        boolean moving = false;
        for (Bus bus : input) {
        sum_lat += bus.lat;
        sum_lng += bus.lng;
        if (cnt == 0) {
            d_lat = bus.lat;
            d_lng = bus.lng;
            // とりあえずタイムスタンプはバッファの初めに入っているデータのものを利用
            timestamp = bus.timestamp;
        } else {
            if (d_lat != bus.lat || d_lng != bus.lng) {
              moving = true;
            }
        }
        cnt++;
        }

        out.collect(new Bus(key, sum_lat / cnt, sum_lng / cnt, timestamp, moving));
    }
    }
)

今回は単純にバッファしたデータに対して、緯度、経度の平均値を求めるとともに、その間に緯度と経度が変化したかどうかで移動中フラグを立てるようにしました。

もちろん、バッファする時間を短くしたり、処理を行うタイミングを変えて平均値の計算をもっと効率的に行ったり、あるいはもっと複雑な処理を実装することもできます。例えば移動の軌跡を分析し、バスが「直進しているか」、「右左折したか」といった走行状況を時々刻々と判定することもできるかもしれません。

sinkの定義

sinkはストリーム処理の結果をFlinkの外へ出力するためのものです。今回はElasticsearchにデータを出力するので、ElasticsearchSinkを用います。この実装は少し長くなりますが、インデックスにデータを追加/更新するためのIndexRequest/UpdateRequestを返すメソッドを、前段からデータが流れてくる度に呼び出されるprocessメソッドの中から呼び出しています。

今回はElasticsearch側のインデックス名、タイプ名ともにbusにするのでそれに合わせます。また、Elasticsearchに書き込むデータはBus#getHashMapから得られる、バスのID、緯度経度、タイムスタンプ、移動状況フラグになります。

.addSink(
    new ElasticsearchSink<Bus>(
    config,
    tranportAddresses,
    new ElasticsearchSinkFunction<Bus>() {
        // データ追加時のリクエストを生成
        public IndexRequest createIndexRequest(Bus bus) {
        return Requests
            .indexRequest()
            .index("bus")
            .type("bus")
            .id(bus.id)
            .source(bus.getHashMap());
        }

        // データ更新時のリクエストを生成
        public UpdateRequest createUpdateRequest(Bus bus) {
        return new UpdateRequest()
            .index("bus")
            .type("bus")
            .id(bus.id)
            .doc(bus.getHashMap())
            .upsert(createIndexRequest(bus));
            //upsertにより既存データが存在しない場合には自動的に追加がなされる
        }

        @Override
        public void process(
        Bus bus,
        RuntimeContext runtimeContext,
        RequestIndexer requestIndexer
        ) {
        requestIndexer.add(createUpdateRequest(bus));
        }
    }
    )
)

これで一連のストリーム処理を実装できました。これまで説明したオペレータを含めた全コードを示しておきます。

Flinkジョブの全コードを表示

package dev.fltech;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.geo.GeoPoint;

public class StreamingJob {

  public static void main(String[] args) {
    try {
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      Map<String, String> config = new HashMap<>();
      config.put("cluster.name", "elasticsearch");
      config.put("bulk.flush.max.actions", "1");

      List<InetSocketAddress> tranportAddresses = new ArrayList<>();
      tranportAddresses.add(
        new InetSocketAddress(InetAddress.getByName("localhost"), 9300)
      );

      // Sinkにデータ中継サーバを設定
      DataStream<String> busStream = env.socketTextStream(
        "127.0.0.1",
        1234,
        "\n"
      );

      busStream
        .flatMap( // データを解釈してBusオブジェクトに変換
          new FlatMapFunction<String, Bus>() {
            @Override
            public void flatMap(String value, Collector<Bus> out) {
              System.out.println(value);
              String[] t = value.split(",");
              out.collect(
                new Bus(t[0], Float.parseFloat(t[1]), Float.parseFloat(t[2]))
              );
            }
          }
        )
        .keyBy(value -> value.id) // バスのIDごとに仕分け
        .timeWindow(Time.seconds(60)) // 60秒間蓄積
        .apply( // 平均値算出と移動判定
          new WindowFunction<Bus, Bus, String, TimeWindow>() {
            @Override
            public void apply(
              String key,
              TimeWindow window,
              Iterable<Bus> input,
              Collector<Bus> out
            ) {
              double sum_lng = 0f, sum_lat = 0f, d_lng = 0f, d_lat = 0f;
              int cnt = 0;
              boolean moving = false;
              for (Bus bus : input) {
                sum_lat += bus.lat;
                sum_lng += bus.lng;
                if (cnt == 0) {
                  d_lat = bus.lat;
                  d_lng = bus.lng;
                  // とりあえずタイムスタンプはバッファの初めに入っているデータのものを利用
                  timestamp = bus.timestamp;
                } else {
                  // 一度でも緯度経度が変化していたら移動中と判定
                  if (d_lat != bus.lat || d_lng != bus.lng) {
                    moving = true;
                  }                }
                cnt++;
              }

              out.collect(new Bus(key, sum_lat / cnt, sum_lng / cnt, moving));
            }
          }
        )
        .addSink(
          new ElasticsearchSink<Bus>(
            config,
            tranportAddresses,
            new ElasticsearchSinkFunction<Bus>() {
              // Elasticsearchへのデータ追加時のリクエストを生成
              public IndexRequest createIndexRequest(Bus bus) {
                return Requests
                  .indexRequest()
                  .index("bus")
                  .type("bus")
                  .id(bus.id)
                  .source(bus.getHashMap());
              }

              // データ更新時のリクエストを生成
              public UpdateRequest createUpdateRequest(Bus bus) {
                return new UpdateRequest()
                  .index("bus")
                  .type("bus")
                  .id(bus.id)
                  .doc(bus.getHashMap())
                  .upsert(createIndexRequest(bus));
                  //upsertしておくと既存データが存在しない場合には自動的に追加がなされる
              }

              @Override
              public void process(
                Bus bus,
                RuntimeContext runtimeContext,
                RequestIndexer requestIndexer
              ) {
                requestIndexer.add(createUpdateRequest(bus));
              }
            }
          )
        );

      // 定義した処理フローを実行
      env.execute("BusView");

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public static class Bus {

    public String id;
    public double lat;
    public double lng;
    public long timestamp; // タイムスタンプ(UNIX_TIME)

    public Bus(String id, double lat, double lng, long timestamp) {
        this.id = id;
        this.lat = lat;
        this.lng = lng;
        this.timestamp = timestamp;
    }

    public Bus(String id, double lat, double lng, long timestamp, boolean moving) {
        this(id, lat, lng, timestamp);
        this.moving = moving;
    }

    public HashMap getHashMap() {
        HashMap<String, Object> map = new HashMap();
        map.put("id", id);
        map.put("location", new GeoPoint(lat, lng));
        map.put("moving", moving);
        map.put("timestamp", timestamp);
        return map;
    }
  }
}

ビルドは次のコマンドでOKです。必要なライブラリが含まれたfat-jarがtarget/busview-1.0-SNAPSHOT.jarとして生成されるはずです。

$mvn packages -DskipTests

Elasticseachのインデックスとマッピングの作成

開発したFlinkのjobを実行する前に、Elasticsearchのインデックスとマッピングを作成しておきます。インデックスとはRDBでいうテーブルで、マッピングはスキーマのようなものです。

まずインデックスを次のようにして作成します。今回のインデックス名はbusにします。

$curl -XPUT "http://localhost:9200/bus"

下のレスポンスが得られればOKです。

{"acknowledged":true,"shards_acknowledged":true,"index":"bus"}

続いてマッピングも定義しておきましょう。次の内容を記載したmap.jsonを用意します。今回、位置情報はgeo_pont型として、またタイムスタンプはFlink側からの出力形式に合わせてformat:epoc_secondを指定したdate型としています。このように位置やタイムスタンプのデータ型を明示しておくことで、Kibanaで可視化する際に型を考慮した表示がなされて便利です。

{
  "bus": {
    "properties": {
      "id": { "type": "string" },
      "moving": { "type": "boolean" },
      "location": { "type": "geo_point" },
      "timestamp": {
          "type": "date",
          "format": "epoc_second"
      }
    }
  }
}

このJSONファイルを指定して下のようにコマンドを実行し、結果が返ってくればOKです。

$curl -H "Content-Type: application/json" -XPUT 'http://localhost:9200/bus/_mapping/bus' -d @map.json

結果

{"acknowledged":true}

デモの実行

それでは開発したデモを動かします。

中継サーバの起動

まず、データ中継サーバを起動します。

$java -jar target/relay-1.0-SNAPSHOT-jar-with-dependencies.jar 

Flinkジョブの起動

次にFlinkのjobを投入します。[FLINK_HOME]の部分はFlinkをインストールした場所に合わせて変えてください。

[FLINK_HOME]/bin/flink run busview/target/busview-1.0-SNAPSHOT.jar

ブラウザからhttp://サーバのIPアドレス:8081を開き、Running Job Listに投入したジョブが表示されていればOKです。なお、ここに表示されているJob Nameは、ソースコード内からenv.execute("BusView");等として指定した文字列となります。下の例ではBusViewとなっています。

f:id:fltech:20210113135311p:plain:w700

これで、Elasticsearchにバス情報データが定期的に書き込まれているはずです。

結果の可視化

それではいよいよ結果を可視化してみましょう。 ブラウザからhttp://サーバのIPアドレス:5601にアクセスし、Kibanaを開きます。

初めにインデックスパターンを定義する必要があるので、Kibanaのダッシュボードの左メニューからManagementを選択し、Index Petterns -> Configure an index pattern -> index petternbus*と入力し、Createを押します。

インデックスパターンを生成すると、下のようにFlink側から投入されたデータを確認できるようになります。

f:id:fltech:20210113135514p:plain:w700

それでは、このデータを地図上にプロットしてみましょう。ダッシュボードの左のメニューからVisualize -> Create a visualization -> Coodinate Mapを選択します。

そして、buckets -> Geo Coordinates -> Fieldlocationを設定して反映すると、次のように車両の位置がプロットされます。

f:id:fltech:20210113135658p:plain:w700

上部のクエリ入力バーにmoving:trueと入力すると、移動中と判定されたバスのみ表示することもできます。

ヒートマップ形式でも表示できます。

f:id:fltech:20210113135846p:plain:w700

なお、Kibanaのデフォルトの地図だと拡大できるレベルが限定されており、バスの位置を詳細に確認できません。そこで、次をkibana.xmlに記入し、利用する地図をOpenStreetMapに切り替え、さらに最大ズームレベルを高めに指定し、kibanaを再起動します。

tilemap.url: "http://tile.openstreetmap.jp/{z}/{x}/{y}.png"
tilemap.options.maxZoom: 18

これによってバスが走行する道路まで判別できるレベルまでズームできるようになります。 f:id:fltech:20210113141439p:plain:w700

最後に、都内広域を走るバスの様子をアニメーションにしてみました。 よく見ると、渋谷区の六本木通り、豊島区の明治通り、新宿区の青梅街道といった幹線道路に沢山走っている様子がわかりますね。 f:id:fltech:20210120160601g:plain:w700

  • このアニメーションは、20分程度録画したもの5秒程度に縮めてタイムラプス再生しています。
  • また、ウィンドウ処理は無効にし、車両位置をそのまま出力しています。

まとめ

今回はOSSのストリーム処理基盤であるApache Flinkを利用し、リアルタイム交通データを可視化するデモの開発チュートリアルを示しました。今回はあくまでチュートリアルとして、単にバスの位置をバッファリングして平均を出すという、あまり意味のない?処理の例を示しましたが、リアルタイムなデータとストリーム処理を掛け合わせて、これまでにない便利なサービスを実現できる可能性を感じられたのではないでしょうか?

もちろんここで示した内容は、とりあえ疎通するレベルの品質です。我々はFlinkをベースとしたストリーム処理基盤であるDracenaの研究開発を行っており、研究開発の現場では、実際のサービス運用で重要となる、スケーラビリティや耐障害性など、ここには書いていない様々なことを考慮しながら基盤の性能向上に取り組んでいます。