NestJSで作るバッチシステムとアーキテクチャの遍歴

ogp

この記事は enechain Advent Calendar 2023 の 13 日目の記事です。 本日は eScan デスクのエンジニアの小沢が担当します。

eScan では以前 BigQuery + Argo Workflowsを利用した計算処理基盤の構築 で紹介した通りリスク計算では様々な計算をしており、その中で使われるデータはユーザーが入力する情報だったり、計算固有のデータや、日々変化する価格情報など多岐にわたります。その中で日々変化するような価格情報はデータプラットフォームデスクが管理するデータを毎日取り込みeScan側で加工し、最終的にはBigQueryへ書き込む。という流れで扱っており、この仕組みをバッチシステムとしてプロダクト初期から構築しています。

本記事では、バッチシステムがプロダクト初期から現在(※2023年12月時点)にかけて、開発初期から現在にかけてどのような変化を遂げてきたか、その時々の状況を振り返りながら紹介します。

初期構成

プロダクト初期では、特定の時間になったらデータを取り込み・加工・書き込みを行うバッチ処理が複数必要でした。このとき、開発メンバーは私を含めて5名でeScan本体の開発が活発な状況であることと、互いの開発サイクルは異なることを想定して別のリポジトリで開発することとしました。

バッチシステムの構成は、他のメンバーも開発することを想定してeScanのバックエンドと同じ技術スタックとしました。

image1

このときバッチシステムを稼働させる実行基盤は定まっていなかったのですが、システム自体はKubernetes上で実行することは前提条件としてあったため、それと合わせて以下の要件が実現可能であるArgo Workflowsを採用しました。

※当時の技術調査時の要件を一部抜粋しています - workflowやプロセスの自動化が出来る - 端的に言うと、Cloud workflow(GCP)やStep functions(AWS)のようなものがほしい - 手動実行・定期実行(cron)が出来る - GUIのダッシュボードがある - ジョブ一覧、成功・失敗などのステータス閲覧 - ジョブのリラン、停止などの操作が出来る - ジョブに対する入出力が見れる - ログが見れる(datadogに連携されていれば見れなくても可) - 直列・並列処理、条件分岐、エラーハンドリング、リトライができる - コード管理出来る - ジョブ間(step間)のパラメータ渡しが出来る - 繰り返し使うようなもの(モジュール的なもの)に専用の定義がある

ディレクトリ構成とアーキテクチャ

バッチシステムの構成をバックエンドと同じ構成としましたが、ディレクトリ構成はNestJSのWorkspaces機能を利用したmonorepo構成としました。monorepoを採用した背景には、実装するアプリケーションとArgo Workflowsの1ジョブを同じ単位で扱い、アプリケーションの責務を明確にしたかったというのが大きな目的としてありました。

また、バッチシステムは長時間実行されることがあるためジョブが失敗した場合に最初からやり直すとその日のうちに対応が終わらないといったケースが考えられます。そのときは、成功済みのものはリトライせずに失敗したところからリトライできると残りのジョブだけ実行すれば良いので再度長時間待つということはなくなります。この他にも、ジョブが失敗した場合はどのような修正すればよいか判断するために中間データは常に保持しておくことも重要です。 そのためジョブは意味のある単位でなるべく小さく保つことが重要で、それに加え、ジョブが小さくなることで同じ処理や同じデータを扱う場合はジョブを再利用できるとうメリットにも繋がります。

これらを考慮し、構築したバッチステムのディレクトリ構成は以下となります。ここには記載されていませんが、Argo Workflowsの定義ファイル(yaml)は、別ディレクトリで管理しています。

.
|-- apps
|   |-- app-1
|   |   |-- Dockerfile
|   |   |-- src
|   |   `-- test
|   `-- app-2
|       |-- Dockerfile
|       |-- src
|       `-- test
|-- common
|   |-- types
|   `-- utils
|-- libs
|   |-- database-lib
|   |   |-- prisma
|   |   |   |-- migrations
|   |   |   `-- schema.prisma
|   |   `-- src
|   |   |   |-- table-name-1
|   |   |   `-- table-name-2
|   |   `-- tsconfig.lib.json
|   |-- env-lib
|   |   `-- src
|   |   `-- tsconfig.lib.json
|   `-- logger-lib
|       `-- src
|       `-- tsconfig.lib.json
|-- nest-cli.json
|-- package.json
|-- tsconfig.json
`-- yarn.lock

ジョブに該当するのはapps配下の各ディレクトリ(app-1、app-2)で、アプリケーションに必要な処理はこの中で記述するようにしました。また、共通で使うようなライブラリはインターフェースのみを公開し、アプリケーション側は必要なlibsのみをDIし、公開されたインターフェースを通じてやりとりをするという実装方針としました。

