PostgreSQLで作る非同期ジョブキュー

ogp

はじめに

こんにちは、enechainでエンジニアをしている青戸です。

本記事では、イベント駆動な非同期処理のスループットをPostgreSQLのみで改善した事例を紹介します。外部のメッセージブローカーなどを追加せず、PostgreSQLの機能を使ってシンプルな構成のまま性能を向上させられたので、同じような課題に取り組む方の参考になれば幸いです。

背景と課題

背景

我々のチームでは、社内向けに電力の仲介業務を効率化するためのWebアプリケーションを開発しています。アプリケーションの特性として、電力という公共性の高い商品を扱うことから、我々の市場で公平な取引が行われていることを証明するために省庁への取引ログ提出を求められる場合があります。そのため任意の時点でのシステム状態を復元できる必要があり、これが容易なアーキテクチャとしてCQRS + Event Sourcingパターンを採用しています。

Event Sourcingではドメインイベントが一級市民であるため、自然とイベントをトリガーとした非同期処理が多くなります。我々のアプリケーションでも「注文が作成されたらその派生注文を計算する」「約定したら通知を送る」といった後続業務を、イベント駆動で非同期に実行しています。この非同期処理基盤は、リリースしてしばらくは十分な性能を発揮していましたが、書き込み量の増加に伴いスループットがボトルネックとなり、見直しが必要な状況になりました。

以降では、ボトルネックの背景にある非同期処理の基本構成と改善前のアーキテクチャを整理したうえで、顕在化した課題を見ていきます。なお、イベント駆動な処理フローの設計については、以前の記事「Process Managerパターンで複雑な業務フローを見通しよく実装する」で詳しく紹介していますので、そちらもぜひご覧ください。

イベントをトリガーとした非同期処理の基本的な構成

Event Sourcingを採用したシステムでは、ビジネスロジックの実行結果はドメインイベントとしてイベントストアに永続化されます。非同期処理を担うプロセスは、何らかの方法でイベントストアから未処理のイベントを取り出し、後続業務に対応するハンドラを見つけて処理を実行します。

一般的なイベント駆動の非同期処理のアーキテクチャ図

ここで重要なのが処理するイベントの順序保証です。例えば「注文を作成した」イベントと「注文を変更した」イベントが発生した場合、この2つが処理される順序が入れ替わると、存在しない注文を変更しようとしてエラーになってしまいます。同一の集約に対するイベントは、発生した順序どおりに処理される必要があります。

改善前のアーキテクチャ

改善前のアーキテクチャもPostgreSQLを基盤としており、イベントストアはdomain_eventテーブルに、処理済みイベントの管理は別テーブルに格納する構成でした。下記の図のように定期的にイベントストアをポーリングし、未処理のイベントを1件だけ取り出して対応する複数のイベントハンドラを並列に呼び出すような構成となっていました。すべてのイベントハンドラで処理が完了したら対象のイベントを処理済みとマークし、次のイベントの処理に進みます。

改善前のアーキテクチャ図

構築当初はイベント量が少なく、順序保証と実装のシンプルさを両立するために、意図的にイベントをトリガーとした処理の並列化は行わず直列に処理する方式を採用していました。

顕在化した課題

サービスの成長に伴いイベント量が増加すると、主に2つの課題が顕在化しました。

1. 不要な直列化によるスループットの制限

先述のとおり、同一集約に対するイベントは順序を守って直列に処理する必要がありますが、異なる集約のイベント同士は互いに独立しており、並列に処理しても問題ありません。にもかかわらず、すべてが直列化されていたため、本来並列に処理できるはずのものまで待たされ、スループットが制限されていました。

市場の流動性向上や外部システムとの連携によって書き込みパターンが変化し、イベント量が増加したことで、この不要な直列化がボトルネックとなり、業務にも深刻な影響を与えるような性能低下が発生するようになりました。

2. 未処理イベント取得の性能劣化

未処理のイベントを特定するために、イベントストアと処理済みイベントの管理テーブルをJOINするクエリを使用していました。通常時は高速に動作していましたが、大量のイベントを処理する際に処理済みイベントの管理テーブルへの書き込みが集中し、クエリの性能が大幅に低下することがわかりました。

本来やりたいのは「未処理のイベントを取り出す」というシンプルな操作ですが、その操作自体がボトルネックとなり非同期処理のスループットを低下させてしまうという状況でした。

改善の方針

アーキテクチャ

改善前のアーキテクチャの課題を踏まえ、下記の3つの方針で図のような構成にアーキテクチャを見直すことにしました。

改善後のアーキテクチャ図

1. バッチでエンキューしたジョブを並列に実行する

