Go言語でつくるFIXサーバー:QuickFIX/Go入門

ogp

この記事はenechain Advent Calendar 2024の8日目の記事です。

こんにちは!enechainでソフトウェアエンジニアをしている山添です。 現在は今年10月にリリースした「eSquare Live」のFIX APIを提供するため、FIXサーバー1の開発を担当しています。

この記事では、FIXサーバーの開発においてQuickFIX/Goを使っていて得た知見を共有します。

FIX ProtocolとQuickFIX/Goについて

FIX Protocol2は、金融情報を電子的にやり取りすることを目的に開発された仕様です。詳細な説明は長くなるため、公式ドキュメント3などを参照してください。また、弊社のアドベントカレンダーにもFIXに関する記事がありますので、ぜひご覧ください。 QuickFIX/Goを理解するための重要なポイントを以下にまとめます。

  • FIXはTCP/IP上で動作し、メッセージの仕様や送受信の流れを定義している。
  • クライアントとサーバーはコネクションを維持し、双方向でメッセージをやり取りする。
  • メッセージの再送処理が規定されており、コネクションが切れた場合にはメッセージの欠落がないことをチェックする。

FIXは金融取引向けに開発されたプロトコルであり、金融取引で一般的なメッセージ形式がプロトコルで標準化されていたり、クライアントサーバー間の接続を維持してリアルタイムの双方向通信をするのが特徴です。

enechainでは、電力卸取引のオンライン取引マーケットプレイス「eSquare Live」を提供しています4。このプラットフォームでは、発電事業者や電力小売業者、トレーダーが現物から先物まで、さまざまな商品をリアルタイムで取引できます。電力取引の分野でも、株取引と同様にFIXを利用して取引をする組織が存在するため、eSquare LiveはFIX APIを提供することで、より多くのユーザーに価値を届けることを目指しています。

次にQuickFIXについてですが、QuickFIXはFIXを実装するためのオープンソースソフトウェア(OSS)です。 FIXを通じた通信の実装が主要なプログラミング言語ごとに提供されており、その中のGo言語の実装がQuickFIX/Goです。 QuickFIXのようなエンジンを利用することで、後に説明するFIX仕様の多くを補ってくれるため、開発者はビジネスロジックの実装に専念できます。これは、Webフレームワークを使う際にエンドポイントのロジックの実装に集中できるという状況に似ています。

弊社ではサーバーサイドのアプリケーションを主にGoで実装しており、Goに関する知識やスキルを持つ開発者が多いため、QuickFIX/Goを用いてFIXサーバーの開発を進めています。 他にもFIXエンジンの選択肢はありますが、技術選定についてはこの記事の範囲外です。

FIX Protocol の基礎知識

この節では、本記事を読み進める上で必要となるFIXの基礎知識をいくつか紹介します。FIXの全ては解説できないため、公式ドキュメント[^3]なども併せて参照してください。

メッセージとタグ

FIXでは、取引関連の情報を交換するためのデータを「電文(メッセージ)」と呼びます。代表的なメッセージには、ログイン処理のためのLogonメッセージや、注文処理のためのNewOrderSingleメッセージがあります。 メッセージはタグと値の組み合わせで構成されており、この表現方法は「tagvalue」と呼ばれています。この形式では、各データ項目が「タグ=値」のペアで表現され、それらがセパレータ(通常はASCIIコード0x01)で区切られています。タグは数値で指定され、対応する値は具体的なデータを示します。例えば、タグ8はFIXバージョン、タグ35はメッセージタイプを表します。メッセージ全体はヘッダー、ボディ、トレーラーの3つの部分で構成され、ヘッダーは転送情報、ボディは取引の詳細、トレーラーはチェックサムなどのセキュリティ情報を含みます。 FIXメッセージでよく使用される主要なタグの例を以下に示します。

タグ番号 フィールド名 説明
8 BeginString FIX バージョン
35 MsgType メッセージの種類
34 MsgSeqNum メッセージのシーケンス番号
49 SenderCompID メッセージ送信者
56 TargetCompID メッセージ受信者
55 Symbol 銘柄コードやティッカーシンボル
54 Side 売買の方向

これらのタグを組み合わせて構成されるNewOrderSingleの例を以下に示します。読みやすさの観点でセパレータをスペースで表現しています。

