おくみん公式ブログ

おくみん公式ブログ

Hive on Tezのメトリクスを任意のデータ基盤に蓄積する方法

HistoryLoggingServiceのAPIを用いてHive on Tezのメトリクスをデータプラットフォームに保存するという、ややマニアックなテクニックを紹介します。本記事は「Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2022」14日目の記事として執筆しました。

qiita.com

Hive on Tezのメトリクス

まずはHive on Tezにどんなメトリクスが存在するか紹介しておきます。Apache Tezはなかなか見やすいWeb UIを持っていて、様々な情報へ手軽にアクセスすることができます。例えばDAG全体の開始・終了時刻、実行時間といった情報や

DAG Details

Vertex・Task・Task Attemptレベルの情報、

All Vertices

DAG・Vertex・Task・Task Attemptレベルのカウンター、

DAG Counters

DAGのノードを描画したもの、

Graphical View

どのVertexが支配的であったか、などを表示することができます。

Vertex Swimlane

メトリクスを保存する仕組み

Tez UI Architecture

こんなにも便利なTez UIは実はただのステートレスなWebアプリケーションで、メトリクスそのものはYARN Timeline Serverに保存されています。

TezにはHistoryLoggingServiceという各種イベント、例えばDAGの開始やTaskの終了等を受け取るためのインターフェースが定義されています。多くの本番環境ではATSHistoryLoggingServiceを用いるように設定されており、TezのApplication Masterがそのクラスを用いてそれぞれのイベントをTimeline Serverへ送信している、というわけです。

メトリクスをTimeline Serverとデータ基盤にマルチポストする

Tez UIは特にクエリ単位の情報を参照するにはこの上なく便利です。一方表示できる情報が限られており、クエリを横断した検索能力も高くないため、かゆいところ全てに手が届く類のシステムではありません。SQLやSparkを用いた柔軟な分析を行うためにはDWHのような分析基盤にメトリクスをコピーするのが一番手っ取り早いでしょう。

ただTimeline Serviceに格納されたデータを上手に取りこぼしなく取り込むのは結構難しい。そこでHistoryLoggingServiceを自作することで、Tezから直接容易にメトリクスを転送できるようになるというのがこの記事で伝えたい内容です。

HistoryLoggingServiceはプラッガブルで、設定値 tez.history.logging.service.class で任意の実装を選択することができます。したがってATSHistoryLoggingServiceと互換を保ったままHistoryLoggingServiceを拡張してやれば、Tez元来の機能を維持しつつ他の場所へもメトリクスを転送することができます。

以下に一番簡単な例として、ATSHistoryLoggingServiceを継承し、イベントをTimeline Serviceに送りつつ標準出力にも書き出すような実装を載せています。実際はFluentdやKafkaに転送する処理を実装することになるでしょう。その場合 serviceInit メソッドや serviceStop メソッドを実装してクライアントライブラリの設定やバッファのフラッシュを行う必要があります。HistoryLoggingServiceのAPIを実装する際は必ずATSHistoryLoggingService側の実装を呼び出す必要があることに注意してください。

package org.apache.tez.dag.history.logging;

import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;

public class CustomHistoryLoggingService extends ATSHistoryLoggingService {
  @Override
  public void handle(DAGHistoryEvent event) {
    // Process the event through ATSHistoryLoggingService
    super.handle(event);

    // Process the event through the custom logic
    sendToStdout(event.getHistoryEvent());
  }

  private void sendToStdout(HistoryEvent event) {
    switch (event.getEventType()) {
      case TASK_ATTEMPT_FINISHED:
        sendTaskAttemptFinishedToStdout((TaskAttemptFinishedEvent) event);
        break;
      // You can catch many types of events
      default:
        break;
    }
  }

  private void sendTaskAttemptFinishedToStdout(TaskAttemptFinishedEvent event) {
    System.out.printf(
        "TaskAttemptFinishedEvent(id=%s, state=%s, start=%d, finish=%d)%n",
        event.getTaskAttemptID(), event.getState(), event.getStartTime(), event.getFinishTime()
    );
  }
}

tez.history.logging.service.class にカスタマイズしたCustomHistoryLoggingServiceのFQCNを指定します。

<property>
  <name>tez.history.logging.service.class</name>
  <value>org.apache.tez.dag.history.logging.CustomHistoryLoggingService</value>
</property>

クエリを実行しTez Application Masgerのログを見てみると実装した内容が反映されていることがわかります。

zookage@client-node-0:~$ yarn logs -containerId container_1671011282351_0002_01_000001 -logFiles stdout
2022-12-14 10:16:22,726 INFO client.RMProxy: Connecting to ResourceManager at yarn-resourcemanager-0.yarn-resourcemanager.zookage.svc.cluster.local./10.1.21.175:8032
2022-12-14 10:16:22,890 INFO client.AHSProxy: Connecting to Application History server at yarn-timelineserver-0.yarn-timelineserver.zookage.svc.cluster.local./10.1.21.182:10200
Container: container_1671011282351_0002_01_000001 on yarn-nodemanager-2.yarn-nodemanager.zookage.svc.cluster.local_8041
LogAggregationType: AGGREGATED
=======================================================================================================================
LogType:stdout
LogLastModifiedTime:Wed Dec 14 10:11:22 +0000 2022
LogLength:378
LogContents:
2022-12-14 10:05:45 Running Dag: dag_1671011282351_0002_1
TaskAttemptFinishedEvent(id=attempt_1671011282351_0002_1_00_000000_0, state=SUCCEEDED, start=1671012351141, finish=1671012352811)
TaskAttemptFinishedEvent(id=attempt_1671011282351_0002_1_01_000000_0, state=SUCCEEDED, start=1671012352904, finish=1671012353057)
2022-12-14 10:05:53 Completed Dag: dag_1671011282351_0002_1

End of LogType:stdout
***********************************************************************

終わりに

HistoryLoggingServiceをカスタマイズするとTezのメトリクスを簡単に任意のビッグデータ基盤に格納できるよ、というお話でした。

なおこの記事は後日「Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2022」にて公開するHive Distributed Profiling System実装方法の説明を簡単にするために書きました。万人に必要とされる情報ではないと思いますが、誰かの参考になれば嬉しいです。