BT

最新技術を追い求めるデベロッパのための情報コミュニティ

寄稿

Topics

地域を選ぶ

InfoQ ホームページ アーティクル CloudflareにおけるKafkaの話 ~ メッセージ1兆件に至るまでに学んだ教訓

CloudflareにおけるKafkaの話 ~ メッセージ1兆件に至るまでに学んだ教訓

キーポイント

  • Cloudflareでは、大量のデータを処理するためにKafkaクラスタを採用しており、チームの分離、効果的なスケール、数兆件のメッセージの処理のために開発された汎用メッセージバスクラスタがある。
  • イベント駆動型システムの非構造化通信の問題に対処するためには、強力なコントラクトを定める必要がある。そのため、CloudflareはクロスプラットフォームのデータフォーマットProtobufを用いている。
  • 開発ツールのメトリクスへの投資は、問題を容易に明らかにするために重要である。CloudflareはSDKをOpenTracingとPrometheusのメトリクスで充実させ、システムの挙動を理解し、特にインシデント時により良い意思決定を行うようにした。
  • SDKの採用や使用に一貫性を持たせ、ベストプラクティスを推進するためには、パターンに関する明確なドキュメントを優先させることが重要である。
  • Cloudflareは、柔軟性とシンプルさのバランスを取ることを目指している。設定範囲の広いセットアップは柔軟性が高い一方で、シンプルなセットアップは異なるパイプライン間での標準化を可能にする。

Cloudflareは、サービス間通信のために、6年足らずでKafka上に1兆件以上のメッセージを生成した。会社やアプリケーションサービスチームが成長するにつれて、高速な配信を続けるためにツールを適応させなければいけなかったのである。

本記事では、分散したドメインベースのチームで作業していた初期の頃や、1兆メッセージの大台を達成するためにKafka上にどのように抽象化を展開したのか説明する。

また、ここ数年で発生したスケーラビリティの制限により発生したインシデントや、増大する需要に対処するために適用された手順やパターンについても解説する。

Cloudflareとは?

Cloudflareは、顧客にグローバルネットワークを提供し、ウェブサイト、API、インターネットトラフィックの安全性を確保できる。

このネットワークによって、企業ネットワークを保護できる。また、顧客がエッジ上にアプリケーションを展開して実行可能にするものである。

Cloudflareは、これらの目標を達成するために、CDN、Zero Trust、Cloudflare Workersなどの製品を提供し、悪意のある活動を特定・ブロックして、顧客が業務に集中できるようにしている。

図1:Cloudflareのグローバルネットワーク

Cloudflareのネットワークをエンジニアリングの観点から見ると、Global EdgeネットワークとCloudflareコントロールプレーンという2つの主要コンポーネントがある。

ネットワークの大部分はCloudflareの製品で構築されており、Workersはエッジネットワークには導入され、利用されている。一方、コントロールプレーンはデータセンターの集合体で、Kubernetes、Kafka、データベースをベアメタルで稼働させている。KafkaのプロデューサーとコンシューマーはすべてKubernetesにデプロイされるのが普通だが、具体的なデプロイ先はワークロードや求める成果によって異なる。

この記事では、Cloudflareのコントロールプレーンに注目し、サービス間通信とイネーブルメントツールがどのように運用をサポートするためにスケールされているのかを探ることにする。

Kafkaについて

Apache Kafkaは、複数のブローカーで構成されるクラスタの概念に基づいており、各クラスタには、調整を担当するリーダーブローカーが指定されている。下図では、ブローカー2がリーダーとして機能している。

図2:Kafkaクラスター

メッセージは、例えば、ユーザーに関するイベントであれば、ユーザー作成、ユーザー情報更新などのトピックに分類される。トピックはパーティションに分けられるが、これはKafkaが水平方向に拡張するためのアプローチである。図では、トピックAのパーティションが両方のブローカーに存在し、各パーティションには、その「情報源」を決定する指定されたリーダーが存在する。 レジリエンスを確保するため、パーティションはあらかじめ決められたレプリケーション係数に従ってレプリケートされ、通常は3が最小となる。Kafkaにメッセージを送るサービスはプロデューサーと呼ばれ、メッセージを読むサービスはコンシューマーと呼ばれる。

クラウドフレアの技術文化