8=FIX.4.4 9=176 35=D 49=TW 56=ISLD 34=2 52=20241128-12:30:00.000 11=123456 21=1 55=MSFT 54=1 38=100 40=2 44=150.25 59=0 10=128

このメッセージをtagvalue形式で分解すると、以下のように表現されます。

タグ番号 フィールド名 説明
8 BeginString FIX.4.4 FIX バージョン
9 BodyLength 176 メッセージボディの長さ(バイト数)
35 MsgType D メッセージタイプ
49 SenderCompID TW 接続元組織ID
56 TargetCompID ISLD 接続先組織ID
34 MsgSeqNum 2 メッセージのシーケンス番号
52 SendingTime 20241128-12:30:00.000 メッセージの送信時刻
11 ClOrdID 123456 クライアント側の注文ID
55 Symbol MSFT 銘柄コード
54 Side 1 買い
38 OrderQty 100 100株
40 OrdType 2 指値注文
44 Price 150.25 150.25ドル
59 TimeInForce 0 注文の有効期間:当日
10 CheckSum 128 メッセージのチェックサム

このようにFIXでは、メッセージタイプやメッセージに含まれるべきタグが仕様として標準化されています。これは、金融分野という特定の領域で使用されることを前提としたプロトコルの特性です。標準的なメッセージスキーマは、公式ドキュメントやFIXimate5で確認できます。 また、後の章で詳しく説明しますが、QuickFIX/Goでは標準化されたメッセージ構造に従いながら、アプリケーション特有のカスタムフィールドの追加も可能です。

セッションの概要

FIXには「セッション」という概念が存在します。セッションは、メッセージの送受信を管理し、データの完全性と順序性を維持する役割を果たします。具体的には、2つのシステム間で連続する番号(シーケンス番号)を用いて双方向のメッセージ通信をします。

セッションは以下のライフサイクルを持ちます。

  • セッションの確立: セッションは、クライアントがサーバーに対してLogonメッセージ(MsgType=A)を送信することで開始される。このメッセージには、認証情報やハートビート間隔などのセッションパラメータが含まれる。サーバーがLogonメッセージを受信・承認すると、セッションが確立され、以降のメッセージ交換が可能となる。
  • セッションの維持: セッション中、双方は定期的にハートビートメッセージ(MsgType=0)を交換し、接続の維持と監視をする。また、各メッセージにはシーケンス番号が付与され、メッセージの順序性と完全性を保証する。シーケンス番号の不整合やメッセージの欠落が検出された場合、Resend Requestメッセージ(MsgType=2)を使用して再送を要求し、データの整合性を保つ。
  • セッションの終了: セッションを終了する際には、Logoutメッセージ(MsgType=5)が送信される。これにより、双方がセッションの終了を認識し、適切に接続を切断する。

セッション内で送受信される各メッセージには連続したシーケンス番号が付与され、これにより送受信の順序が管理されます。シーケンス番号は、メッセージの順序性と完全性を確保するために不可欠な要素です。通信中にメッセージが欠落したり順序が乱れたりした場合、受信側はシーケンス番号のギャップを検出し、適切な対処をします。

セッションの再接続時には、シーケンス番号の同期が重要です。再接続時に双方のシーケンス番号が一致しない場合、Resend Requestメッセージを使用して欠落したメッセージの再送を要求します。これにより、通信の整合性が維持され、取引情報の正確性が確保されます。Resend機能は、ネットワーク障害やシステムエラーによるメッセージの損失を補完し、信頼性の高い通信を実現するための重要な仕組みです。

このように、FIXではセッションという概念を用いて、クライアントとサーバーが安定した接続を保つための仕組みを担保しています。 QuickFIX/Goを使用する場合、セッションに関する処理はエンジン内部に組み込まれており、開発者はこれを直接実装する必要がありません。エンジンは、セッションの状態管理、シーケンス番号の管理、ハートビートの送信など、セッションのライフサイクルに関わる処理をします。これにより、開発者はビジネスロジックの開発に集中できます。

Exampleで学ぶQuickFIX/Go

