2024年に取り組んだ分散処理OSSに対する貢献のまとめです。今年も引き続きHiveに関する活動が多く、Apache Iceberg関連のコントリビューションも増えたのが特徴かなと思います。
- パフォーマンス改善
- バグフィックスやエラー処理の改善
- Hive: STACK UDFの挙動
- Hive: STORED BYのエラーメッセージ
- Hive: CalciteによるUDF書き換えの挙動
- Hive: CBOフォールバックの無効化
- Hive: 主キーによる制約条件が存在する場合にNullPointerExceptionが発生する問題の解消
- Hive: timestamp変換モードが反映されない問題の解消
- Hive: IcebergのDECIMALタイプをバケットトランスフォームが適切に扱っていない問題の解消
- Hive: schematoolのバグ修正
- Hive: Calcite return pathの不整合
- Hive: 実行時統計情報をバックフィルする際に発生するエラーの解消
- Tez: Application MasterがDAGの状態を復旧しそこねる問題の解消
- その他
- まとめ
- 関連記事
パフォーマンス改善
Hive: Partition Aware Optimization(Bucket Map Join)
Hiveにはもともとパーティショニングやバケット化によりストレージレイアウトを制御する仕組みと、そのレイアウトをベースにpushdownを行ったりジョインアルゴリズムを最適化したりする仕組みがありました。Apache Icebergはより柔軟かつ汎用的なストレージレイアウトを実現する、hidden partitioningという仕組みを提供します。以下の例では、テーブルに存在する category
や created_at
というカラムから派生するパーティションを作成しています。
CREATE TABLE test_iceberg ( category STRING, description STRING, created_at TIMESTAMP) PARTITIONED BY SPEC (bucket(8, category), day(created_at)) STORED BY ICEBERG;
HiveのBucket Map Joinのような最適化はIcebergのバケットトランスフォームには未対応でした。そもそもHiveのバケット化は、バケット番号を計算するためのハッシュ関数として意図的にHiveのシャッフルに用いるハッシュ関数と同じものを採用するなど、クエリエンジンとしてのHiveと密結合することで実現しています。そこで、ストレージレイアウトとクエリエンジンの連携をpluggableにすることで、Hiveの高度な最適化をIcebergや他のOpen Table Formatで活用できるようにする、Partition Aware Optimizationというイニシアティブを作りました。
2024年には第一弾として、Bucket Map JoinをIcebergのバケットトランスフォームに適用できるようにしました。Apache Sparkではこの手の実装がStorage Partitioned Joinというプロジェクト内で実装されており、Amazon Web ServicesのTanakaさんに様々なアドバイスをいただきました。
Hive: CTEをマテリアライズする際に統計情報を伝搬させる
HiveのCTEマテリアライゼーションは、複数回アクセスされるCTEをテンポラリなテーブルとして保存する機能です。これにより、一部クエリを高速化したり、20段階くらいネストしている異常なCTEをエラーなく処理したりできるようになります。以下の例だと、cte
の結果がHDFSなどの分散ファイルシステムに保存されます。
SET hive.optimize.cte.materialize.threshold=2; SET hive.optimize.cte.materialize.full.aggregate.only=false; WITH cte AS (SELECT * FROM test WHERE name IS NOT NULL) SELECT * FROM cte UNION ALL SELECT * FROM cte;
Hive on Tezは通常一つのクエリから一つのDAGを生成します。CTEマテリアライゼーションが有効化された場合、次の図のように複数DAGを生成することで中間データの生成と読み取りを実現します。
中間データを保存するテーブルはHiveのテンポラリテーブルに似た仕組みで保存されます。このテーブルの統計情報を取得する仕組みがなく、ここでいう「DAG 2」は効率の悪い実行計画で動作することがよくありました。そこで、「DAG 1」の出力として保存されている統計情報を cte
テーブルをスキャンするオペレーターにコピーすることでCTEマテリアライゼーションがない場合と同様の実行計画が作成されるよう変更を行いました。
- [HIVE-28080] Propagate statistics from a source table to the materialized CTE - ASF JIRA
- [HIVE-28098] Fails to copy empty column statistics of materialized CTE - ASF JIRA
余談ですが、クエリの共通部分をマージする最適化として、Shared Work Optimizerという別のアプローチも実装されています。興味のある方は『Apache Hive 4: パフォーマンス改善まとめ - おくみん公式ブログ』をご覧ください。
Hive: カラムレベル統計情報が不足している場合のヒューリスティクス改善
Hiveは統計情報が不足している場合、ヒューリスティックなアプローチで各オペレーターが出力する行数やデータサイズを見積もります。WHERE col_without_stats LIKE 'abc%'
があるととりあえず出力は半分になるだろう、といった具合にそれっぽい値を当てはめていきます。WHERE
があると基本的には出力は減ることが期待されるのですが、id IN (1, 2, 3)
のように IN
に複数個の条件が指定されていると、Hive 4ではフィルターがない場合とまったく同じ見積もりになってしまう実装になっていました。これは意図していた挙動でないと考え、修正しました。
バグフィックスやエラー処理の改善
Hive: STACK UDFの挙動
STACKというやや特殊なUDTFが、特定の引数の組み合わせで不適切な結果を返すことを発見したので、修正しました。
Hive: STORED BYのエラーメッセージ
Icebergテーブルを作成するために導入された STORED BY
というキーワードに誤った設定値を与えると非常に分かりづらいメッセージが表示される問題があり、修正しました。
Hive: CalciteによるUDF書き換えの挙動
HiveはCalciteを用いて大量のコストベースもしくはルールベースの最適化を実装しています。その一つとして、あるUDFを別の表現に書き換える最適化があります。例えば、AVG(x)
を SUM(x) / COUNT(x)
で書き換える変換などが実装されています。これらの変換が、もとのUDFの出力と整合しないケースがあったため修正を行いました。
- [HIVE-28302] Let SUM UDF return NULL when all rows have non-numeric texts - ASF JIRA
- [HIVE-28082] HiveAggregateReduceFunctionsRule could generate an inconsistent result - ASF JIRA
Hive: CBOフォールバックの無効化
HiveのCBO(Cost Based Optimization)は多くのケースでうまく動くのですが、Hive 4.0時点ではいくつかのエッジケースがあります。そのため、CBOに失敗するとCBOを無効化してSQLを再コンパイルするモードが実装されています。難しいのが、新し目の機能はCBOを前提に実装されていることがあり、CBOを無効化しても回復は見込めない場合があることです。かつそのようなケースに遭遇するとユーザーには「CBOを有効化してください」というエラーメッセージが表示され、混乱のもととなります。そこで、CBOに依存する機能が使われている場合は再コンパイルを無効化するパッチを書きました。
Hive: 主キーによる制約条件が存在する場合にNullPointerExceptionが発生する問題の解消
HiveはCalciteを用いて、PRIMARY KEYやFOREIGN KEYなどの制約条件をもとに実行計画を最適化する機能があります。それが一部動かないケースがあったので修正しました。
Hive: timestamp変換モードが反映されない問題の解消
Hive 4からデフォルトでタイムスタンプ型と他の型との変換が厳しくなっています。ただしHiveはエンタープライズDWHを構築するためのソフトウェアなので、この手の変更は互換性を提供するためのオプションとともに取り込まれます。このケースだと、hive.strict.timestamp.conversion
という設定値を用いて元の挙動を復元することができます。この設定が、コンパイラ側では反映されるけど、Tezの実行時には反映されないという問題があり、修正しました。
Hive: IcebergのDECIMALタイプをバケットトランスフォームが適切に扱っていない問題の解消
Iceberg用のバケットトランスフォームがDECIMAL型の有効桁数などを適切に扱っておらず、エラーになる問題があったため修正しました。
Hive: schematoolのバグ修正
schematoolはHive MetastoreのスキーマをマイグレーションするためのCLIコマンドです。information_schema
などのHiveやHMSの動作に必須でないテーブルを正しくマイグレーションできないケースを発見したため、修正しました。
Hive: Calcite return pathの不整合
HiveのCBOは歴史的経緯により、Calciteの論理プランを一度HiveのASTに変換してからHiveのオペレーターツリーを生成するモードと、Calciteの論理プランをダイレクトにHiveのオペレーターツリーに変換する2つのモードが存在しています。前者がデフォルトです。後者のケースでGroupByOperatorが正しく生成されないケースを発見し、修正しました。
Hive: 実行時統計情報をバックフィルする際に発生するエラーの解消
Hiveにはオペレーターツリーのシグネチャーとともに統計情報を保存し、共通部分を有するクエリが実行される際、シグネチャーをもとに過去のクエリの実行時統計情報をバックフィルする仕組みがあります。これにより、リアルで正しい統計情報を用いて実行計画を作成することができます。ただシグネチャーの計算とそのマッピングはいうほど単純ではなく、もとの実装は少しでも違和感のあるマッピングが発生するとクエリ全体をエラーにするdefensiveな実装となっていました。実行時統計情報はそもそも必須ではないため、そのような状況が発生するとシグネチャーをすべて無効化し、実行時統計情報がない場合と同様に動作するよう変更しました。
実行時統計情報に関する最適化は『Apache Hive 4: パフォーマンス改善まとめ - おくみん公式ブログ』で詳しく解説しています。
Tez: Application MasterがDAGの状態を復旧しそこねる問題の解消
Apache Tezはビルトインなfault toleranceを提供しており、個々のタスクのみならずApplication Masterの状態をもリカバリすることができます。これはApplication Masterが様々なイベントをWALのようなイメージでHDFSに保存することで実現しています。かなり特殊なパターンでこのリカバリーに失敗し、アプリケーションが永遠に停止する問題に遭遇したので修正しました。
その他
Hive: 予約語に関する変更
Hive 4のキーワードが公式ドキュメントに記載されていなかったため、調査をしたうえでドキュメントの更新を行いました。
また、調べるうちにSQL:2023では予約語となっていないキーワードが予約語として定義されていることが判明しました。これはマイグレーション時に意図しないエラーを引き起こす可能性がありかつ不便なので、いくつかをnon-reserived keywordとして定義し直しました。Hiveではnon-reservedなキーワードをオプトイン方式で管理しているため、キーワードを定義する際についつい予約語として追加してしまう問題があります。それを防止するためのテストケースなどを追加したため、最近はレビュー時にむやみに予約語を足していないか確認できるようになりました。
Hive: Flakyテストの修正
なぜか高確率で落ちているテストがあったので修正しました。
Tez: Application Masterやタスクの開始・終了時に任意の処理を差し込むフックの実装
トレジャーデータでは性能問題を調査したりや高速化のヒントを得たりするため、Async Profilerが生成するイベントをデータベースに送信する仕組みを実装しています。
- TreasureData Tech Talk 2022で発表してきました #tdtechtalk - おくみん公式ブログ
- Hive on Tezのメトリクスを任意のデータ基盤に蓄積する方法 - おくみん公式ブログ
この仕組みはTezに独自パッチを当てることで実現していました。この発表後にTezのメンテナが似た仕組みでプロファイル結果を取得する機能を追加していたため、独自パッチを消せるよう、フックする仕組みを汎用的に実装できるようにする提案を行い、マージされました。
Trino: Icebergテーブルのソート順を指定する方法をドキュメント化
Icebergはメタデータとして、クエリエンジンがどのようにデータをソートすべきか指定するためのヒントを保存することができます。TrinoでCREATE TABLE時にそれを指定する方法がわからず、TrinoコミッターのEbiharaさん(@ebyhr)に相談したところ、機能自体は存在するもののドキュメント化されていなかったことが判明しました。同じ問題に遭遇する人が出てくるかもしれないので、ドキュメントを更新しました。
Iceberg: Util.blockLocationsを汎化
Partition Aware Optimizationを実装しているときに、当該APIの定義がやや使いづらかったので修正しました。
Iceberg: Hadoopカタログのタイポ修正
IcebergのHadoopカタログは「version-hint.text」というファイルに最新メタデータのバージョンを保存します。これが「version-hint.txt」と記載している箇所があったので修正しました。
まとめ
今年も様々な開発を行い、Hiveコミッターに招待していただくこともできました。
日本のビッグデータコミュニティもOpen Table Formatを中心に再度盛り上がってきているように感じます。自分もその一員となりつつ、2025年はさらなる挑戦を続けていけたらと思います。
最後に、OSS活動する上で相談に乗っていただいたTanakaさんやEbiharaさん、Hiveコミュニティのみなさん、そして日本のビッグデータコミュニティのみなさんに謝意を表明して2024年を締めくくりたいと思います。ありがとうございました!