Node.js上でのBigQueryのスキーマ管理

ogp

この記事は enechain Advent Calendar 2023 の 5 日目の記事です。 enechain で eScan チームのソフトウェアエンジニアをしている Yamato が担当します。

はじめに

弊社の eScan という ETRM ツールでは、電力の調達データ、販売量データ、電力市場の価格、燃料価格などを元に、ユーザのリスク量を算出しています。これらのデータに数理的な処理を通してリスク量を算出しています。以前は Node.js 上で実行していましたが、データ量が増えてきたため、BigQuery に移行しました。

techblog.enechain.com

計算自体は Argo Workflows を基盤にして制御しています。また、ユーザが入力するデータのほとんどは PostgreSQL に保存しており、それらのデータは BigQuery に計算のたびに同期しています。

今回は、そのような環境下で、BigQuery のスキーマをどのように管理しているかをご紹介します。

ちなみに、eScan はフレームワークとして NestJS を用いておりますが、今回の焦点はスキーマの管理に当てており、特にフレームワークが絡む点が無いので、Node.js 上で実行していると考えてください。

BigQuery のスキーマ管理

現在の eScan では 100 近いテーブルを BigQuery 上で扱っています。テーブルには大きく 2 つのパターンがあります。

  • パターン 1: BigQuery 上でのみ扱うテーブル
  • パターン 2: PostgreSQL からコピーしてくるデータを格納するためのテーブル

それぞれでどのように扱っているかを紹介します。

パターン 1: BigQuery 上でのみ扱うテーブル

テーブル定義は以下の型を持つオブジェクトとして TypeScript で管理しています。

type Mode = "NULLABLE" | "REQUIRED";
export type BqColumnDefinition = { name: string; type: string; mode: Mode };
export type BqDefinition = Array<BqColumnDefinition>;

// キーをテーブル名、値をテーブルを定義しているオブジェクトとして扱う
export type BqSchema = Record<string, BqDefinition>;

具体的には以下のようなオブジェクトになります。

const revenueSchema: BqDefinition = [
  {
    name: "requestId",
    type: "INTEGER",
    mode: "REQUIRED",
  },
  {
    name: "yearMonth",
    type: "DATE",
    mode: "REQUIRED",
  },
  {
    name: "revenue",
    type: "FLOAT64",
    mode: "REQUIRED",
  },
];

至ってシンプルな形ですが、現状のユースケースでは十分に機能しています。

パターン 2: PostgreSQL からコピーしてくるデータを格納するためのテーブル

リスク計算で使う大半のインプット情報、例えば契約データや件数データなどは PostgreSQL(ORM は Prisma を使用)に保存されています。 計算を行うたびに、インプット情報を BigQuery にコピーしていますが、常に PostgreSQL と BigQuery のスキーマの状態を一致させる必要がありました。 そこで、 prisma-json-schema-generator を導入し、出力されたスキーマと、BigQuery のテーブルスキーマの差分を判定して BigQuery 側のスキーマに差分がある場合はスキーマ変更を行うような仕組みとしました。

prisma-json-schema-generator

prisma-json-schema-generatorは Prisma のスキーマ定義ファイルから JSON schema を生成するツールです。prisma migrate devを実行すると自動的に JSON schema が生成、更新されます。導入方法等はこちらを参考にしてください github.com

JSON schema をもとに、パターン 1 で紹介したBqDefinitionの形に変換します。

const bqTypeMap: Record<string, string> = {
  number: "FLOAT64",
  integer: "NUMERIC",
  "date-time": "DATETIME",
};

export const generateBqSchema = async (
  // 対象としたいPostgreSQLのテーブル名
  tables: Array<string>
): Promise<BqSchema> => {
  const result: BqSchema = {};
  const prismaSchema: PrismaSchema = await import(
    resolve(__dirname, `../../prisma/generated/json-schema.json`)
  );
  tables.forEach((table) => {
    const tableDefinition = prismaSchema.definitions[table];
    const columns = tableDefinition.properties;

    const schema = Object.entries(columns).reduce(
      (acc: BqDefinition, [name, columnDefinition]) => {
        if (isRelationShipDefine(columnDefinition)) return acc;
        const type = getColumnType(columnDefinition);
        const mode = isRequired(tableDefinition, name)
          ? "REQUIRED"
          : "NULLABLE";
        acc.push({
          name,
          type,
          mode,
        });
        return acc;
      },
      []
    );

    result[table] = schema;
  });
  return result;
};