この節ではQuickFIX/Goの基本的な使い方を紹介します。QuickFIX/GoにはExampleをまとめたリポジトリがあり、そこで基本的なクライアントとサーバーの実装方法を知ることができます。実際にExampleを動かす際は手元にcloneしておいてください。

Exampleの起動方法

まずはサーバーを起動します。サーバーの実装にはexecutorとordermatchがありますが、今回はよりシンプルなexecutorを起動します。executorは出された注文に対しての約定データを返却するサーバーです。標準出力でセッションが生成されたことを確認できます。

> go run ./qf.go executor

==== Event: ====
 |Time:         2024-11-29 13:21:43.824946 +0000 UTC
 |Session:      FIXT.1.1:ISLD->TW
 |Content:      Created session

セッション名(上記の例だとFIXT.1.1:ISLD->TW)はFIXバージョンと接続組織IDから構成されています。後ほど設定ファイルについては説明しますが、Exmpleのデフォルトでは、複数のFIXバージョンで接続元組織ISLDと接続先組織TWのセッションが作成されます。

次にサーバーを起動した状態で、クライアントを起動してみます。このクライアントは、サーバーにNewOrderSingleやMarketDataRequestを送信するためのクライアントです。

> go run ./qf.go tradeclient

1) Enter Order
2) Cancel Order
3) Request Market Test
4) Quit
Action:

クライアント起動時のサーバー側のログを見てみると、Logonリクエストを受信できていることが分かります。

<=== Incoming FIX Msg: <===
 |Time:         2024-12-01 03:25:45.003984 +0000 UTC
 |Session:      FIXT.1.1:ISLD->TW
 |Content:      8=FIXT.1.19=7435=A34=149=TW52=20241201-03:25:45.00256=ISLD98=0108=30141=Y1137=710=133

Contentはセパレータが省略されて出力されるので少し見にくいですが、35=Aという部分がLogonメッセージ(MsgType=A)であることを表しています。

これでクライアントとサーバーが起動したのでクライアントからサーバーにリクエストを送信してみます。

1) Enter Order
2) Cancel Order
3) Request Market Test
4) Quit
Action: 1 <- NewOrderSingleを送信するAction

1) FIX.4.0
2) FIX.4.1
3) FIX.4.2
4) FIX.4.3
5) FIX.4.4
6) FIXT.1.1 (FIX.5.0)
BeginString: 6 <- FIXバージョンを指定
ClOrdID: 1 <- クライアントが指定するID、適当な値を入力する
1) Buy
2) Sell
3) Sell Short
4) Sell Short Exempt
5) Cross
6) Cross Short
7) Cross Short Exempt
Side: 1 <- 買いや売りなどを指定する
1) Market
2) Limit
3) Stop
4) Stop Limit
OrdType: 2 <- 2は指値で注文を入れることを意味する
Symbol: PRODUCT_SYMBOL <- 株などのシンボルを意味する、適当な値を入力する
OrderQty: 5 <- 注文数量
1) Day
2) IOC
3) OPG
4) GTC
5) GTX
TimeInForce: 1 <- 注文が1日で有効期限になるように設定する
Price: 990 <- 注文価格
SenderCompID: TW <- 接続元組織ID(ExampleではTWで固定)
TargetCompID: ISLD <- 接続先組織ID(ExampleではISLDで固定)

Use a TargetSubID?: <- SubID(何も入力せずにEnterを入力する)
Sending: 8=FIXT.1.19=13635=D34=1349=TW52=20241201-03:31:18.44356=ISLD11=121=138=5.0040=244=990.0054=155=PRODUCT_SYMBOL59=060=20241201-03:30:16.59210=160

サーバー側のログを見ると、セッションからNewOrderSingle(MsgType=D)を受け取れていることが分かります。同時に、クライアントへのレスポンスとして、ExecutionReport(MsgType=8)を送信できていることも確認できます。

<=== Incoming FIX Msg: <===
 |Time:         2024-12-01 03:31:18.444525 +0000 UTC
 |Session:      FIXT.1.1:ISLD->TW
 |Content:      8=FIXT.1.19=13635=D34=1349=TW52=20241201-03:31:18.44356=ISLD11=121=138=5.0040=244=990.0054=155=PRODUCT_SYMBOL59=060=20241201-03:30:16.59210=160
