おくみん公式ブログ

おくみん公式ブログ

分散処理OSSへのコントリビューション in 2023

Contributions to Apache Hive

2023年に取り組んだ分散処理OSSに対する貢献のまとめです。今年はApache Hiveのコミュニティが活性化したのでHiveやTezに対する貢献が多めです。

この記事は『Distributed computing (Apache Spark, Hadoop, Kafka, ...)のカレンダー | Advent Calendar 2023 - Qiita』24日目として執筆しました。若干遅れて申し訳ございません。

データ不整合の解消

ネストしたCTEをマテリアライズするとデータが消失する問題

Hiveには複数回参照されたCommon Table Expressionの計算結果を使い回し、重複計算を避ける機能があります。以下の例だとCTE x が閾値である二回以上参照されているので最適化の対象となります。

SET hive.optimize.cte.materialize.threshold=2;
WITH x AS (SELECT count(*) AS num FROM test)
SELECT num FROM x UNION ALL SELECT num * 2 FROM x;

CTEが複雑にネストしているとTezのVertexが正しい順番で実行されず、データロストしてしまう問題を発見したので修正しました。

LIMIT OFFSET Pushdownのバグ修正

LIMITはいくつかのケースでプッシュダウンが可能です。例えばLEFT OUTER JOIN後に結果を100行のみ取得する場合、左側のテーブルは100行だけスキャンすれば正しい結果を返すことができます。

SELECT *
FROM access
LEFT OUTER JOIN users ON access.user_id = users.user_id
LIMIT 100;

とはいえクエリを分散実行する場合、本当にきっちり100行だけ出力することは困難です。独立した分散並列タスクが何行スキャンすることができて、最終的に最大何行出力できるかは事前に確定しないためです。その結果、上記のSQLの場合すべてのタスクが100行ずつスキャンする実行計画が生成されます。多少必要以上のスキャンが発生するものの、タスク間のコーディネーションいらずの単純な仕組みで十分高速に動作します。

LIMIT Pushdown

一方LIMITと異なりOFFSETは単純にプッシュダウンすることができません。例えば LIMIT 100 OFFSET 100000000 のOFFSET 100,000,000を高度に並列化した全タスクに適用すると、1行も出力されないのは想像に難くないでしょう。しかしHiveはこのケースをプッシュダウンしてしまっていたため、正しくない結果が返ることがありました。

Wrong OFFSET Pushdown

そこでとりあえずLIMITとOFFSETを合計した値をプッシュダウンし、最後にシングルタスクのReducerを追加することで正しい結果が出力されるよう変更しました。

LIMIT OFFSET Pushdown

なおORDER BYが指定されている場合はグローバルソートの実行後でないと必要な100行が特定できないため、プッシュダウンは実行されずこの問題は発生しません。そもそもORDER BYのないOFFSETは有効に使えるケースがほぼないため、そのパターンをエラーにするパラメータも追加しました。

0: jdbc:hive2://hive-hiveserver2:10000/defaul> SET hive.strict.checks.offset.no.orderby=true;
No rows affected (0.004 seconds)
0: jdbc:hive2://hive-hiveserver2:10000/defaul> SELECT *
. . . . . . . . . . . . . . . . . . . . . . .> FROM access
. . . . . . . . . . . . . . . . . . . . . . .> LEFT OUTER JOIN users ON access.user_id = users.user_id
. . . . . . . . . . . . . . . . . . . . . . .> LIMIT 100 OFFSET 100000000;
Error: Error while compiling statement: FAILED: SemanticException OFFSET without ORDER BY are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.offset.no.orderby to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. (state=42000,code=40000)

パフォーマンス改善

Auto Reduce Parallelismの改善

Hive on TezにはAuto Reducer Parallelismという機能があります。これは実行時の統計情報を利用して並列度を途中で変更するための機能です。

Auto Reducer Parallelism

Tezレベルではパイプライン化の効用と実行時統計情報の正しさのトレードオフをとるために、後続Vertexをスタートするタイミングをチューニングできるようになっています。例えばMap 1とReducer 2から構成されるDAGをデフォルトパラメータで実行すると、Map 1が25%終了したタイミングでReducer 2の並列度が確定し、Map 1の残りと並列に実行を開始します。そのパラメータがHiveからは調整できなかったので設定できるようにしました。これは異常なdata skewがあったり、どうしても正確な並列度が欲しかったりするケースで役立ちます。