改善前はWorkerがイベントを1件ずつ取得して処理する構造で、イベント同士の並列化はできず、1件処理するごとに未処理イベント検索の重いクエリが走ることがスループットの制約になっていました。

改善後は、図の上流にある enqueuer が複数のイベントをまとめてジョブに変換し、中央のジョブキューに書き込みます。下流の worker はジョブキューから並行にジョブを取り出して処理を実行します。エンキューをバッチ化することで1件あたりの検索オーバーヘッドを下げ、ジョブを並列に実行することでスループットをスケールさせる構造です。

2. 同一集約内では直列化し、異なる集約間では並列化する

改善前はすべての処理を直列に実行していましたが、同一集約に対するジョブは直列に、異なる集約に対するジョブは並列に実行するようにします。これにより、イベントの順序保証を維持しつつスループットを向上させることができます。 例えば、改善前のアーキテクチャでは event1(entity_id=1) についてすべての処理が完了するまで後続の event2(entity_id=2) の処理が開始できませんでしたが、改善後は event1 と同一集約に対するイベントである event3 以外のイベントから作られたジョブは並行で処理できるようになります。

3. Workerインスタンスを水平にスケールできる構造にする

我々のシステムには計算量の多い処理も含まれるため、単一インスタンス内で goroutine を増やすだけではCPUが飽和するケースがあります。処理能力を伸ばす手段としてはインスタンスのスケールアップも考えられますが、スケールアウトであれば障害時の可用性も同時に確保できるため、複数のWorkerインスタンスから同時にジョブを取り出せる構造を設計目標としました。

実装方式の選定

上記の方針を実現するためにいくつか実装方針を検討しましたが、最終的にはPostgreSQLベースのジョブキューを自作する方式を採用しました。ジョブキューを実現する基盤として検討した選択肢とそれぞれのメリット・デメリットは以下のとおりです。

方式 メリット デメリット
マネージドのメッセージングサービスを利用
(Cloud Pub/Sub、Cloud Tasks など)
- 運用コストが低い
- 無制限にスケールするサービスならワークロードの大規模化に対応しやすい
要件によってはある程度の自作が必要になる
自前ホストしたメッセージングサービスを利用
(Kafka、NATS など)
- ジョブの直列実行など必要な機能は自前開発なしに実現できる
- メッセージング基盤を水平にスケールできるため、ワークロードの大規模化に対応しやすい
- クラスタの運用コストが発生
- システム構成が複雑になる
PostgreSQLで自作 - DBのみで完結し構成がシンプル
- 要件に合わせて柔軟に実装できる
- 自作する分の実装コストがかかる
- DBの性能がボトルネックになりやすく、ワークロードの大規模化に対応できない可能性がある

我々のプロダクトの要求としては、将来的な負荷の増加を見越してもせいぜい数百~数千jobs/s程度のスループットが出れば十分であり、PostgreSQLベースの方式でも十分に耐えられる規模だと考えられました。PostgreSQLはすでにシステムの中核として使用しているため、新たなインフラの追加や技術の学習コストも発生しません。将来的に規模が拡大した場合には改めて専門のサービスの導入を検討することとし、まずはシンプルなPostgreSQLベースの方式で始めることにしました。

以降のセクションでは、デキュー(ジョブの取得と実行)とエンキュー(イベントの検知とジョブの生成)それぞれのコンポーネントについて設計と実装を解説します。

デキュー: ジョブの取得と実行

ジョブの設計

まず、ジョブキューの基本的な設計について説明します。

ジョブのライフサイクル

ジョブは以下のステータスで管理されます。

  • pending: 処理待ち。Workerによるデキューを待っている状態
  • in_progress: 処理中。Workerがジョブを取得し、処理を実行している状態
  • completed: 完了。処理が正常に終了した状態
  • failed: 失敗。規定回数のリトライを超えても処理が成功しなかった状態

直列化キーによる順序保証

ジョブ間の直列化制御は、ジョブごとに設定する直列化キーで行います。同じ直列化キーを持つジョブは発生順に直列に実行されることが保証され、異なる直列化キーのジョブ同士は互いに独立しているため並列に実行されます。

直列化キーには「直列に実行したいジョブに共通する識別子」を設定します。今回のようにDDDに基づくEvent Sourcingの文脈では、通常は集約の識別子とハンドラの識別子の組が直列化キーの自然な単位になります。異なる集約のジョブ同士はもちろん、同一集約のイベントでも異なるハンドラで処理するジョブは独立して実行できるためです。別の文脈では、単に集約IDだけで十分なケースもあれば、より複雑なキーを使うケースもあるでしょう。ジョブキューの仕組みとしては、どのようなキーを使うかはジョブを生成する側に委ねられる形にしておくと柔軟です。

