『Google Cloud Dataflow で Google BigQuery へストリーミング ETL するの巻』で加工したアクセスログを集計し、一定の条件を満たすと Slack へアラートを飛ばすシステムを作りました。
Apache Beam(Scio) + Google Cloud Dataflow を用いてログの集計と監視を行い、問題のあるアクセスが見つかったら Google Cloud Functions で Slack へ通知するという構成です。
ストリーミング ETL の記事と同様にその仕組みを紹介します。
アーキテクチャ
次の流れで監視を行います。
- Google Cloud Dataflow で Google Cloud Pub/Sub のトピック「okumin-com-access-log-refined-v1」から取得したアクセスログを解析し、問題があれば「okumin-com-alert-v1」へアラートメッセージを書き込み
- Google Cloud Functions で「okumin-com-alert-v1」からアラートメッセージを取り出し、Slack へ通知
「okumin-com-access-log-refined-v1」へアクセスログを保存するまでの流れは『Google Cloud Dataflow で Google BigQuery へストリーミング ETL するの巻』をご覧ください。
下ごしらえ
前準備として次の作業を行いました。
- Cloud Pub/Sub
- サブスクリプション「okumin_com_access_log_refined_v1_monitor」を作成
- 「okumin-com-access-log-refined-v1」を読み込むためのサブスクリプション
- Cloud Pub/Sub は fan-out できるため、ETL で用いたものと同じトピックをサブスクライブ
- トピック「okumin_com_alert_v1」を作成
- アラートメッセージを登録するためのトピック
- サブスクリプション「okumin_com_access_log_refined_v1_monitor」を作成
- Slack
- Incoming Webhooks の URL を取得
監視の流れ
監視ジョブ
Scio でストリーム処理を記述します。
sc // Cloud Pub/Sub からアクセスログ(Json)を取得 .customInput( "monitor-input", PubsubIO.readStrings().fromSubscription(in).withIdAttribute(UniqueIdKey) ) // ストリームデータを60秒のウィンドウに区切る .withFixedWindows(Duration.standardSeconds(60)) // アクセスログ(Json)をデシリアライズし AccessLog クラスに変換 .flatMap(AccessLogs.fromJson) // ステータスコードをキー、そのステータスコードを持つログの件数をバリューとする Map を計算 .aggregate(Map.empty[Int, Long])( (acc, log) => acc.updated(log.status, acc.getOrElse(log.status, 0L) + 1), (acc1, acc2) => { val keys = acc1.keySet ++ acc2.keySet keys.foldLeft(Map.empty[Int, Long]) { case (acc, key) => acc.updated(key, acc1.getOrElse(key, 0L) + acc2.getOrElse(key, 0L)) } } ) // よさそうな Map であれば捨てる、悪そうな Map であれば通す // 本当は5xx系レスポンスを返しているもの、みたいな条件でフィルタするのがよい // 確認のため404とかでもアラートが飛ぶようにした .filter(_.exists { case (status, count) => status != 200 && status != 304 && count > 10 }) // アラートメッセージを伝えるための Json を組み立て .map { stat => val message = stat.map { case (key, value) => s"$key -> $value" }.mkString(", ") Json.stringify(Json.obj("message" -> message)) } // Json を Cloud Pub/Sub へ書き込み .saveAsCustomOutput("monitor-output", PubsubIO.writeStrings().to(out))
Dataflow へのデプロイ方法は『Google Cloud Dataflow で Google BigQuery へストリーミング ETL するの巻』で紹介したので省略します。
Slack への通知
アラートメッセージの Slack 通知は Cloud Function で行います。
Cloud Function は AWS Lambda のようなもので、イベントにフックして JavaScript の関数を実行させるものです。
今流行りのサーバレスアーキテクチャを実現できます。
ここでは Cloud Pub/Sub への書き込みをトリガーとします。
- Google Cloud Pub/Sub Triggers
- サードパーティ サービスの通知の設定
- Google Cloud Container Builder のイベントを Slack に通知する例
次のような JavaScript を作成します。
「SLACK_WEBHOOK_URL」を環境変数かなにかで指定したい気持ちがありましたが、現状よしなにパラメータを渡す仕組みがなさそうなので JS にハードコードしてしまうのが楽そうです。
const IncomingWebhook = require('@slack/client').IncomingWebhook; const SLACK_WEBHOOK_URL = "YOUR_WEBHOOK_URL"; const webhook = new IncomingWebhook(SLACK_WEBHOOK_URL); module.exports.subscribe_alert = (event, callback) => { const pubsubMessage = event.data; const data = JSON.parse(new Buffer(pubsubMessage.data, 'base64').toString()); const message = { text: data.message } console.log(message); webhook.send(message, (err, res) => { callback(err); }); };
次に package.json を作成します。
{ "name": "slack-alert", "version": "0.0.1", "description": "Alert to Slack", "main": "index.js", "dependencies": { "@slack/client": "3.9.0" } }
最後に index.js と package.json のある場所で以下のコマンドを実行します。
「TRIGGER_TOPIC」にはアラートが登録されるトピックを、「STAGE_BUCKET」には Cloud Functions のパッケージを配置するための適当なバケットを記述します。
サブスクリプションは Cloud Functions をデプロイすると勝手に作ってくれます(むしろ自作したサブスクリプションを登録させてほしい気がしますが)。
TRIGGER_TOPIC=okumin-com-alert-v1 STAGE_BUCKET=okumin-dataflow-prod # Currently only us-central1 is supported gcloud beta functions deploy subscribe_alert \ --trigger-topic ${TRIGGER_TOPIC} \ --region us-central1 \ --stage-bucket ${STAGE_BUCKET}
デプロイが完了すると、フィルタ条件に引っかかった場合に通知が飛んでくるようになります。
まとめ
以上、Cloud Pub/Sub、Cloud Dataflow、Cloud Functions を用いてストリーム処理による監視システムを構築しました。 ちょっとした処理をイベント駆動で実行したい場合、Cloud Functions は手頃でいいですね。
今回作ったものは監視というにはあまりにも単純なものですが、もう少し頭のいい人が実装すれば普通に使えるシステムになりそうです。