おくみん公式ブログ

おくみん公式ブログ

Google Cloud Dataflow で Google BigQuery へストリーミング ETL するの巻

f:id:okumin:20170818025111p:plain

okumin.com のアクセスログを Google BigQuery で分析するために、ETL パイプラインを構築しました。
Apache Beam(Scio) + Google Cloud Dataflow を用いてログの加工及び BigQuery へのストリーミングインサートを行うという構成です。
この記事ではその全体像と個々のコンポーネントの簡単な説明を行います。

アーキテクチャ

f:id:okumin:20170819185401p:plain

次の流れで ETL を行っています。

  1. Google Container Engine(GKE) 上に配置した Nginx コンテナのログを Stackdriver Logging へ集約
  2. Stackdriver Logging のログを Google Cloud Pub/Sub のトピック「okumin-com-access-log-raw-v1」へエクスポート
  3. Google Cloud Dataflow で「okumin-com-access-log-raw-v1」から取り出した Stackdriver Logging エントリーを変換し、Cloud Pub/Sub のトピック「okumin-com-access-log-refined-v1」へ書き込み
  4. Cloud Dataflow で「okumin-com-access-log-refined-v1」から取得した変換データを Google BigQuery へインサート

これに加え、記事トップの絵が示しているように監視 & Slack への通知を行っているコンポーネントも存在します。
全部説明すると長くなるので監視部分は『Google Cloud Dataflow でアクセスログを監視して Google Cloud Functions で Slack へアラートを送信するの巻』で解説します。

下ごしらえ

先にいくつかのリソースを作っておきました。
全部 Terraform で管理しています。

  • Cloud Pub/Sub
    • トピック「okumin-com-access-log-raw-v1」を作成
      • 加工前の生データを格納するトピック
    • サブスクリプション「okumin-com-access-log-raw-v1-etl」を作成
      • 変換処理が「okumin-com-access-log-raw-v1」のデータを読み込むためのサブスクリプション
    • トピック「okumin-com-access-log-refined-v1」を作成
      • 加工後のきれいなデータを格納するトピック
    • サブスクリプション「okumin-com-access-log-refined-v1-etl」を作成
      • BigQuery へのロード処理が「okumin-com-access-log-refined-v1」のデータを読み込むためのサブスクリプション
  • BigQuery
    • データセット「okumin_com」とテーブル「okumin_com_access_log_v1」を作成
  • Cloud Storage
    • Cloud Dataflow のテンプレートを設定するためのバケット「okumin-dataflow-prod」を作成

手で作成する場合は以下を参考にしてください。

「okumin_com_access_log_v1」テーブルは日付によるパーティショニングを有効にしています。

ETL の流れ

Nginx ログの収集

okumin.com が GCP 上で不死鳥のように蘇りました #GCP - おくみん公式ブログ』で紹介した通り、okumin.com は GKE 上に構築されています。
Kubernetes on GKE にはコンテナの標準出力を Stackdriver Logging へ送信する機能があるので、ログを集めるのは簡単です。
okumin.com の実体は Nginx なので、次のような設定を書いてやるだけです。

http {
    ……
    log_format ltsv "time:$time_iso8601"
                    "\thost:$remote_addr"
                    "\tforwardedfor:$http_x_forwarded_for"
                    "\tvhost:$host"
                    "\tmethod:$request_method"
                    "\turi:$request_uri"
                    "\tprotocol:$server_protocol"
                    "\tforwarded_proto:$http_x_forwarded_proto"
                    "\treqsize:$request_length"
                    "\tua:$http_user_agent"
                    "\treferer:$http_referer"
                    "\tstatus:$status"
                    "\tsize:$body_bytes_sent"
                    "\treqtime:$request_time"
                    "\truntime:$upstream_http_x_runtime"
                    "\tapptime:$upstream_response_time"
                    "\tvia:$http_via"
                    "\tcache:$upstream_http_x_cache"
                    "\tcloudtracecontext:$http_x_cloud_trace_context";

    access_log /dev/stdout ltsv;
 ……
}

Stackdriver Logging エントリーのエクスポート

Stackdriver Logging が受信したデータは Google Cloud Storage、Google BigQuery、Google Cloud Pub/Sub へエクスポートすることができます。
エクスポートされるデータは LogEntry 形式です。
BigQuery に格納したいのはアクセスログ部分のみであるため、一旦 Cloud Pub/Sub のトピック「okumin-com-access-log-raw-v1」へエクスポートし、後段の処理できれいなスキーマに加工してから BigQuery へ保存することにしました。

LogEntry からの変換

変換及び BigQuery への書き込みは Google Cloud Dataflow で行いました。
Cloud Dataflow は、Apache Beam で記述したストリーム処理やバッチ処理を実行するためのマネージドサービスです。

Apache Beam は Java ライブラリですが、Spotify が公開している Scio を使うと Scala で Spark っぽく記述することができます。

次のコードは Scio を使って書いた、アクセスログを加工するための処理です。
BigQuery のテーブルと同じスキーマを持つ AccessLog クラスに変換し、Cloud Pub/Sub へ保存します。
再度 Cloud Pub/Sub へ保存しているのは、加工後の AccessLog を『Google Cloud Dataflow でアクセスログを監視して Google Cloud Functions で Slack へアラートを送信するの巻』で使いたかったからです。