優先度による実行順の制御

ジョブには優先度を持たせることができ、異なる直列化キーを持つジョブ同士の実行順序を制御します。優先度が高いジョブは、発生時期が後であっても優先度の低いジョブを飛び越して先に実行されます。

この仕組みが必要になった背景として、我々のアプリケーションでは非同期処理が「ユーザーのリクエスト実行 → 画面の更新」のフローに組み込まれていることが挙げられます。リアルタイム性が重視されているため、ユーザーインタラクションを引き起こす処理はなるべく低レイテンシで実行したい一方で、外部システムへの通知や履歴の作成などインタラクションに関係ない処理は多少遅れても構いません。そのため、ジョブ単位で優先度を設定し、ユーザー体験に直結する処理を優先して処理できるようにしています。

リトライとクラッシュリカバリ

ジョブには可視時刻という属性があり、これが2つの役割を果たします。

クラッシュリカバリ: ジョブをデキューする際に、可視時刻を「現在時刻 + タイムアウト」に設定します。もしWorkerインスタンスが処理中にクラッシュした場合、ジョブは in_progress のまま残りますが、タイムアウト後に可視時刻を過ぎるため他のWorkerインスタンスが再びデキューできるようになります。

リトライ間隔の制御: 処理が失敗した場合、Worker側でジョブを pending に戻しつつ可視時刻を未来の時刻に更新することで、リトライを実現しています。規定回数リトライしても成功しなかったジョブはfailedとしてマークされ、後続のジョブをブロックしません。

これらを踏まえたjobテーブルの定義は以下のとおりです。