過去、CloudflareはモノリシックなPHPアプリケーションだったが、会社の成長と多様化に伴い、このアプローチが制限的でリスクが高いことが判明したのである。

現在では、特定のツールやプログラミング言語を強制するのではなく、チームがサービスの構築と保守の権限を与えられている。同社は実験を奨励し、効果的なツールやプラクティスを提唱している。アプリケーションサービスチームは、ベストプラクティスを組み込んだパッケージ化されたツールを提供して、他のチームを成功に導くチームで、比較的新しくエンジニアリング組織に加わったものだ。これによって開発チームは価値の提供に集中できる。

緊密な結合

製品の提供が拡大するにつれて、他チームから独立し、自分のペースで作業して動けるようになるためのチーム独自の方法を見つける必要があった。くわえてエンジニアリングチームは、バックオフリクエストと作業完了の保証を精度高くコントロールする必要があった。

すでに大量のデータを処理するためにKafkaクラスターを運用していたので、汎用的なメッセージバスクラスターの作成に時間を割くことにした。オンボーディングは簡単で、リポジトリへのプルリクエストが必要で、レプリケーション戦略、保持期間、ACLなど新しいトピックに必要なすべてをセットアップする。この図は、メッセージバスクラスターが、異なるチームからの独立にどのように役立つかを示している。

図3:汎用的なメッセージバスクラスター

例えば、3つのチームは、特定のサービスを意識することなく、監査ログシステムが関心を持つメッセージを発信できる。結合が少なくなったことで、エンジニアリングチームはより効率的に作業でき、効果的に拡張できる。

非構造化コミュニケーション

イベント駆動型のシステムでは、結合を避けるために、システムは相互に認識するべきではない。当初、私たちには強制的なメッセージフォーマットがなく、プロデューサーチームがメッセージをどのように構造化するか決定するように任されていた。これは、構造化されていないコミュニケーションにつながり、チームが強力なコントラクトを設定していない場合、処理不能なメッセージ数が増えるという課題をもたらしていた。

構造化されていないコミュニケーションを避けるため、チームはKafkaのエコシステム内でソリューションを探し、Apache Avroprotobufという2つの選択肢を見つけ、後者が選ばれた。以前はJSONを使用していたが、互換性を強制するのが難しく、JSONのメッセージはprotobufと比較して容量が大きくなりがちなことがわかったのである。

図4:protobufメッセージ

Protobufは、厳密なメッセージタイプと固有の前方互換性、後方互換性を提供し、複数のプログラム言語でコードを生成できることも大きなメリットである。チームはprotobufメッセージに対する詳細なコメントを推奨しており、改行検出や文体ルールの強制にはUber社のオープンソースツールであるPrototoolを使用している。

Figure 5: Switching to Protobuf

Protobufだけでは十分ではなかった。異なるチームが同じトピックにメッセージを発することはままあるし、フォーマットが期待したものと違うためにコンシューマーが処理できないこともある。さらに、Kafkaのコンシューマーとプロデューサーの設定は簡単な作業ではなく、ワークロードに関する複雑な知識が必要だった。ほとんどのチームがGoを使っていたため、Goで「メッセージバスクライアントライブラリ」を構築し、ベストプラクティスを取り入れ、チームが速く動けるようにした。

チームが同じトピックに対して異なるメッセージを発するのを避けるため、(クライアント側で)トピックごとに1つのprotobufメッセージタイプを強制する、という物議を醸す決断を下した。この決定により、簡単に採用できるようになったが、その結果、複数のパーティションをレプリケートした多数のトピックが作成され、最終的な最小レプリケーション係数は「3」となってしまった。

コネクターの検討

ツールや抽象化を導入することで、Kafkaインフラストラクチャの簡素化は大きく進んだが、ベストプラクティスに確実に沿うには、さらなるユースケースやパターンが必要であることに気づいたため、チームはコネクター・フレームワークを開発した。

図6:コネクター・フレームワーク

Kafkaコネクタをベースにしたこのフレームワークでは、KafkaやCloudflareのEdgeデータベースQuicksilverのように、エンジニアは。あるシステムから読み取って別のシステムにプッシュするサービスを作成できる。プロセスを簡略化するため、サービス作成のテンプレートとしてCookiecutterを使用しており、エンジニアはCLIにいくつかのパラメータを入力するだけでよい。