===> Outgoing FIX Msg: ===>
 |Time:         2024-12-01 03:31:18.44568 +0000 UTC
 |Session:      FIXT.1.1:ISLD->TW
 |Content:      8=FIXT.1.19=15135=834=1349=ISLD52=20241201-03:31:18.44556=TW6=990.0011=114=5.0017=131=990.0032=5.0037=138=5.0039=254=155=PRODUCT_SYMBOL150=2151=0.0...

以上が、Exampleを使ってサーバーとクライアントで通信をする例でした。以降の節では、サーバーとクライアントの起動からメッセージ処理まででどのような処理がされているのかを追ってみます。

設定ファイル

QuickFIX/Goでは、設定ファイルを使用して各種パラメータを定義できます。この設定ファイルには、セッションの動作や接続情報を詳細に制御するための設定が含まれます。 この設定ファイルは、プレーンテキスト形式で記述され、各パラメータは「キー=値」の形式で指定されます。セクションは角括弧で囲まれ、セッションごとに異なる設定を指定できます。

以下は、Exampleから抜粋したQuickFIX/Goの設定ファイルのサンプルです。

[DEFAULT]
SocketAcceptPort=5001
SenderCompID=ISLD
TargetCompID=TW
ResetOnLogon=Y
FileLogPath=tmp

[SESSION]
# セッション固有の設定
BeginString=FIXT.1.1
DefaultApplVerID=7

この設定ファイルは以下のように読み込むことができます。

import (
    "github.com/quickfixgo/quickfix"
)

func main() {
    cfg, err := os.Open(cfgFileName)
    if err != nil {
        return fmt.Errorf("error opening %v, %v", cfgFileName, err)
    }
    defer cfg.Close()
    stringData, readErr := io.ReadAll(cfg)
    if readErr != nil {
        return fmt.Errorf("error reading cfg: %s,", readErr)
    }

    appSettings, err := quickfix.ParseSettings(bytes.NewReader(stringData))
    if err != nil {
        return fmt.Errorf("error reading cfg: %s,", err)
    }
}

サンプルのパラメータの説明を下記テーブルにまとめました。他のパラメータについては、QuickFIX/Goの公式ドキュメントに詳しく記載されています。

設定項目 説明
SocketAcceptPort コネクションをリッスンするためのソケットポート
SenderCompID 接続元組織ID
TargetCompID 接続先組織ID
ResetOnLogon ログオン要求を受け取ったときにシーケンス番号をリセットするかどうか
FileLogPath ログを保存するディレクトリ
BeginString セッションが使う FIX バージョン
DefaultApplVerID セッションのアプリケーションバージョンID

設定ファイルを使わずにAcceptorの設定をする方法もあります。例えば下記のコードで上記の設定ファイルと同じ設定を適用できます。この方法であれば、環境変数などから読み込んだ値をAcceptorに渡すこともできます。

サンプルコード

import (
    "github.com/quickfixgo/quickfix"
    "github.com/quickfixgo/quickfix/config"
)

func main() {
    appSetting := quickfix.NewSettings()
    appSetting.GlobalSettings().Set(fixconfig.SocketAcceptPort, "5001")
    sessionSetting.Set(config.SenderCompID, "ISLD")
    sessionSetting.Set(config.TargetCompID, "TW")
    appSetting.GlobalSettings().Set(config.ResetOnLogon, "Y")
    appSetting.GlobalSettings().Set(config.FileLogPath, "tmp")

    sessionSetting := quickfix.NewSessionSettings()
    sessionSetting.Set(config.BeginString, quickfix.BeginStringFIXT11)
    sessionSetting.Set(config.DefaultApplVerID, quickfix.ApplVerIDFIX50)
    if _, err := appSetting.AddSession(sessionSetting); err != nil {
        return err
    }
}

メッセージのルーティングとロジックの実装

QuickFIX/Goでは、quickfix.Applicationインターフェースを実装することで、メッセージの受信と送信のロジックを実装できます。例えばFromAppメソッドでは、サーバーがクライアントからリクエストを受け取った際の挙動を実装できます。

import (
    "github.com/quickfixgo/quickfix"
    fix50nos "github.com/quickfixgo/fix50/newordersingle"
)