sc
  // Cloud Pub/Sub に保存されている LogEntry(Json) を取得
  .customInput("transform-input", PubsubIO.readStrings().fromSubscription(in))
  // LogEntry から Nginx ログ(LTSV)を取り出す
  .flatMap { entry =>
    try {
      val json = Json.parse(entry).as[JsObject]
      val line = json.value("textPayload").as[String]
      Some(line)
    } catch {
      case NonFatal(e) =>
        logger.error(s"Failed parsing LogEntry. $entry", e)
        None
    }
  }
  // LTSV 文字列を自作 AccessLog クラスへ変換
  .flatMap(AccessLogs.fromNginxLog)
  // AccessLog クラスを元に Cloud Pub/Sub のメッセージを作成
  .map { log =>
    val uniqueId = log.cloud_trace_context.getOrElse(UUID.randomUUID().toString)
    val attributes = Collections.singletonMap(UniqueIdKey, uniqueId)
    val json = AccessLogs.toJson(log).getBytes(StandardCharsets.UTF_8)
    new PubsubMessage(json, attributes)
  }
  // Cloud Pub/Sub へメッセージを書き込み
  .saveAsCustomOutput(
    "transform-output",
    PubsubIO.writeMessages().withIdAttribute(UniqueIdKey).to(out)
  )

BigQuery へのロード

変換データは最終的に BigQuery へ保存します。
BigQuery のストリーミングインサートが低遅延な分析を可能にします。

BigQuery へのデータのストリーミング

sc
  // Cloud Pub/Sub に保存されている Json を取得
  .customInput(
    "load-input",
    PubsubIO.readStrings().fromSubscription(in).withIdAttribute(UniqueIdKey)
  )
  // Json を AccessLog クラスへデシリアライズ
  .flatMap(AccessLogs.fromJson)
  // BigQuery へ保存
  .saveAsCustomOutput("load-output", BigQueryIO
    .write()
    // AccessLog の time フィールドに基づいたパーティションへ書き込む
    .to(AccessLogs.dayPartitioner(out))
    .withFormatFunction(AccessLogs.formatter)
    .withSchema(AccessLog.schema)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND))

最後に ETL アプリケーションのデプロイ方法を紹介しておきます。
小規模なので、1つのアプリケーションで変換パイプラインと BigQuery への書き込みパイプライン両方を実行しました。

Cloud Dataflow ではテンプレートの作成と、テンプレートの実行という2つのフェーズを経てジョブを実行するのが丁寧なやり方のようです。

Google Cloud Dataflow テンプレート

こんな感じの main メソッドを持ったシングルトンオブジェクトを作成し、

okumin-com-etl/AccessLogEtl.scala at 227ee20a602240def5960af91e87f8593dafe429 · okumin/okumin-com-etl · GitHub

こんな感じのコマンドを実行するとテンプレートの作成と実行が行われます。

TEMP_LOCATION="gs://okumin-dataflow-prod/okumin_com/access_log/v1/etl/temp"
TEMPLATE_LOCATION="gs://okumin-dataflow-prod/okumin_com/access_log/v1/etl/template"
TRANSFORM_INPUT="projects/okumin-prod/subscriptions/okumin-com-access-log-raw-v1-etl"
TRANSFORM_OUTPUT="projects/okumin-prod/subscriptions/okumin-com-access-log-refined-v1-etl"
LOAD_INPUT="projects/okumin-prod/topics/okumin-com-access-log-refined-v1"
LOAD_OUTPUT="okumin-prod:okumin_com.okumin_com_access_log_v1"

# テンプレート作成
# transformInput, loadInput は実行時に渡したかったが、
# Pub/Sub のサブスクリプションを RuntimeValueProvider で渡すとなぜか壊れたのでこのタイミングで指定
sbt "runMain com.okumin.etl.AccessLogEtl --runner=DataflowRunner --project=okumin-prod --zone=asia-northeast1-a --network=primary --subnetwork=regions/asia-northeast1/subnetworks/asia-northeast1-default --diskSizeGb=16 --workerMachineType=n1-standard-1 --maxNumWorkers=1 --streaming=true --tempLocation=${TEMP_LOCATION} --templateLocation=${TEMPLATE_LOCATION} --transformInput=${TRANSFORM_INPUT} --loadInput=${LOAD_INPUT}"

# 上記テンプレートを実行
gcloud beta dataflow jobs run \
  okumin-com-access-log-etl-v1 \
  --gcs-location ${TEMPLATE_LOCATION} \
  --parameters transformOutput=${TRANSFORM_OUTPUT},loadOutput=${LOAD_OUTPUT}

f:id:okumin:20170818015953p:plain

まとめ

以上、Stackdriver Logging、Cloud Pub/Sub、Cloud Dataflow、BigQuery を用いて低遅延なストリーミング ETL を実現しました。
Apache Beam や Cloud Dataflow についての情報の少なさに若干苦しみましたが、概ねスムースに ETL 基盤を構築することができました。
Scio も使いやすく、Spark 民にはオススメです。

今回変換したアクセスログを取得して監視を行うシステムも作ってみたりしたので、関心があれば合わせてご覧ください。

Google Cloud Dataflow でアクセスログを監視して Google Cloud Functions で Slack へアラートを送信するの巻 - おくみん公式ブログ

誤算

okumin.com は大したトラフィックがないので共有コアの安いインスタンスを使おうと思っていたのですが、Cloud Dataflow の設定「workerMachineType」に「f1-micro」や「g1-small」を指定すると怒られました……
Cloud Dataflow を使わずに Direct Runner などでしのげないか調査中です。

関連リンク