HistoryLoggingServiceのAPIを用いてHive on Tezのメトリクスをデータプラットフォームに保存するという、ややマニアックなテクニックを紹介します。本記事は「Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2022」14日目の記事として執筆しました。
Hive on Tezのメトリクス
まずはHive on Tezにどんなメトリクスが存在するか紹介しておきます。Apache Tezはなかなか見やすいWeb UIを持っていて、様々な情報へ手軽にアクセスすることができます。例えばDAG全体の開始・終了時刻、実行時間といった情報や
Vertex・Task・Task Attemptレベルの情報、
DAG・Vertex・Task・Task Attemptレベルのカウンター、
DAGのノードを描画したもの、
どのVertexが支配的であったか、などを表示することができます。
メトリクスを保存する仕組み
こんなにも便利なTez UIは実はただのステートレスなWebアプリケーションで、メトリクスそのものはYARN Timeline Serverに保存されています。
TezにはHistoryLoggingServiceという各種イベント、例えばDAGの開始やTaskの終了等を受け取るためのインターフェースが定義されています。多くの本番環境ではATSHistoryLoggingServiceを用いるように設定されており、TezのApplication Masterがそのクラスを用いてそれぞれのイベントをTimeline Serverへ送信している、というわけです。
メトリクスをTimeline Serverとデータ基盤にマルチポストする
Tez UIは特にクエリ単位の情報を参照するにはこの上なく便利です。一方表示できる情報が限られており、クエリを横断した検索能力も高くないため、かゆいところ全てに手が届く類のシステムではありません。SQLやSparkを用いた柔軟な分析を行うためにはDWHのような分析基盤にメトリクスをコピーするのが一番手っ取り早いでしょう。
ただTimeline Serviceに格納されたデータを上手に取りこぼしなく取り込むのは結構難しい。そこでHistoryLoggingServiceを自作することで、Tezから直接容易にメトリクスを転送できるようになるというのがこの記事で伝えたい内容です。
HistoryLoggingServiceはプラッガブルで、設定値 tez.history.logging.service.class
で任意の実装を選択することができます。したがってATSHistoryLoggingServiceと互換を保ったままHistoryLoggingServiceを拡張してやれば、Tez元来の機能を維持しつつ他の場所へもメトリクスを転送することができます。
以下に一番簡単な例として、ATSHistoryLoggingServiceを継承し、イベントをTimeline Serviceに送りつつ標準出力にも書き出すような実装を載せています。実際はFluentdやKafkaに転送する処理を実装することになるでしょう。その場合 serviceInit
メソッドや serviceStop
メソッドを実装してクライアントライブラリの設定やバッファのフラッシュを行う必要があります。HistoryLoggingServiceのAPIを実装する際は必ずATSHistoryLoggingService側の実装を呼び出す必要があることに注意してください。
package org.apache.tez.dag.history.logging; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; public class CustomHistoryLoggingService extends ATSHistoryLoggingService { @Override public void handle(DAGHistoryEvent event) { // Process the event through ATSHistoryLoggingService super.handle(event); // Process the event through the custom logic sendToStdout(event.getHistoryEvent()); } private void sendToStdout(HistoryEvent event) { switch (event.getEventType()) { case TASK_ATTEMPT_FINISHED: sendTaskAttemptFinishedToStdout((TaskAttemptFinishedEvent) event); break; // You can catch many types of events default: break; } } private void sendTaskAttemptFinishedToStdout(TaskAttemptFinishedEvent event) { System.out.printf( "TaskAttemptFinishedEvent(id=%s, state=%s, start=%d, finish=%d)%n", event.getTaskAttemptID(), event.getState(), event.getStartTime(), event.getFinishTime() ); } }
tez.history.logging.service.class
にカスタマイズしたCustomHistoryLoggingServiceのFQCNを指定します。
<property> <name>tez.history.logging.service.class</name> <value>org.apache.tez.dag.history.logging.CustomHistoryLoggingService</value> </property>
クエリを実行しTez Application Masgerのログを見てみると実装した内容が反映されていることがわかります。
zookage@client-node-0:~$ yarn logs -containerId container_1671011282351_0002_01_000001 -logFiles stdout 2022-12-14 10:16:22,726 INFO client.RMProxy: Connecting to ResourceManager at yarn-resourcemanager-0.yarn-resourcemanager.zookage.svc.cluster.local./10.1.21.175:8032 2022-12-14 10:16:22,890 INFO client.AHSProxy: Connecting to Application History server at yarn-timelineserver-0.yarn-timelineserver.zookage.svc.cluster.local./10.1.21.182:10200 Container: container_1671011282351_0002_01_000001 on yarn-nodemanager-2.yarn-nodemanager.zookage.svc.cluster.local_8041 LogAggregationType: AGGREGATED ======================================================================================================================= LogType:stdout LogLastModifiedTime:Wed Dec 14 10:11:22 +0000 2022 LogLength:378 LogContents: 2022-12-14 10:05:45 Running Dag: dag_1671011282351_0002_1 TaskAttemptFinishedEvent(id=attempt_1671011282351_0002_1_00_000000_0, state=SUCCEEDED, start=1671012351141, finish=1671012352811) TaskAttemptFinishedEvent(id=attempt_1671011282351_0002_1_01_000000_0, state=SUCCEEDED, start=1671012352904, finish=1671012353057) 2022-12-14 10:05:53 Completed Dag: dag_1671011282351_0002_1 End of LogType:stdout ***********************************************************************
終わりに
HistoryLoggingServiceをカスタマイズするとTezのメトリクスを簡単に任意のビッグデータ基盤に格納できるよ、というお話でした。
なおこの記事は後日「Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2022」にて公開するHive Distributed Profiling System実装方法の説明を簡単にするために書きました。万人に必要とされる情報ではないと思いますが、誰かの参考になれば嬉しいです。