type executor struct {
    *quickfix.MessageRouter
}

func newExecutor() *executor {
    e := &executor{MessageRouter: quickfix.NewMessageRouter()}
    e.AddRoute(fix50nos.Route(e.OnFIX50NewOrderSingle))

    return e
}

// quickfix.Application interface
func (e executor) OnCreate(sessionID quickfix.SessionID)                           {}
func (e executor) OnLogon(sessionID quickfix.SessionID)                            {}
func (e executor) OnLogout(sessionID quickfix.SessionID)                           {}
func (e executor) ToAdmin(msg *quickfix.Message, sessionID quickfix.SessionID)     {}
func (e executor) ToApp(msg *quickfix.Message, sessionID quickfix.SessionID) error { return nil }
func (e executor) FromAdmin(msg *quickfix.Message, sessionID quickfix.SessionID) quickfix.MessageRejectError {
    return nil
}

// Use Message Cracker on Incoming Application Messages
func (e *executor) FromApp(msg *quickfix.Message, sessionID quickfix.SessionID) (reject quickfix.MessageRejectError) {
    return e.Route(msg, sessionID)
}

quickfix.MessageRouterを使うことで、メッセージタイプごとにメッセージをルーティングできます。Exampleでも使われていますが、QuickfixのXMLスキーマを元に生成されたパッケージ(上の例ではgithub.com/quickfixgo/fix50)を使うことで簡単にルーターにルートを追加できます。

メッセージの送信

セッションにメッセージを送信する際には、適切なメッセージオブジェクトを作成し、ターゲットのセッションを引数にquickfix.SendToTargetをコールします。 まず、送信したいメッセージの種類に応じて、対応するメッセージオブジェクトを作成します。例えば、新規注文作成のタイミングでサーバーはクライアントにExecutionReport(MsgType=8)を送信する必要がありますが、ExecutionReportのメッセージオブジェクトは下記のように生成して送信できます。

import (
    "github.com/quickfixgo/quickfix"
    fix50er "github.com/quickfixgo/fix50/executionreport"
)

func (e *executor) OnFIX50NewOrderSingle(msg fix50nos.NewOrderSingle, sessionID quickfix.SessionID) (err quickfix.MessageRejectError) {
    // ...

    clOrdID, err := msg.GetClOrdID()
    if err != nil {
        return
    }

    execReport := fix50er.New(
        // ExecutionReportの必須パラメータ
    )

    // OptionパラメータをSetXXXメソッドで設定する
    execReport.SetClOrdID(clOrdID)
 
    // ...

    sendErr := quickfix.SendToTarget(execReport, sessionID)
    if sendErr != nil {
        utils.PrintBad(sendErr.Error())
    }

    return
}

実践で役立つQuickFIX/Goの知識

ここからは、QuickFIX/Goを実践で扱うために知っておいた方が良い踏み込んだトピックについていくつか解説します。Exampleで基礎的なメッセージの送受信はできるようになりましたが、セッションの持ち方や高度な扱いを知ることによってQuickfix/Goを使った開発や運用に役立つでしょう。

シーケンス番号とメッセージの永続化

FIXではメッセージの再送処理などを実現するためにシーケンス番号やメッセージの内容を永続化する必要があります。QuickFIX/Goは、これらの保存オプションとして、Memory、File、SQL、MongoDBをサポートしています。 Exampleでは、デフォルトでMemory Storeが使用されています。しかし、この設定ではサーバーの再起動時にセッションデータが失われ、メッセージの再送要求やシーケンス番号の管理に支障をきたす可能性があります。そのため、本番運用ではセッションデータの永続化を実現するために、別のオプションを選択する必要が出てくるでしょう。

以下に、PostgreSQLを使用した設定例を示します。 QuickFIX/GoのリポジトリにテーブルのDDLが含まれているので、まずはこのDDLを使ってデータベースを作成します。ローカル環境で試したい場合はDockerなどでデータベースを起動するのが良いでしょう。

下記がexecutorのExampleで、Memory Storeの代わりにSQL Storeを使う例です。アプリケーションのグローバル設定にSQL Storeとして使うデータベースの設定を追加し、sql.NewStoreFactory関数でSQL Storeを初期化できます。