またHiveはAuto Reducer Parallelismの恩恵が小さい場合それを無効化します。その実装がやや大雑把でAuto Reducer Parallelismが効いてほしいところでも効かなくなることがあったのでより緻密に挙動をコントロールできるようにしました

Fair Routingの開発

TezにはAuto Reducer Parallelismで使われているShuffleVertexManagerを拡張したFairShuffleVertexManagerというクラスが実装されています。ShuffleVertexManagerは後続処理の並列度が過剰な場合に、パーティションをマージすることで計算リソースを節約します。FairShuffleVertexManagerはそれに加えてdata skewのあるパーティションを複数Reducerで分散処理する機能を追加してくれます。

Fair Routing

ShuffleVertexManagerはパーティションのデータ量を非圧縮状態のサイズで計算します。一方FairShuffleVertexManagerは圧縮サイズで計算するケースと非圧縮サイズで計算するケースが混在しており、一貫性のためにすべて非圧縮サイズを使用するように変更しました。

また、FairShuffleVertexManagerをJOINと組み合わせて使う場合、小さい入力ソースのパーティションは関連するすべてのReducerにブロードキャストする必要があります。Tezはまだこのパターンをサポートしていないため、エッジが複数あるVertexにFairShuffleVertexManagerを設定するとエラーになります。

FairShuffleVertexManager + JOIN

この制約はUNION ALLのように同質の入力が複数存在する場合には不要なはずで、取り除く提案をしています。

あとはこの機能を触ってたら先にサンプルのTezアプリケーションを作ってみてはどうかと提案されたため、それにも着手しています。

ジェネリックなAM or TaskレベルのフックをTezに追加

Hive Distributed Profiling Systemを簡単に実装する方法 - おくみん公式ブログ』で解説したように、TDではApplication MasterやTask Attempt単位のプロファイリングを行うためにTezにパッチをあてています。それにインスパイアされたのかどうかは不明ですが、Tez本体にも非常によく似た機能が追加されていました

とはいえ今の形だとTDのパッチと互換性のある形で運用はできないので、プロファイラを仕込んでいる部分をpluggableにしたいという要望をサンプルの実装を添えて出してみました。

UDTFの出力に対するヒューリスティクスの改善

Hiveは実行計画を作成する際にヒューリスティックな計算式を用いて各種タイミングのデータ量を見積もります。例えば WHERE の中に = オペレータがあると行数は1/2になるだろう、といった具合です。UDTFオペレータの計算式に少し修正できるポイントがあったのでパッチを書きました。

MASK_HASHの改善

各行を処理するたびにログが出力されるようになっていました。性能劣化やディスクフルが怖いので修正しました。

[HIVE-27575] MASK_HASH UDF generate a log per row - ASF JIRA

Icebergのパーティショニング情報を利用したJOINアルゴリズムの実装

Apache IcebergはHiveネイティブテーブルより柔軟なパーティショニングをサポートしています。つまりHiveのネイティブテーブルに対して実装されている最適化は理論上Icebergでも利用することができます。現状はBucket Map Joinのような高度な最適化はまだ実装されていないので、そのプロトタイプを実装してみたりしています。

新機能追加

COLLECT_MAP UDFの実装

Trinoにある map_agg に相当するものが欲しかったので実装してみました。

Hive Metastoreクライアントをpluggableにする変更

Hiveのクエリエンジンは常にHMSをsingle source of truthとして扱うことを前提に作られています。世の中には異なるデータカタログをプライマリなソースとして扱いたい環境もあり、そのパッチが便利そうだったため引き継いで開発や議論に参加しています。

Cosmetic Changes

ちょっとしたエラーメッセージの変更や、ZooKage開発時に見かけたWARNINGを解消したりしています。

まとめ

今年携わったOSSコントリビューションのまとめでした。日本で似たような活動をしている人はまだそこまで多くないため、仲間が増えるといいなと思いつつ記事を書きました。

ビッグデータ関連のOSSはその性質上相互に連携する側面があるため、来年は今持っているナレッジを利用して他のプロダクトにも進出していきたいと思っています。

最後に、今年の後半継続的にOSSに貢献できたのはレビューしてくださったメンテナの皆様のおかげです。感謝の気持を持ちつつ、自分自身も積極的にコードレビューを手伝っていくつもりです。