こうすることで、アプリケーションは自身のディレクトリ内のみに関心を向けてコードを書くことができ、libに依存するものはDIとインターフェースのみだけ定義すれば良い。という構成となりました。また、libs内はテストコードで担保されているため、libsの実態を使うようなインテグレーションテストはほぼ書かなくてもアプリケーションとして問題のない状態を実現することができました。

運用で見えてきた課題

この構成で運用すると以下のような課題が見えてきました。

  • CIが遅いことが原因で障害回復までに時間がかかってしまう問題
  • Prismaのスキーマファイルの同期が煩雑になり、スキーマをデグレードさせてしまう問題

CIが遅いことが原因で障害回復までに時間がかかってしまう問題

CIにはGitHub ActionsとCloudBuildを併用しており、その中で各アプリケーションごとにテストやイメージビルドを行っていました。

初期のタイミングで20弱ほどのジョブがあり、1つのジョブで見ると数分で終わるものの、数が多いと並列に実行してもかなりの時間を要することになってしまいました。このときは変更があったもののみをCIの対象にするような仕組みが準備できていなかったことも要因の一つに挙げられます。

image3

Prismaのスキーマファイルの同期が煩雑になり、スキーマをデグレードさせてしまう問題

バックエンドとバッチシステムは同じRDBを参照するため、スキーマファイルを同期させる必要がありました。スキーマの同期方法として、サブモジュールを使ったり、スキーマファイルをパッケージ化するなど検討しましたが最終的にはバックエンドのスキーマを正として、毎回バッチシステム側で差分を取り込み、バッチシステム側で修正が発生したらバックエンド側へ戻す。という方法を取りました。

バッチシステム側で頻繁にテーブル変更が発生することはありませんでしたが、必ず作業が発生し、仮にバックエンド側への戻し作業でミスが発生したら意図しないテーブル変更が発生してしまうため、今思うと危険な作業であったと感じます。

プロダクト構成の見直し

プロダクトを運用する中で上記のような課題に直面し、特に CIが遅いことが原因で障害回復までに時間がかかってしまう問題 は開発スピードを落としてしまう大きな原因となっていました。この状態は開発スピードを維持・向上させるために早く脱却する必要があったため、バッチシステムをバックエンドへ移植することにしました。

バックエンドに移植することにした背景は以下のとおりです。

  • バックエンドのテストやビルドのジョブは一つであるため、バッチシステムほど長い時間はかからない
  • Prismaのスキーマファイルの同期作業がなくなる
  • ライブラリのバージョンアップなど2重でかかっていたメンテナンスコストを削減できる

これらを選択した背景には、バックエンドのイメージファイルを使い、引数によって起動するアプリケーションを分ける。という仕組みが確立されたことが大きく関係しています。

引数によって起動するアプリケーションを分ける。という仕組み

プロダクト開発が進むと様々機能や仕組みが必要になってきます。eScanも最初はWebアプリケーションとしてのバックエンドとバッチシステムだけでしたが、非同期処理を実行するためのワーカーが必要になりました。

ここでどういうアプローチを取るか検討しましたが、バッチシステムのように別リポジトリで管理することは今後の開発スピードに影響すると考え、バックエンドと同じリポジトリで開発、同じイメージファイルにまとめ、エントリーポイントによって起動するものを分けるというアプローチを選び、実際に以下のようなコードに落とし込みました。

const runMode = (value: string) => {
  const result = value.match(/--run-mode=(\w+)/)
  switch (result?.[1]) {
    case 'web':
      return 'web'
    case 'worker':
      return 'worker'
    default:
      throw new Error(`Unexpected run mode: ${result?.[1]}`)
  }
}


const workerTypes = {
  worker1: Worker1AppModule,
  worker2: Worker2AppModule,
}

const detectWorkerAppModule = () => {
  return (
    Object.entries(workerTypes).find(([typeName]) => {
      return argv.some((arg) => arg === `--worker-type=${typeName}`)
    })?.[1] ?? workerTypes.worker1
  )
}

async function listenWorker() {
  const module = detectWorkerAppModule()
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    module,
    {
      strategy: new NullTransportStrategy(),
      bufferLogs: true,
    },
  )
  await app.listen()
}

async function bootstrap() {
  const mode = runMode(argv.join(' '))

  switch (mode) {
    case 'worker':
      await listenWorker()
      break
    default: {
      const app = await NestFactory.create(AppModule, {
        bufferLogs: true,
      })

      await app.listen(5000)
    }
  }
}

void bootstrap()

実際にアプリケーションを起動するときは、 webの場合は今まで通り nest start で起動し、ワーカーの場合は nest start -- --run-mode=worker --worker-type=worker1で起動するようになりました。

この他にも、この仕組みを更にエンハンスしたNest Commanderのようなコマンドベースのエントリーポイントも誕生しました。

バッチシステム移植後のディレクトリ構成

バッチシステム移植後は、このコマンドベースのエントリーポイントから起動するようにし、最終的には以下のようなディレクトリ構成となりました。

