電力取引プラットフォームのリアルタイム通知を支えるアーキテクチャ

ogp

はじめに

enechainでエンジニアをしているryuです。

enechainでは、JEPX(日本卸電力取引所)のスポット市場・時間前市場向けの発注端末であるeSquare Live for JEPXを開発しています。このプラットフォームでは、入札結果や約定結果をリアルタイムにユーザーへ届ける必要があり、PostgreSQL LISTEN/NOTIFYとgRPC Server Streamingを組み合わせてリアルタイム通知を実現しました。

本記事では、この仕組みの全体像と各要素の設計・実装についてご紹介します。

背景

日本の電力取引所であるJEPX(日本卸電力取引所)では、スポット市場・時間前市場など主に短期の電力卸取引を担っています。

スポット市場(一日前市場) は、翌日に受け渡す電力を前日に取引する市場です。1日を30分刻みの48コマに分け、各コマに対して入札します。午前10時に入札が締め切られ、ブラインド・シングルプライスオークション方式で約定価格が決定されます。

時間前市場(当日市場) は、スポット市場の取引後に生じた需給のミスマッチを調整するための市場です。スポット市場とは異なり、ザラ場方式で取引されます。前日17時から受渡の1時間前まで取引可能で、条件が合えばリアルタイムに約定が成立します。

特に時間前市場ではザラ場方式により随時約定が発生するため、結果を即座にユーザーに通知することが重要です。

全体アーキテクチャ

リアルタイム通知の全体像は以下の通りです。

全体アーキテクチャ

JEPXから入札結果や約定結果を受け取ると、それらをDBに保存し、あわせて通知データもDBに保存します。この通知データの保存をきっかけに、PostgreSQLのLISTEN/NOTIFY機構によってWorkerがリアルタイムに検知します。

Workerは受け取った通知をObserverに渡し、Observerが接続中の全クライアントに同時配信します。配信にはgRPC Server Streamingを使用し、BFF層でConnect RPCに変換してフロントエンドに届けています。フロントエンドでは通知を受け取ると、入札結果や約定結果を再取得して画面に反映します。

以降のセクションでは、この流れを構成する各要素について詳しく見ていきます。

PostgreSQL LISTEN/NOTIFY

PostgreSQL LISTEN/NOTIFY

通知の起点にはPostgreSQLのLISTEN/NOTIFYを使用しています。LISTEN/NOTIFYはPostgreSQLに組み込まれたPub/Sub機構で、ポーリングと違いデータ変更をリアルタイムに受け取れます。

通知が発生するのは、JEPXから入札結果や約定結果などを受け取ったタイミングです。受け取った情報を通知用のnotificationsテーブルにINSERTすると、PostgreSQLのトリガーでpg_notifyが自動実行される設計にしています。