コネクタの設定プロセスはシンプルで、コードを変更することなく、環境変数を通して実行できる。

以下の例では、リーダーはKafka、ライターはQuicksilverである。コネクタは、トピック1とトピック2から読み込み、関数pf_edgeを適用するように設定されている。この構成で、メトリクスやアラートなど、本番に移行するために必要なものもすべて含まれており、チームは簡単にベストプラクティスに沿うことができる。チームには、カスタム変換を登録するオプションがあり、これが唯一書く必要のあるコード部分となる。

図7:シンプルなコネクタ

例えば、コミュニケーション・プリファレンスサービスでコネクタを活用している。ユーザーがCloudflareのダッシュボードでマーケティング情報をオプトアウトしたい場合は、このサービスとやりとりしてそれを実現するのである。コミュニケーション・プリファレンスのアップグレードはデータベースに保存され、Kafkaにメッセージを送る。トランザクションメールサービス、顧客管理システム、マーケットメールシステムの3つの異なるソースシステムに変更が反映、同期するための別々のコネクターを使用する。このアプローチにより、最終的にシステムに一貫性が生まれ、Kafkaが提供する保証を活用して、プロセスがスムーズに行われるようにしているのだ。

図8:コネクタとコミュニケーション・プリファレンス

可視化の取り組み

パンデミックの期間中、急速に顧客数が増加したため、スループットが増加し、私たちが作成した抽象化機能の一部にスケーラビリティの問題があることが明らかになった。

その一例が、私たちがKafkaの顧客のために処理している監査ログだ。私たちは、これらのログを管理するシステムを構築し、プロデューサー・チームがイベントを生成し、その間に私たちがイベントを聞き取り、データベースにデータを記録できるようにした。

図9:Auditログ用のlog pushを追加する

私たちは、APIと、監査ログデータをCloudflare R2やAmazon S3などの様々なデータバケットにプッシュできるlog pushという統合によって、この情報を公開している。

パンデミックの期間中、多くの監査ログが登録され、顧客は最新のデータを取得するために当社のAPIを使用するようになった。このアプローチはスケーラブルではなかったため、この問題に対処するためにパイプラインを開発することにした。監査ログのイベントをリスニングし、バケットに直接保存するために適切な形式に変換する小さなサービスを作成し、APIに過度の負荷をかけることなく、この問題を解決した。

ログが蓄積され、それを迅速に消去できないため、遅延やSLA違反が発生し、さらなる問題に直面した。SDKには診断ツールやインスツルメンテーションがなかったため、ボトルネックがKafkaからの読み込みなのか、変換なのか、データベースへの保存時なのか、タイムラグの原因がわからなかった。

図10:ボトルネックはどこにあるのか?

私たちは、SDKをPrometheusのメトリクスで強化し、メッセージの処理にかかる各ステップの時間をヒストグラムで計測することで、この問題に対処することにした。これは、遅いステップを特定するのに有効だったが、特定のメッセージに対して、どのコンポーネントがより長い時間をかけているのかを把握できなかった。これを解決するために、私たちはOpenTelemetryを調査し、そのトレースインテグレーションに注目した。というのもKafka上のOpenTracingには、あまり良いインテグレーションがなく、本番インシデント中に異なるサービス間でトレースを伝播することは困難だったからである。

チームがSDKをOpenTracingをアップグレードすることで、バケットへのデータのプッシュとKafkaからの読み込みがボトルネックになっていることを特定でき、それらの問題の修正に優先順位をつけて対処した。

図11:ボトルネックを特定する

SDKにメトリクスを追加することで、クラスタとサービスの健全性をよりよく把握できた。

ノイズの増加への対応

大量のメトリクスを収集した結果、不健全なアプリケーションや遅延問題に関連する多くのアラートが表示され、オンコールがノイジーになってしまうという問題が発生した。

図12:アラートパイプライン

基本的なアラートパイプラインは、PrometheusとAlertManagerで構成されており、PagerDutyにページすることになる。サービスの再起動やスケールアップ/ダウンが理想的ではなかったため、Kubernetesを活用してヘルスチェックを実装する方法を検討することにした。