const getColumnType = (prismaProperty: PrismaProperty): string => {
  const type = prismaProperty.type;

  if (isDateTime(prismaProperty)) {
    return "DATETIME";
  }

  if (Array.isArray(type)) {
    // bqTypeMapのkeyにtypeが存在する場合はそのvalueを返す
    const bqType = type.find((t) => bqTypeMap[t]);
    return bqType ? bqTypeMap[bqType] : "STRING";
  }
  return bqTypeMap[type] || "STRING";
};

const isRequired = (prismaTable: PrismaTable, column: string): boolean => {
  return prismaTable.required.includes(column);
};

const isDateTime = (prismaProperty: PrismaProperty): boolean => {
  return !!prismaProperty.format?.includes("date-time");
};

const isRelationShipDefine = (prismaProperty: PrismaProperty): boolean => {
  return (
    !!prismaProperty.anyOf ||
    !!prismaProperty.$ref ||
    (prismaProperty.type === "array" && !!prismaProperty.items?.["$ref"])
  );
};

補足

generateBqSchemaが JSON schema から引数で渡された任意のテーブルをBqDefinitionへと変換させています。

注意が必要なのは、Prisma 上での型と BigQuery 上での型で互換性がないものに対する考慮です。一番わかりやすいのは Enum 型で、BigQuery ではそのまま使用できないため STRING に問答無用で変換をかけています。

実装済みの型変換
Prisma の型 BigQuery の型
Enum STRING
number FLOAT64
integer NUMERIC
date-time DATETIME

また、Prisma の Relationship も JSON schema で吐き出されるので、カラムとして定義されないように除外する処理を挟んでいます。

スキーマの同期

このように生成したスキーマは、BigQuery SDK の機能を使ってスキーマの同期を行うようにしています。テーブル追加時はもちろん、カラムの追加や削除にも対応が可能なようにしています。実装自体は BigQuery SDK クラスを継承する形で、migrate メソッドを生やしています。

export class BqClient extends BigQuery {
  public datasetId = process.env.BQ_DATASET_ID;

  constructor() {
    super();
  }

  async migrate(
    schemaMap: BqSchema,
  ): Promise<void> {
    const dataset = this.dataset(this.datasetId);

    await Promise.all(
      Object.entries(schemaMap).map(async ([tableName, schema]) => {
        // 同じテーブル名で既存テーブルから取得を試みる
        const table = await this.getTable(tableName);

        // 既存テーブルが存在しなければ新規にテーブル作成
        if (!table) {
          await dataset.createTable(tableName, {
            schema,
          });
          return;
        }

        // 既存テーブルと更新差分がなければなにもしない
        if (
          !isEqual(existingFields, newSchema);
        ) {
          return;
        }

        // テーブルの更新・削除を加味して新しいスキーマを作る
        const { newSchema, deleting } = this.convertSchema(
          table.metadata,
          schema
        );
        table.metadata.schema = newSchema;

        await dataset.table(tableName).setMetadata(table.metadata);
        await this.deleteColumns(tableName, deleting);
      })
    );
  }

  private convertSchema(
    existingMetadata: Metadata,
    newSchema: BqDefinition
  ): { newSchema: BqDefinition; deleting: BqDefinition } {
    const existingSchema: BqDefinition = existingMetadata.schema?.fields;
    if (!existingSchema) {
      throw new InternalServerErrorException("not found schema");
    }

    // requiredカラムのものをnullableに変更することは許容しない
    newSchema = newSchema.map((newField) => {
      const existingField = existingSchema.find(
        (existingField) => existingField.name === newField.name
      );
      if (
        newField.mode === "REQUIRED" &&
        existingField &&
        existingField.mode === "NULLABLE"
      ) {
        newField.mode = "NULLABLE";
      }
      return newField;
    });

    // カラムが削除されていた場合、newSchemaにも追加し、削除しないようにする
    const deletingFields = existingSchema.filter((existingField) => {
      return !newSchema.find((newField) => {
        return newField.name === existingField.name;
      });
    });
    newSchema = newSchema.concat(deletingFields);

    // 新規カラム追加は、nullableにする
    const newFields = newSchema.filter((newField) => {
      return !existingSchema.find((existingField: BqColumnDefinition) => {
        return newField.name === existingField.name;
      });
    });
    newSchema = newSchema.map((f) => {
      const newFieldNames = pluck(newFields, "name");
      if (newFieldNames.includes(f.name)) {
        return {
          ...f,
          mode: "NULLABLE",
        };
      }
      return f;
    });

    return {
      newSchema,
      deleting: deletingFields,
    };
  }
}