func execute(cmd *cobra.Command, args []string) error {
    // ...

    appSettings, err := quickfix.ParseSettings(bytes.NewReader(stringData))
    if err != nil {
        return fmt.Errorf("error reading cfg: %s,", err)
    }

    // SQL storeの設定
    appSetting.GlobalSettings().Set(fixconfig.SQLStoreDriver, "postgres")
    appSetting.GlobalSettings().Set(fixconfig.SQLStoreDataSourceName, cfg.Database.DSN())
    appSetting.GlobalSettings().Set(fixconfig.SQLStoreConnMaxLifetime, cfg.Database.MaxLifeTimeSeconds())
    messageStoreFactory := sql.NewStoreFactory(appSetting)

    logger := utils.NewFancyLog()
    app := newExecutor()

    utils.PrintConfig("acceptor", bytes.NewReader(stringData))
    acceptor, err := quickfix.NewAcceptor(app, messageStoreFactory, appSettings, logger)

    // ...
}

SQL Storeを使ってクライアントとサーバーでメッセージの通信をしてみると、セッションのシーケンス番号やメッセージがデータベースに永続化されていることを確認できます。Memory StoreではなくSQL Storeを使うことで、FIXサーバーの耐障害性を向上させることができるでしょう。

セッション管理の詳細

「セッションの概要」でセッションのライフサイクルについて説明しました。セッションはLogonからLogoutまでの単位であり、その間にクライアントとサーバーはコネクションを維持します。ここでは、QuickFIX/Goがどのようにセッションを管理し、メッセージを処理しているのかを詳しく見ていきます。これらを理解することで、セッションの状態遷移やメッセージ処理の流れを把握しやすくなり、デバッグやトラブルシューティングの際に役立ちます。

QuickFIX/Goにおけるセッション管理の中心的な役割を果たすsession構造体について説明します。QuickFIX/Goでは、各セッションごとにsession構造体がインスタンス化され、メッセージの送受信やセッションの状態管理が行われます。以下に、今回の説明に関連するフィールドをsession構造体から抜粋しました。

type session struct {
    // ...

    store MessageStore
    sessionID SessionID
    messageOut chan<- []byte
    messageIn  <-chan fixIn
    stateMachine
    application  Application

    // ...
}
  • store: メッセージの記録や再送のためのメッセージやシーケンス番号を永続化する。
  • sessionID: セッションを識別するIDで、FIXバージョンと送信元組織のID、送信先組織のIDから構成される。
  • messageOutとmessageIn: messageOutはセッションから送信されるメッセージを、messageInはセッションに受信されるメッセージを処理するために使用される。どちらもメッセージの送受信を非同期で行うためのチャネルであり、別のゴルーチンでnet.Connから受け取ったメッセージを通知するために使われる。
  • stateMachine: セッションの状態遷移や、メッセージの受信、タイムアウトイベントの処理を行い、セッションのライフサイクルを管理する。
  • application: FIXプロトコルのアプリケーションレベルのメッセージ処理を行う。メッセージの送受信時にアプリケーション固有のロジックを実行するために使用される。

サーバークライアント間で送受信されるメッセージは、messageOut、messageInを通してアプリケーションレイヤーに渡ります。また、メッセージの送受信の際には、それぞれのメッセージのシーケンス番号が正しいか検証され、正しい場合には最新のシーケンス番号をstoreに永続化します(Memory Storeを使う場合は永続化されないです)。

また、sessionはstateMachineとして実装されており、クライアントの接続状態をstateで管理しています。それぞれのstateでメッセージの受信時にどのような挙動をすべきかが実装されており、例えばlogonStateの状態でLogonリクエストを重複して受け取った場合は新しい接続を拒否するように実装されています。状態遷移を示した図も合わせてご参照ください。

stateDiagram
    [*] --> latentState
    latentState --> logonState: Logonを受け取る
    logonState --> latentState: Logon時にエラーが発生<br/>or<br/>Logonが拒否される
    logonState --> inSession: Logonの成功
    inSession --> logoutState: セッションレイヤーでの<br/>エラー発生
    inSession --> latentState: Logoutの成功
    logoutState --> latentState: Logoutの成功
    inSession --> resendState: 不正なシーケンス番号
    resendState --> inSession: シーケンス番号の<br/>不整合解消