Kubernetesでは、ヘルスチェックには「liveness」「readiness」「startup」の3種類が存在する。Kafkaの場合、通常はHTTPサーバーが公開されていないため、レディネスプローブを実装しても意味がないため、別のアプローチを実装した。

図13:ヘルスチェックとKafka

ヘルスチェックのリクエストを受けると、トピックの一覧表示など、ブローカーとの基本的な通信を試み、その応答が成功すればチェックはパスする。しかし、アプリケーションは正常でも、メッセージの生成やコンシューミングできないケースもあり、そのため、コンシューマーに対してよりスマートなヘルスチェックを実装することになった。

図14:ヘルスチェックの実装

Kafkaの現在のオフセットは、パーティション上で最後に利用可能なオフセットであり、コミットされたオフセットは、コンシューマが正常にコンシューミングした最後のオフセットである。
ヘルスチェック中にこれらのオフセットを取得することにより、コンシューマが正しく動作しているかどうかを判断できる。オフセットを取得できない場合は、根本的な問題があると考えられ、そのコンシューマに異常があることが報告される。オフセットが取得可能な場合、最後にコミットしたオフセットと現在のオフセットを比較する。それらが同じであれば、新しいメッセージは追加されておらず、コンシューマは健全であるとみなされる。最後にコミットしたオフセットが異なる場合は、以前に記録した最後にコミットしたオフセットと同じかどうかを確認し、コンシューマがスタックして再起動が必要な状態になっていないことを確認する。このプロセスにより、オンコールでのエクスペリエンスが改善され、顧客のユーザー体験が向上した。

「追いつかない」問題

私たちは、チームがメールシステム用にKafkaからイベントを生成できるシステムを使用していた。これらのイベントには、例えば「under attack」というテンプレートが含まれており、攻撃を受けているウェブサイトの情報と攻撃者の身元がメタデータとともに含まれていた。

私たちはそのイベントをリスニングし、レジストリからメールテンプレートを取得し、それに追記して顧客に配信していた。しかし、負荷の問題が発生した。生産率が急上昇し、コンシューマーにタイムラグが生じ、重要なOTPメッセージやSLOに影響が出るようになったのだ。

図15:コンシューマーのタイムラグ

バッチによる解決

この問題に対処するため、さまざまな解決策を検討し始めたが、初期の解決策であるパーティションとコンシューマの拡張では、大きな改善は見られなかった。

図16:バッチ・アプローチ

私たちは、シンプルだが効果的なアプローチとして、一度に一定数のメッセージを処理し、変換を適用し、それらをバッチ処理するバッチ・コンシューミングを導入することにした。これは効果的であることが証明され、チームは高い生産率を容易に処理できるようになった。

図17:バッチ処理でコンシューマーにタイムラグが生じない

ドキュメンテーションについて

SDKを開発する中で、多くの開発者がSDKを使用する際に問題に遭遇していることがわかった。ある人はバグに遭遇し、またある人は特定の機能の実装方法や特定のエラーの解釈について確信が持てなかった。そこで、Googleチャット上に、ユーザーが質問できるチャンネルを設置。1人の担当者が対応し、Wikiで調査結果や回答を文書化することに時間を費やした。これにより、SDKの全体的なユーザーエクスペリエンスを向上させることができた。

結論

学ぶべき教訓が4つある。

  • 柔軟性とシンプルさのバランスを常に考えよう。設定範囲の広いセットアップは柔軟性が高いが、シンプルなセットアップは異なるパイプライン間での標準化を可能にする。
  • 可視化しよう。できるだけ早くSDKにメトリクスを追加することで、チームがシステムの挙動を理解し、特にインシデント発生時により良い意思決定を行える。
  • 1つの強力で厳格なコントラクトを実施することで、トピック内で何が起こっているのか、誰が書き、誰が読んでいるのかを知ることができ、強い可視性を得られる。
  • 自分が行った良い仕事を文書化することで、質問に答えたり、制作上の問題のデバッグを手伝ったりするのに時間を費やす必要がないようにする。これは、GoogleチャットやWikiのようなチャンネルで実現ができる。

これらのルールに従うことで、ストレスの高い状況でもシステムを改善し、顧客に満足してもらえた。

作者について

この記事に星をつける

おすすめ度
スタイル

BT