CREATE TABLE job (
    id                UUID PRIMARY KEY,
    args              JSONB NOT NULL,                       -- ジョブの引数(イベントの内容)
    status            VARCHAR(20) NOT NULL DEFAULT 'pending',
    priority          INTEGER NOT NULL DEFAULT 0,           -- 実行順の優先度(小さいほど優先)
    visible_at        TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    retry_count       INTEGER NOT NULL DEFAULT 0,           -- 試行回数(初回実行時から1となる)
    serialization_key VARCHAR(64) NOT NULL,                 -- 直列化キー
    created_at        TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

デキュークエリ

次にデキュークエリの設計について説明します。「同じ直列化キーを持つジョブは直列に、異なる直列化キーのジョブは並列に」という要件を、1つのSQLクエリで実現しています。

デキュー処理を1つのクエリで完結させることにはいくつかのメリットがあります。まず、候補の選定からステータス更新までがアトミックに実行されるため、複数のWorkerインスタンスが同時に動作してもロック競合や二重デキューなどのレースコンディションを考慮する必要がなくなります。仮にこの処理を複数のクエリに分割すると、処理の間にジョブの状態が変化しないようロックを長時間保持したり、アプリケーション側に整合性を保つためのロジックを実装したりする必要があり、複雑さが増してしまいます。

WITH locked AS (
    -- Step 1: 処理可能なジョブの候補をバッチで取得
    SELECT id, serialization_key, priority, created_at
    FROM job
    WHERE status IN ('pending', 'in_progress')
      AND visible_at <= NOW()
    ORDER BY priority, created_at, id
    LIMIT :batch_size * :N
    FOR UPDATE SKIP LOCKED
),
first_per_key AS (
    -- Step 2: 各直列化キーごとに最古のジョブを1つ選択
    SELECT DISTINCT ON (serialization_key) id, serialization_key, priority, created_at
    FROM locked
    ORDER BY serialization_key, created_at, priority, id
),
blocked AS (
    -- Step 3: 現在実行中 or リトライ待ちの直列化キーを特定
    SELECT DISTINCT serialization_key
    FROM job
    WHERE (status = 'in_progress' AND visible_at > NOW())
       OR (status = 'pending' AND retry_count > 0 AND visible_at > NOW())
),
candidate AS (
    -- Step 4: ブロックされていないジョブのみを最終候補に
    SELECT fpk.id
    FROM first_per_key fpk
    WHERE NOT EXISTS (
        SELECT 1 FROM blocked
        WHERE blocked.serialization_key = fpk.serialization_key
    )
      -- 同じ直列化キーで、この候補より先に処理されるべきジョブがないことを確認
      AND NOT EXISTS (
        SELECT 1 FROM job prior
        WHERE prior.serialization_key = fpk.serialization_key
          AND prior.status IN ('pending', 'in_progress')
          AND (prior.priority, prior.created_at, prior.id)
              < (fpk.priority, fpk.created_at, fpk.id)
    )
    ORDER BY fpk.priority, fpk.created_at, fpk.id
    LIMIT :batch_size
)
-- Step 5: アトミックにステータスを更新して返却
UPDATE job
SET status      = 'in_progress',
    visible_at  = NOW() + :timeout,
    retry_count = retry_count + 1
WHERE id IN (SELECT id FROM candidate)
RETURNING *;

CTEの各ステップを順番に解説します。

Step 1: locked - 処理可能なジョブの候補をバッチで取得します。対象はpendingなジョブに加えて、visible_atを過ぎたin_progressなジョブも含めています。後者はクラッシュ等によってステータスがin_progressのまま残っているジョブを対象とするためで、これにより処理中のままロストしてしまったジョブをリカバリできます。

もう一つのポイントはFOR UPDATE SKIP LOCKEDです。通常のFOR UPDATEは対象行がロックされている場合にロックの解放を待ちますが、SKIP LOCKEDを付けるとロックされている行をスキップして次の候補を取得します。これにより、複数のWorkerインスタンスが同時にデキューしてもブロックし合うことなく、それぞれ別のジョブを取得できます。

なお、必要数のN倍を取得しているのは、後続のステップでフィルタリングされることを見越して十分な候補を確保するためです。

Step 2: first_per_key - DISTINCT ONを使って、lockedで取得した候補の中から、直列化キーごとに最も古いジョブを1つずつ選択します。これにより、同じ直列化キーを持つジョブが複数あっても、最古の1件だけがデキュー候補となり、残りは先行ジョブの完了を待つことになります。

Step 3: blocked - 現在すでに実行中(visible_atがまだ到来していないin_progress)、またはリトライ待ちのジョブが存在する直列化キーを特定します。これらのキーに属するジョブは、先行するジョブの完了を待つ必要があるためデキュー対象外とします。

Step 4: candidate - Step 2の候補からStep 3のブロック対象を除外し、実際にデキューするジョブを決定します。加えて「同じ直列化キーで、この候補より先に処理されるべきジョブが存在しないこと」も確認しています。これは、複数のWorkerが同時にクエリを実行したときに、同じ直列化キーを持つ別々の行をそれぞれの locked で取得してしまい、結果として同一キーのジョブが並行実行されてしまうのを防ぐためです。最後に優先度・作成時刻順に並べて LIMIT :batch_size で必要件数だけ取り出します。

Step 5: UPDATE ... RETURNING - 候補ジョブのステータスをin_progressに更新し、visible_atにタイムアウトを設定して結果を返却します。Step 1からここまでが1つのトランザクション内でアトミックに実行されます。

改善前の1件ずつ取得する重いクエリと比べて、改善後のデキュークエリはバッチで複数件を取得するシンプルな構造になっており、高負荷時でも安定したパフォーマンスが出るようになっています。

Workerの並行実行

Workerは定期的にデキュークエリを実行してバッチでジョブを取得し、各ジョブをgoroutineで並行に処理します。各goroutineの中ではジョブに紐づくハンドラを探して実行します。

func (w *Worker) processJobs(ctx context.Context, sem chan struct{}) error {
    // セマフォの空きスロット数に応じてデキューするバッチサイズを決定
    availableSlots := cap(sem) - len(sem)
    if availableSlots <= 0 {
        return nil
    }

    batchSize := min(availableSlots, w.config.DequeueBatch)
    jobs, _ := w.jobStore.DequeueJobs(ctx, batchSize, jobTimeout)

    for _, job := range jobs {
        sem <- struct{}{} // セマフォで並行数を制御
        go func(j *Job) {
            defer func() { <-sem }()
            w.processJob(ctx, j) // 対応するハンドラを探して実行
        }(job)
    }
    return nil
}

Workerの同時実行数はセマフォで制御し、空きスロットがある分だけをデキューのバッチサイズに指定しています。

また、デキュークエリがFOR UPDATE SKIP LOCKEDを使用しているため、複数のWorkerインスタンスが同時にデキューしてもロックが競合しません。これにより、方針3で述べたCPU負荷や可用性要件に応じてインスタンスを追加できる構造を実現しています。

エンキュー: イベントの検知とジョブの生成

次にエンキューの設計について説明します。

ポーリング方式の課題

Enqueuerの役割は、イベントストアに新しいドメインイベントが追加されたことを検知し、対応するジョブを生成することです。

新しいアーキテクチャのエンキュー部分は、当初は従来と同様にイベントストアをポーリングして未処理のイベントを取得する方式で実装しました。

ポーリング方式で難しいのは、どのイベントが未処理かを正確に判別するところです。SERIAL型の連番やタイムスタンプといった列の値でソートした順序は、トランザクションのコミット順と一致するとは限りません。ある位置までを処理済みとマークしても、まだコミットされていなかったトランザクションが後からコミットされると、その位置より前のイベントが取りこぼされてしまいます。そのため、単純な「処理位置」のカーソルだけでは正しく扱えず、どのイベントを処理済みとしたかを個別に記録して、それを参照しながら未処理分を拾い上げる仕組みが必要になります。

この構造は、冒頭の課題2で説明した問題を引き起こします。処理済みの記録はイベントを処理するたびに書き込みが発生し、その記録を参照する読み取りもポーリングのたびに発生するため、イベント量に比例して両方のコストが積み上がります。実際に、書き込みが集中する高負荷時にはクエリの性能が劣化していたことが観測されていました。テーブル構造の工夫によってある程度の改善は可能かもしれませんが、pull型で処理済みイベントを追跡するという構造自体を変えない限りは、イベント量に比例したコストが発生し続けます。

今回の改善ではイベント取得のバッチ化とジョブ実行のWorker分離によりポーリングクエリの実行頻度を大きく下げたため、課題2のような深刻な性能低下は大幅に改善されました。しかし、構造的な制約自体は残っており、より大きな改善のためにはアプローチ自体を見直す必要がありました。そこで、次に説明する方式に移行することにしました。

WALを利用したエンキュー

ここで着目したのは、PostgreSQLがトランザクションをコミットする際にWAL(Write-Ahead Log)に変更を記録しているという点です。WALには変更がコミット順にシリアルに記録されているため、WALを読み取れば未処理のイベントを読み飛ばしてしまう問題は原理的に発生しません。

PostgreSQLの論理レプリケーション機能を使うと、特定テーブルへの変更をWALからストリーミングで受信できます。この機能を利用して、イベントストアへのINSERTを検知しジョブを生成する方式に移行しました。全体像は下図のとおりで、受信側(Enqueuer)はWALメッセージを順にパースしてジョブテーブルにINSERTしていくだけの、シンプルな構造になります。

WALからジョブを生成するフロー

ストリームは、イベントストアへの書き込みが発生したトランザクション単位で、以下のようなメッセージの塊として送られてきます。

  • Begin: トランザクション開始の目印。コミット時のLSNなどが含まれる
  • Relation: 対象テーブルのスキーマ情報(列名・型など)。InsertMessage にはタプルの生データしか含まれないため、パース時にこの情報が必要
  • Insert: INSERT の内容(タプルデータ)
  • Commit: トランザクション終了の目印

これらのメッセージはBegin → (Relation) → Insert → ... → Commit の順で送られます。同一トランザクションで複数のイベントが発生した場合は、Insertメッセージが複数送られます。

実装

上記のフローを実現するための実装を説明します。まず、セットアップとして、PostgreSQL側でPublicationとReplication Slotを作成します。

-- イベントストアの変更を公開する
CREATE PUBLICATION enqueuer_publication FOR TABLE domain_event;

-- 論理レプリケーション用のスロット
SELECT pg_create_logical_replication_slot('enqueuer_slot', 'pgoutput');

アプリケーション側では、pglogreplを使用してWALの変更をストリーミングで受信します。pglogreplはPostgreSQLの論理レプリケーションプロトコルをGoから扱うためのライブラリです。

また、Enqueuerがどこまで処理したかを記録するためのenqueuer_stateテーブルも用意します。これはEnqueuer全体で1行だけ存在するシングルトンテーブルで、最後に処理したWAL上の位置(LSN: Log Sequence Number)を保持します。プロセスが再起動したときはこのlast_lsnを起点にレプリケーションを再開できます。

CREATE TABLE enqueuer_state (
    id       INTEGER NOT NULL DEFAULT 1,
    last_lsn BIGINT  NOT NULL DEFAULT 0,

    PRIMARY KEY (id),
    CHECK (id = 1)  -- シングルトン制約(常に1行だけ存在)
);

これらを踏まえた、WALメッセージを受信してジョブを生成する処理の概要は以下の擬似コードのとおりです(エラーハンドリングなどは省略しています)。

type Receiver struct {
    // 受信中のトランザクションに属するイベント(Begin〜Commitの間だけ保持)
    txBuffer           []Event       // このトランザクション内のINSERTイベント
    currentTxCommitLSN pglogrepl.LSN // このトランザクションがコミットされる位置

    // コミット済みでジョブ化待ちのイベント(一定量まとめてジョブテーブルにINSERTする)
    pending    []Event       // ジョブ化待ちのイベント
    pendingLSN pglogrepl.LSN // まとめてINSERTするときにチェックポイントとして保存するLSN

    // 永続化が完了しているLSN(Standby Statusでサーバーに報告する値)
    processedLSN pglogrepl.LSN

    // 対象テーブルのスキーマ情報(RelationMessageで送られてくる。InsertMessageのパースに必要)
    targetRel *pglogrepl.RelationMessageV2
}

func (r *Receiver) receive(ctx context.Context) error {
    // レプリケーションモードでPostgreSQLに接続
    conn, _ := pgconn.Connect(ctx, "replication=database")

    // enqueuer_stateテーブルから前回の処理位置(LSN)を取得してレプリケーションを再開
    r.processedLSN = loadLastProcessedLSN(ctx)
    pglogrepl.StartReplication(ctx, conn, slotName, r.processedLSN, options)

    lastFlush, lastStandby := time.Now(), time.Now()

    for {
        // イベントが来ないときも定期的にループに戻ってフラッシュ/Standby Status
        // の判定ができるよう、ReceiveMessageはデッドライン付きで呼び出す
        msgCtx, cancel := context.WithDeadline(ctx, time.Now().Add(receiveTimeout))
        rawMsg, err := conn.ReceiveMessage(msgCtx)
        cancel()

        if err == nil {
            // 論理レプリケーションのメッセージはCOPYプロトコルのフレーム(CopyData)で
            // 送られてくるので、中身を取り出してWALデータとしてパースする
            copyData := rawMsg.(*pgproto3.CopyData)
            xld, _ := pglogrepl.ParseXLogData(copyData.Data[1:])
            logicalMsg, _ := pglogrepl.ParseV2(xld.WALData, false)

            switch walMsg := logicalMsg.(type) {
            case *pglogrepl.RelationMessageV2:
                // 対象テーブルのスキーマ情報を保持しておく(後続のInsertをパースするのに使う)
                if isTargetRelation(walMsg) {
                    r.targetRel = walMsg
                }

            case *pglogrepl.BeginMessage:
                // トランザクション開始: バッファを初期化し、コミットLSNを控えておく
                r.txBuffer = nil
                r.currentTxCommitLSN = walMsg.FinalLSN

            case *pglogrepl.InsertMessageV2:
                // INSERT検知: 対象テーブルのものだけをバッファに蓄積
                if r.targetRel != nil && walMsg.RelationID == r.targetRel.RelationID {
                    r.txBuffer = append(r.txBuffer, parseInsert(r.targetRel, walMsg))
                }

            case *pglogrepl.CommitMessage:
                // コミット検知: バッファをフラッシュ対象に移し、LSNも引き継ぐ
                r.pending = append(r.pending, r.txBuffer...)
                r.pendingLSN = r.currentTxCommitLSN
            }
        }
        // タイムアウト時はメッセージ処理をスキップして下の判定へ進む

        // バッチサイズ超過 or 一定時間経過でまとめてエンキュー
        if len(r.pending) >= maxBatchSize || time.Since(lastFlush) > flushInterval {
            // ジョブのバルクINSERTとenqueuer_state.last_lsnの更新を同一トランザクションで実行
            storeJobsAndLSN(ctx, r.pending, r.pendingLSN)
            r.pending = nil
            r.processedLSN = r.pendingLSN // 永続化が完了した時点でprocessedLSNを進める
            lastFlush = time.Now()
        }

        // 一定間隔でStandby Statusを送信してスロットの進行状況を報告
        if time.Since(lastStandby) > standbyInterval {
            pglogrepl.SendStandbyStatusUpdate(ctx, conn,
                pglogrepl.StandbyStatusUpdate{WALWritePosition: r.processedLSN})
            lastStandby = time.Now()
        }
    }
}

// 受信したイベントをジョブに展開し、ジョブのバルクINSERTと
// enqueuer_state.last_lsnの更新を同一トランザクションで実行する
func storeJobsAndLSN(ctx context.Context, events []Event, lsn pglogrepl.LSN) error {
    // 各イベントに対して、処理可能なハンドラの数だけジョブを組み立てる
    jobs := []JobParam{}
    for _, evt := range events {
        for _, handler := range handlers {
            if !handler.CanHandle(evt) {
                continue
            }
            jobs = append(jobs, JobParam{
                ID:               newUUID(),
                Args:             marshalEvent(evt),
                Priority:         handler.Priority(),
                SerializationKey: hash(handler.SerializationKey(evt) + handler.ID()),
            })
        }
    }

    // ジョブのバルクINSERTとlast_lsn更新を同一トランザクションで実行
    return withTx(ctx, func(tx *Tx) error {
        bulkInsertJobs(tx, jobs)
        updateLSN(tx, lsn) // enqueuer_state.last_lsn を更新
        return nil
    })
}

このコードは、大きく以下の役割が1つのループの中で協調する形になっています。

  • 受信とパース: PostgreSQLから送られてくるWALメッセージをCopyDataから取り出し、ParseXLogDataParseV2の順で論理メッセージに変換します。InsertMessageにはタプルの生データしか含まれないため、先行して送られてくるRelationMessageでスキーマ情報を保持しておき、それを使ってparseInsertで列を取り出します。ReceiveMessageにデッドラインを設けているのは、メッセージが来ない間も定期的にループに戻り、下のフラッシュや Standby Status の判定ができるようにするためです。
  • トランザクション境界の追跡: BeginMessageInsertMessageV2CommitMessageの順序に沿って、コミット確定前のイベントをtxBufferに蓄積し、Commitが到着したタイミングでpendingに移動します。WALにはトランザクションがコミットされた順序で変更が記録されるため、この境界を追跡することでコミットされた変更のみをジョブに変換することが保証され、ポーリング方式で問題だった取りこぼしが原理的に発生しません。
  • バッチでのフラッシュ: pendingが一定件数または一定時間のどちらかの条件を満たしたら、storeJobsAndLSNでジョブのバルクINSERTとenqueuer_state.last_lsnの更新を同一トランザクションで実行します。高負荷時のオーバーヘッドを抑えつつ低負荷時のレイテンシも抑えるための仕組みです。フラッシュ成功時にprocessedLSNを進めておくことで、プロセスが再起動した際にその位置からレプリケーションを再開できます。
  • Standby Statusの送信: PostgreSQLサーバーに「どのLSNまで処理したか」を定期的に報告する必要があります。これを怠るとサーバー側はスロットが進んでいないとみなし続け、WALが蓄積してディスクを圧迫する恐れがあります。報告するLSNはフラッシュ済みのprocessedLSNとし、まだ永続化されていないpendingLSNを誤って報告しないように注意しています(そうすると、クラッシュ時にサーバーが先にWALを回収してデータを失う恐れがあります)。また、対象テーブルへの書き込みがないアイドル期間でも、サーバーから受信している最新のWAL位置までprocessedLSNを進めるようにする必要もあります(上の擬似コードでは省略していますが、他テーブルへの書き込みで進むWALに対してスロットが置いていかれないよう、lastServerWALEndを追跡してprocessedLSNに反映するロジックを入れています)。
  • 再開時の冪等性: Standby Statusで報告するLSNをフラッシュ済みに限っている関係上、クラッシュ後の再開時にはすでにジョブ化済みのメッセージを再受信する可能性があります。このようなケースでも重複ジョブが作られないよう、ジョブ側にユニーク制約を設けてON CONFLICT DO NOTHINGで冪等に処理する仕組みを入れています(擬似コードでは省略)。

コード中のEventはWALのINSERTをアプリケーション側で扱いやすい形に変換した型です。parseInsertRelationMessageの列定義とInsertMessageのタプルデータを突き合わせ、必要な列(IDやペイロードなど)を取り出してEventに詰め直す関数で、どの列を拾ってどんな型に詰めるかはアプリの都合で決められます。我々のシステムでは、ドメインイベントをジョブのソースとして使っているため、そのドメインイベントに相当する型をEventとして扱っています。

storeJobsAndLSNでは、受信したイベントを処理可能なハンドラの数だけジョブに展開したうえで、withTxの中でバルクINSERTとenqueuer_state.last_lsnの更新を同一トランザクションで実行しています。こうすることで、ジョブだけが書き込まれてLSNの更新に失敗するといった中途半端な状態が発生しません。どこかの書き込みで失敗した場合は両方ロールバックされるため、再起動時には最後に成功したlast_lsnから読み直せば取りこぼしなく再処理できます。

直列化キーは「ハンドラが指定する識別子 + ハンドラID」のハッシュで計算しており、ハンドラが集約IDを指定するようにすれば「集約 × ハンドラ単位」で直列化が効きます。

最後に、PostgreSQLの論理レプリケーションスロットは同時に1つのコネクションからしか消費できないという制約があるため、複数のEnqueuerインスタンスが並行してWALを受信することはできません。この制約を満たすために、PostgreSQLのpg_try_advisory_lockによるリーダー選出を行い、ロックを取得できたインスタンスだけがWALの受信とジョブの生成を担う構成にしています。一定間隔でロック取得を再試行するため、リーダーがクラッシュしても別のインスタンスが引き継げます。

この仕組みによって、エンキュー処理は「WALから順番に送られてくる変更をストリーミングで受信し、順次ジョブに変換する」というシンプルな構造になりました。処理済みテーブルを参照するポーリングクエリが不要になったため、短時間に大量のイベントが書き込まれる状況でも安定したレイテンシでジョブが生成されるようになりました。

結果

以上の変更により、下記のような点が改善されました。

並列実行によるスループット向上: 改善前はイベントを1件ずつ取得して、そのイベントに対応する複数のハンドラを並列に呼び出す形で処理していました。つまり異なるイベント同士はイベントをまたいで直列化されており、並列度は1イベントに紐づくハンドラの数が実質的な上限でした。改善後は直列化キー単位でのみ順序を守り、それ以外のジョブは並行して処理できるため、異なる集約に対するイベントを並列に処理できるようになりました。

Workerの水平スケーリング: デキュークエリでFOR UPDATE SKIP LOCKEDを使ってロックを取ることで、複数のWorkerインスタンスが同時にデキューしても互いに競合することなくジョブを取得できます。これにより、CPUが飽和した場合にもインスタンスを増やすだけで対応でき、インスタンス障害時やデプロイ時の可用性も同時に確保できるようになりました。

書き込み集中時のレイテンシ安定化: ポーリング方式ではイベント取得のクエリが書き込み集中時に重くなり、検知からジョブ生成までのレイテンシが不安定でした。論理レプリケーションはWALから変更をストリーミングで受信する仕組みで、書き込み量が増えてもクエリが重くなるような現象は発生しないため、書き込みが集中する状況でも安定したレイテンシでイベントをジョブに変換できるようになりました。

参考までに、本番環境で高負荷が発生した時間帯のログを分析した結果を以下に示します。改善前後どちらも、ピーク時におよそ300 jobs/s前後のジョブ生成が発生する高負荷の時間帯を計測対象としています。なお、スループットやレイテンシの絶対値は処理内容やDB・Workerインスタンスのスペックによって大きく変わるため、数値そのものにはあまり意味がありませんが、あくまで同じ環境での改善前後の差を比較するための参考値として捉えてください。

指標 改善前 改善後
ジョブ処理レイテンシ p50 60 s 139 ms
ジョブ処理レイテンシ p99 360 s 375 ms
ピークスループット(1s) 69 jobs/s 388 jobs/s

改善前はスループットがジョブの生成ペースに追いついていないことから、書き込みが集中するタイミングでは処理待ちが大量に発生し、平常時は数百msで完了する処理がp50で約1分、p99で6分近くまで悪化する状況でした。改善後は数百倍オーダーでレイテンシが改善し、p99でも数百ms程度に収まっています。結果として、改善前に実業務で発生していた「書き込みが集中するタイミングで非同期処理が大きく遅延する」という課題は解消できました。

また、スループットだけでなくキューの待ち時間も体感レイテンシを大きく左右します。改善後のジョブ全体レイテンシを区間別に見ると、以下のようになっています。

区間 p50 p99
エンキュー遅延(イベント発生→ジョブ化) 57 ms 184 ms
キュー待ち時間(ジョブ化→Worker着手) 30 ms 195 ms
実処理時間(Worker着手→完了) 39 ms 120 ms

いずれの区間も数十ms~数百ms程度に収まっていますが、特に注目したいのはエンキュー遅延が書き込み集中時でもp99で200ms弱に安定している点です。改善前のポーリング方式では書き込み量に連動してエンキュークエリが重くなりレイテンシが膨らんでいましたが、WAL受信方式に切り替えたことで負荷の上下に関わらず安定したレイテンシで処理できるようになっています。

この方式は処理をPostgreSQLに集約しているため、本当に大規模なワークロードを扱うシステムではいずれDBがボトルネックになり、専門のメッセージングサービスなどを利用しなければ負荷を捌くのは難しくなるでしょう。ただ、今回の計測時点ではDBやWorkerインスタンスの性能は飽和しておらず、DBのスケールアップやWorkerの水平スケールの余地もまだ残っているため、我々のシステムに関して言えば当面はこの方式で問題なく運用できると見込んでいます。

まとめ

本記事では、PostgreSQLの機能を活用してイベント駆動な非同期処理基盤を改善した事例を紹介しました。

設計のポイントは、ジョブに直列化キーを持たせて順序保証の粒度を柔軟に調整できるようにしたことと、イベントの取得をポーリングから論理レプリケーションに切り替えたことでした。前者で順序保証と並列実行を両立し、後者で書き込み負荷に起因するクエリのボトルネックを取り除きました。

PostgreSQLはデータの永続化先として扱われることが多いですが、こうした機能を活用すれば非同期処理の中核まで任せられます。インフラを増やして構成を複雑にする前に、手元のPostgreSQLでどこまでできるかを検討してみるのも良いかもしれません。


enechainでは、日本のエネルギーの未来を一緒に作っていく仲間を募集しています。興味を持っていただけた方は下記のリンクからぜひご応募ください。

herp.careers https://herp.careers/v1/enechain/HAX-kGNHA1dVherp.careers