.
|-- prisma
|   `-- migrations
`-- src
|   |-- cmd
|   |   `-- commands
|   |-- logger
|   |-- escan
|   |   |-- domain
|   |   |   |-- model
|   |   |   |-- repository
|   |   |   `-- service
|   |   |-- infrastructure
|   |   |   `-- repository
|   |   |-- modules
|   |   |-- presentation
|   |   |   |-- controller
|   |   |   |-- dto
|   |   |   `-- resolver
|   |   |-- usecase
|   |   `-- workflow
|   |       |-- job-1
|   |       |-- job-2
|   |       |-- database-lib
|   |           |-- table-name-1
|   |           `-- table-name-2
|   `-- types
|-- Dockerfile
|-- nest-cli.json
|-- package.json
|-- pnpm-lock.yaml
|-- tsconfig.json
|-- vite.config.ts
`-- vitest.config.ts

バックエンドでは元々monorepoは使用していませんでしたが、以下の理由によりバッチシステムの移植時にもmonorepoは導入しませんでした。

  • 起動するアプリケーションを分けられるためmonorepoのメリットが薄まった
  • ビルドツールをViteにしていたため、バックエンド+バッチシステムでのビルドやホットリロードに時間がかからなくなった

その他にも、libsでまとめていた環境変数の取り扱いはバックエンドとバッチシステムで統合したり、既存のロガーを使うなど対応を行いました。

結果として、バッチシステムの基本的なDIや実装方法は変更なく、ディレクトリ階層(src/escan/workflow配下)のみ大きく変更される形で移植することができました。

運用で見えてきたさらなる課題

バッチシステムをバックエンドへ移植することで開発自体は安定してきましたが、開発当初から以下の問題が発生していました。

  • スロークエリが発生する問題
  • OOMが発生してしまう問題

クエリの改善や省メモリ化といった対応は行いましたが解決には至りませんでした。

スロークエリが発生する問題

スロークエリが発生している箇所を調べると、レコード数が数百万あり、クエリの対象レコードに絞ると数十万件というレコードに対して集計関数を使っているというクエリが該当しました。

しかし、このクエリ自体はすでに最適化されており、インデックスもすでに考慮済みであったためRDB上ではこれ以上改善することは難しい状況にありました。また、このデータは日々蓄積されていくためクエリのパフォーマンスは少しずつ悪化していく可能性が高い状態でした。

OOMが発生してしまう問題

バッチシステムを運用していると、たまにOOMで落ちてしまうジョブがありました。そのジョブでは大量のデータを加工し、RDBに書き込む必要がありましたが、加工に必要なデータセットの量が多く削減には限界がありました。最低限OOMにならないしきい値は確認できたため、ヒープの調整とKubernetes側のLimitRangeの修正は行いジョブ失敗にならないような対応だけ行っていましたが、根本解決には至っていませんでした。

実行基盤の移植

上記の問題は実行基盤であるNode.js上で解決するには難しい問題でしたが、バッチシステムの開発から半年が経過したタイミングで、BigQuery上で安定してジョブを実行する基盤が整えられました。どちらの課題もデータ量に依存するものであり、今後データ量が増えてもBigQuery上であれば大きな問題になることは少ないと考え、SQLで実装できる処理はすべてBigQuery上で行うように移植をしました。

並行稼働と数値検証

最初の移植では、Node.jsかつTypeScriptという構成に変更はなく、メインのコード部分の修正は行っていた無いため既存のテストのみで数値検証は担保されていました。しかし、今回はNode.jsかつTypeScriptで実装されたものをSQLで置き換えるため、ロジック移行部分に不安がありました。そのため、既存のバッチシステムと新しいバッチシステムを並行稼働し、日々数値検証を行い修正の妥当性を検証しました。

最後に数値検証の結果に異常がないことを確認し、新しいバッチシステムだけ残し現行のバッチシステムを停止させました。結果として、OOMが発生していたバッチ処理はなくなり処理時間も30分程度かかっていた処理が10分未満で終わるようになりました。

最後に

今回は NestJSで作るバッチシステムとアーキテクチャの遍歴について紹介しました。初回構築段階で今のような構成になるとは思っていませんでしたが、運用して挙げられた課題は少しずつ解消されているため、現在の形にたどり着いてよかったと感じています。これからも日々の監視を行い、課題と向き合いながら運用が苦にならないバッチシステムを作っていきたいと思います。

明日は同じeScanデスクの平田さんから「BigQuery Emulatorの活用例、直面した問題の紹介とその解決アプローチ」についてご紹介いただきます。eScanはBigQuery Emulatorを活用しており、その中で発生した問題や解決策を紹介していただけると思いますので是非チェックしてみてください。

enechainでは、一緒に事業を拡大していける仲間を絶賛募集中です。 herp.careers