サーバーを実装する際にstateMachineを意識することはないと思いますが、QuickFIX/Goのコードリーディングする際はどのようなstateが存在するか知っておくことが役立つと思います。stateはlatentStateとして初期化され、Logonが成功するとinSessionに移行します。inSessionがFIXメッセージを処理するためのstateです。そしてLogoutが成功すると再びlatentStateに戻ります。

次に、FIXサーバーがどのようにコネクションを開始し、アプリケーションレイヤーにメッセージがルーティングされるかを見ていきます。

FIXサーバーの起動からメッセージ処理の流れ

各フローの詳細は以下の通りです。

  1. FIXサーバーは起動時にnet.Listenerを作成し、クライアントからの接続を待ち受ける。同時に、別のゴルーチンでセッションのメッセージ受信処理が開始される。
  2. クライアントからの接続リクエストを受け付ける。
  3. クライアントからの接続を元にメッセージを送受信するためのゴルーチンが起動する。このゴルーチンではwriteLoopreadLoopが動作し、コネクションへのメッセージの読み書きする。同時に、handleConnectionsessionの間でメッセージを通知するためのチャネルを初期化する。
  4. クライアントからのリクエストはreadLoopで受信し、チャネルを通じてセッションにメッセージが送信される。サーバーからのメッセージはチャネルを通じてwriteLoopに通知され、コネクションに書き込まれる。

sessionmessageInチャネルがメッセージを受け取ると、シーケンス番号のバリデーションが行われた後に、FromAppフックがコールされます。シーケンス番号のバリデーションにはメモリのキャッシュが使用される実装になっています。これはFIXプロトコルが単一のサーバーで実行されることを前提にしたプロトコルであることが起因しており、このような実装がFIXサーバーのスケールを難しくする要因です。アプリケーションレイヤーでメッセージが処理された後、シーケンス番号がインクリメントされて永続化されます。

session管理の詳細はFIXを扱う上で必ずしも知る必要はありませんが、デバッグの際には非常に役立ちます。セッションの状態やメッセージ処理、永続化の流れを理解することで、問題の特定や解決が容易になり、より深い理解を促進する助けとなることを願っています。

カスタムフィールドの扱い

ここではQuickFIX/Goでカスタムフィールドを扱うときの実装方法を説明します。FIXはメッセージのフィールドまで標準化されたプロトコルですが、アプリケーションの種類によっては標準化されていないフィールドを扱う必要が出てきます。enechainの場合、電力業界特有のフィールドを扱う必要があり、金融業界を軸に設計されたFIXの標準フィールドだけでは足りないケースがあります。 Exampleで見たように、QuickFIX/GoではFIXで標準化されたメッセージやフィールドは、github.com/quickfixgo/fixNというパッケージをimportすることで扱えます(NはFIXバージョンです)。このパッケージは、quickfixに置かれているschemaを元に自動生成されたコードで構成されています。 例えばExecutionReportの場合、executionreport.New関数で必須フィールドを渡してメッセージを初期化し、オプショナルなフィールドはSetXXX関数で設定できるようにコード生成されています。

func (e *executor) OnFIX50NewOrderSingle(msg fix50nos.NewOrderSingle, sessionID quickfix.SessionID) (err quickfix.MessageRejectError) {
    // ...

    execReport := executionreport.New(
        // ExecutionReportの必須パラメータ
    )

    // OptionパラメータをSetXXXメソッドで設定する
    execReport.SetClOrdID(clOrdID)
 
    // ...
}

ここでメッセージにカスタムフィールドを設定する際には、SetXXXのような形式でフィールド値を設定できない(カスタムフィールドのSetterはコード生成されていない)ので、別の方法を使う必要がでてきます。 この問題を解決するには2つの選択肢があります。

  • カスタムのタグを定義してSetFieldという汎用的なメソッドでカスタムフィールドを追加する
  • カスタムフィールドを定義したXMLから生成したコードを使う

1つ目の方法は簡単です。QuickFIX/Goでタグは単なるintの拡張型なので次のようにカスタムタグを定義してSetFieldでフィールドを設定できます。