補足

カラムの削除は、table.setMetadataでは実行できません。したがって、ALTER TABLE文を別で発行することで対応しています。

  private deleteQuery(tableName: string, deleting: BqDefinition) {
    return `ALTER TABLE ${this.datasetId}.${tableName} ${deleting
      .map(({ name }) => `DROP COLUMN IF EXISTS ${name}`)
      .join(", ")};`;
  }

  private async deleteColumns(tableName: string, deleting: BqDefinition) {
    if (deleting.length === 0) {
      return;
    }
    const query = this.deleteQuery(tableName, deleting);
    const [job, _] = await this.createQueryJob(query);
    await this.waitComplete(job, 1000 * 60 * 5);
  }

また、SDK のcreateQueryJobメソッド周りについても工夫しています。createQueryJobの返り値は Event Emitter になっており、コールバック地獄になりがちですが、waitCompleteメソッド内で Promise 化し、async 関数で処理できるようにしています。

  async waitComplete(job: Job, timeout: number): Promise<BigQueryJobStatus> {
    const realTimeout = timeout - 1000 * 60 * 1
    let sId = {} as NodeJS.Timeout
    let tId = {} as NodeJS.Timeout
    return new Promise<BigQueryJobStatus>((resolve) => {
      sId = setTimeout(() => {
        return resolve('LONGRUN')
      }, realTimeout)

      if (!job.metadata.status?.state) {
        throw new InternalServerErrorException(
          'ジョブのステータスが取得できませんでした。',
        )
      }
      const pollInterval = 1000
      tId = setInterval(() => {
        if (job.metadata.status.state === 'RUNNING') {
          job.poll_((_, __) => {
            console.log(`polling big query job: ${job.metadata.id}`)
          })
        }
        if (job.metadata.status.state === 'DONE') {
          return resolve('SUCCESS')
        }
        if (job.metadata.status.state === 'FAILED') {
          return resolve('FAIL')
        }
      }, pollInterval)
    })
      .then((ret) => {
        return ret
      })
      .catch((err) => {
        throw err
      })
      .finally(() => {
        clearTimeout(sId)
        clearInterval(tId)
      })
  }

運用

上記のスキーマの同期は、BigQuery 上のスキーマが適切に保たれるように,パターン 1 の BigQuery 上のみのスキーマに関してはリリースのたびに、パターン 2 の PostgreSQL のスキーマとの同期に関しては計算実行時に毎回実行されます。 PostgreSQL 側でマイグレーションがあった際も、特に気にすることなく BigQuery のスキーマを追随してくれるのは煩わしさから開放され、開発体験が良いものとなっています。

一方で、現状の課題も存在します。 例えばカラムが NULLABLE から REQUIRED に変更できないことによる差異が生まれてしまうことです。

eScan では、各ユーザごとに Dataset を分けて管理しています。 ユーザの Dataset は、ユーザが eScan を使用を開始するタイミングで作成され、同時にテーブルが作られます。

あるタイミングまでは NULLABLE だったカラムが、何らかの事情で変更が入り REQUIRED に変更された場合を考えてみます。 すると、変更前までに作られた Dataset では NULLABLE だったが、変更以降に作成された Dataset では REQUIRED になっている、という状態が発生します。

このことが即障害につながることはあまり考えられないですが、差分が発生することにより将来的に影響が及び兼ねないものになっています。 今後の展望としてはそのあたりを上手く吸収し、より強靭な仕組みにしていけるよう取り組んでまいります。

まとめ

本記事では eScan が BigQuery のスキーマをどのように管理しているかをご紹介いたしました。 実際に 100 近いテーブルを上記の方法で運用しておりますが、特に大きなトラブルはなく、安定した稼働が行えています。 大きなデータを扱う際、BigQuery は強力なツールとなりますが、そのスキーマ管理については様々な方法があると思います。 本記事が参考になれば幸いです。

次回 Advent Calendar6 日目は@K-Fujimura さんが Human in the Loop についての素敵な記事を書いてくれます!乞うご期待!

Advent Calendar の記事を通して enechain に興味を持った方、面白いことをやっているな、と少しでも感じてくださった方は、ぜひお気軽に以下からご応募よろしくお願いします。

herp.careers