-- notificationsテーブルへのINSERT時に自動実行される関数
CREATE OR REPLACE FUNCTION notify_on_notification_insert()
RETURNS trigger AS $$
BEGIN
    -- 行のchannelカラムに指定されたチャネルに、dataカラムの内容を通知
    PERFORM pg_notify(NEW.channel::text, NEW.data::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- INSERTごとに上記の関数を実行するトリガー
CREATE TRIGGER notifications_insert_notify
AFTER INSERT ON notifications
FOR EACH ROW
EXECUTE FUNCTION notify_on_notification_insert();

アプリケーションコードから直接pg_notifyを呼ぶのではなく、INSERT + トリガーを採用しました。これには以下のメリットがあります。

  • 通知データがDBに永続化される: 通知の履歴がテーブルに残るため、障害調査やデバッグ時に追跡できる
  • トランザクションと連動する: INSERTがロールバックされれば通知も送られないため、データと通知の不整合が起きない
  • どこからINSERTしても自動的に通知される: アプリケーション側でPublishを呼び忘れるリスクがない

リアルタイム通知の仕組みとしてはGoogle Cloud Pub/Subも選択肢にありました。Pub/Subはマネージドサービスとして高い可用性と再試行の仕組みを備えており、PostgreSQLの障害に依存しない独立した配信基盤を構築できます。一方で、DB書き込みとPublishが別の操作になるため、両者の一貫性を保つ仕組みを別途設計する必要があります。また、再試行によって通知のタイミングがDB書き込みから大きくずれると、リアルタイム性が損なわれるという懸念もあります。

LISTEN/NOTIFYにはメッセージの再送機能がなく、PostgreSQLの障害時には通知も停止するという制約があります。ただし、PostgreSQLが停止している状況では入札結果や約定結果の保存自体ができないため、通知が停止すること自体は問題になりません。また、本システムではクライアントが画面を開いている間だけ通知を受け取る設計であり、最新のデータはAPIから取得できるため、通知の取りこぼし自体は許容できます。むしろ、再試行によって実際のDB書き込みから遅れて届く通知は、リアルタイム通知としてはかえって混乱を招きます。今回はDB書き込みが通知の起点であり、データと通知の一貫性およびリアルタイム性を重視したため、LISTEN/NOTIFYを採用しました。

Observerパターン

Observerパターン

次に、LISTEN/NOTIFYで受け取った通知を複数のクライアントに同時配信する仕組みを見ていきます。

gRPCハンドラが直接LISTENする設計にすると、クライアントごとにDB接続が必要です。DB接続数には上限があるため、クライアントが増えると接続数の上限に達するリスクがあります。そこで、Workerが1つのDB接続でLISTENし、受け取った通知を複数のクライアントに配信する構成にしています。この配信はGoアプリケーション内部でインメモリに行われ、1対多の配信にObserverパターンを採用しています。

Observerパターンは、あるオブジェクト(Subject)の状態変化を、登録された複数のObserverに自動的に通知するデザインパターンです。Subjectは「誰が購読しているか」を管理し、状態が変化すると全Observerに通知します。Observerは購読と解除を自由に行えるため、クライアントの接続・切断に柔軟に対応できます。

Observerパターン概念図

今回の構成では、Subjectが通知の配信元として全Observerを管理し、WorkerからSubjectに通知が届くと全Observerに配信します。ObserverはクライアントのgRPCストリームごとに1つ作成され、通知を受けるとgRPCストリームを通じてクライアントに送信します。

Workerから直接gRPCストリームに通知を送ることも可能ですが、Workerが各クライアントの接続管理を担うことになり、責務が肥大化します。Observerパターンを挟むことで、Workerは「通知を受け取ってSubjectに渡す」だけに専念でき、クライアントの接続・切断の管理はSubject/Observerに分離できます。クライアントが増減してもWorker側の変更は不要です。

Workerによる通知の受け取り

PostgreSQLのLISTENで受け取った通知は、WorkerがGo Channelを経由してObserverに渡します。

func (w *worker) Run(ctx context.Context) error {
    ch := make(chan *Notification)

    go func() {
        for {
            select {
            case data := <-ch:
                w.subject.Notify(ctx, data)
            case <-ctx.Done():
                return
            }
        }
    }()

    if err := w.listener.Listen(ctx, ch); err != nil {
        return err
    }
    return nil
}

listener.ListenがPostgreSQLのLISTENで通知を待ち受け、受信した通知をGo Channelに送ります。Workerはchannelから通知を取り出し、subject.NotifyでObserverに配信します。

gRPCストリームとの繋ぎ方

gRPCのストリームハンドラでは、Observerを作成してSubjectに登録します。NewObserverの引数には通知を受けた時の処理を関数で渡すことができます。ここではstream.Sendを呼ぶことで、通知がそのままgRPCストリームに流れる仕組みとなっています。

func (s *server) NotifyBid(req *NotifyBidRequest, stream NotifyBidStream) error {
    errChan := make(chan error)

    obs := observer.NewObserver(func(n *Notification) {
        if err := stream.Send(n); err != nil {
            errChan <- err
        }
    })

    s.subject.Subscribe(obs)
    defer s.subject.Unsubscribe(obs)

    select {
    case err := <-errChan:
        return err
    case <-stream.Context().Done():
        return stream.Context().Err()
    }
}

クライアントが接続するとSubscribeでObserverが登録され、切断するとdefer Unsubscribeで自動的に解除されます。Subjectに通知が届くと、全Observerにgoroutineで並行配信され、各gRPCストリームに通知が送られます。実際のコードでは組織IDによるフィルタリングも行い、自分の組織に関係のある通知だけが配信されるようにしています。

gRPC Streaming → Connect RPC

gRPC Streaming → Connect RPC

最後に、gRPC Server Streamingの通知をフロントエンドに届ける部分を見ていきます。gRPCはHTTP/2上でProtocol Buffersを使って通信するRPCフレームワークです。gRPCが提供する通信方式の中でも、リアルタイム通知に最適なのがServer Streamingです。クライアントが1度リクエストを送ると、サーバーがストリームを通じて複数のメッセージを順次送信します。

API通信でgRPCを使用しているため、リアルタイム通知もgRPCのServer Streamingで統一しました。WebSocketやServer-Sent Events (SSE)を別途導入する場合と比較して、既存のgRPCサーバーにメソッドを追加するだけで済み、別のプロトコルのエンドポイントを管理する必要がありません。また、Protocol Buffersによる型安全な通信をそのまま活かせます。

ただし、ブラウザからgRPCを直接利用できないため、BFF(Backend for Frontend)層でConnect RPCに変換しています。BFFは複数のバックエンドAPIの集約やデータ整形を担い、フロントエンドの実装を単純に保つために導入しています。プロトコル変換もこの層で行っています。Connect RPCはgRPC互換のHTTPベースのプロトコルで、ブラウザから直接利用できます。BFFはバックエンドのgRPC Server Streamingに接続し、受信した通知をConnect RPCのストリームとしてフロントエンドに転送します。

func (h *handler) NotifyBid(ctx context.Context, req *connect.Request[NotifyBidRequest], stream *connect.ServerStream[NotifyBidResponse]) error {
    // バックエンドのgRPC Server Streamingに接続
    grpcStream, err := h.client.NotifyBid(ctx, &NotifyBidRequest{
        OrganizationId: organizationID,
    })
    if err != nil {
        return err
    }

    // gRPCから受信した通知をConnect RPCに変換して送信
    for {
        resp, err := grpcStream.Recv()
        if err != nil {
            return err
        }
        if err := stream.Send(toConnectResponse(resp)); err != nil {
            return err
        }
    }
}

BFFはバックエンドのgRPC Server Streamingに接続し、Recvで通知を受信するたびにtoConnectResponseでConnect RPCのレスポンスに変換してSendで送信します。このforループによって、バックエンドからの通知がConnect RPCのストリームとしてフロントエンドに継続的に転送されます。

フロントエンドではConnect RPCのストリームを受信し、通知に応じて入札一覧などのデータを再取得します。ブラウザが一時的に切断された場合、ストリームは自動的に再確立されますが、切断中の通知は失われます。ただし、次の通知が届いた時点でデータは最新の状態に更新されます。

まとめ

本記事では、eSquare Live for JEPXにおけるリアルタイム通知の仕組みを紹介しました。

  • PostgreSQL LISTEN/NOTIFY: notificationsテーブルへのINSERTをトリガーにして通知を発行。トランザクション内で完結するため、データと通知の一貫性が保たれる
  • Observerパターン: Workerが1つのDB接続でLISTENし、Observerを通じて複数クライアントに同時配信
  • gRPC Server Streaming → Connect RPC: BFF層でプロトコルを変換し、フロントエンドに通知を届ける

これらを組み合わせることで、PostgreSQLからフロントエンドまで一貫したリアルタイム通知を実現しました。

この記事がリアルタイム通知の実装を検討している方の参考になれば幸いです。


enechainでは、事業拡大のために共に挑戦する仲間を募集しています。興味がある方はぜひお声がけください!