func (e *executor) OnFIX50NewOrderSingle(msg fix50nos.NewOrderSingle, sessionID quickfix.SessionID) (err quickfix.MessageRejectError) {
    // ...

    execReport := executionreport.New(
        // ExecutionReportの必須パラメータ
    )

    // カスタムフィールドの設定
    var customTag = quickfix.Tag(10000)
    execReport.SetField(customTag, quickfix.FIXString("custom value"))
 
    // ...
}

しかし、この方法だとFieldに設定される型を指定できないことや、コードを追わないとメッセージの構造を把握できなくなってしまうことが問題になるでしょう。 そこで1つ目の方法よりは手間ですが、独自のフィールドを定義したXMLを作って、そこからコードを生成できます。 XMLからコード生成をするにはgenerate-fixというツールが利用できます。

$ go install github.com/quickfixgo/quickfix/cmd/generate-fix@v0.9.6

試しにExampleのexecutorを拡張するので、QuickFIXのスキーマからFIX50SP2.xmlとFIXT11.xmlをExampleリポジトリのディレクトリにコピーしてきます。さらに、生成されたコードの置き場としてgen/fixディレクトリを作ります。 現時点でのディレクトリ構成は下記のようになっているはずです。

.
├── gen
│   └── fix
├── go.mod
├── go.sum
├── qf.go
└── schema
    ├── FIX50SP2.xml
    └── FIXT11.xml

この状態で下記のコマンドを実行すると、gen/fix配下にschemaを元にしたコードが生成されます。pkg-rootなどはパッケージ構成に従って変更してください。

$ cd gen/fix
$ generate-fix -pkg-root github.com/quickfixgo/examples/gen/fix ../../schema/FIX50SP2.xml

XMLからコードが生成できるようになったら、あとは追加したいカスタムフィールドをXMLに追記してからコードを再生成するだけです。例えば、FIX50SP2.xmlのL58L5625CustomFieldというフィールドを追加してコードを再生成してみます。

 <message name='ExecutionReport' msgcat='app' msgtype='8'>
        <field name='CustomField' required='N' /> <!-- 追加 -->
        <!-- ... -->
    </message>
    <!-- ... -->
    <field number='10000' name='CustomField' type='STRING' /> <!-- 追加 -->
    <!-- ... -->

そうすると、生成されたexecutionreport.ExecutionReportには、SetCustomFieldというセッターが含まれており、これを使うことでカスタムフィールド付きのメッセージオブジェクトを作ることができます。この方法では、フィールドの型も指定できますし、メッセージのスキーマ元に宣言的に開発を進められるのが利点です。

import "github.com/quickfixgo/examples/gen/fix/fix50sp2/executionreport"

func (e *executor) OnFIX50NewOrderSingle(msg fix50nos.NewOrderSingle, sessionID quickfix.SessionID) (err quickfix.MessageRejectError) {
    // ...

    execReport := executionreport.New(
        // ExecutionReportの必須パラメータ
    )

    // カスタムフィールドの設定
    execReport.SetCustomField("custom value")
 
    // ...
}

カスタムフィールドの数が少なく簡易的に実装したい場合は1つ目の方法を採用できますし、スキーマをしっかり管理したい場合には2つ目の方法を採用するのが良いと思います。

まとめ

この記事では、Go言語を用いたFIXサーバーの開発において、QuickFIX/Goを活用する方法を解説しました。FIX Protocolの基本概念やQuickFIX/Goの設定、メッセージのルーティングとロジックの実装方法、カスタムフィールドの扱い方、セッション管理の詳細について紹介しました。FIXに関する情報は多くないため、本記事が開発者の方々にとって少しでも参考になれば幸いです。

enechainでは、事業拡大のために共に技術力で道を切り拓いていく仲間を募集しています。

herp.careers

herp.careers


  1. FIX ProtocolではサーバーをAcceptor・クライアントをInitiatorと呼びますが、本記事ではサーバー・クライアントに統一します。
  2. Financial Information eXchange Protocolの略、以降FIXと呼ぶ
  3. FIX Trading Communityの公式サイトに仕様が掲載されています。
  4. こちらのテックブログで詳しく紹介されているのでご覧ください。
  5. FIXimateはFIX Trading Communityが管理するウェブアプリケーションで、電文の内容やタグの意味を調べることができます。