bigframes.pandasを使った計算処理の検討

ogp

概要

eScanは小売事業者が目標とする利益に対して最適な電力ポートフォリオを実現するために必要なリスクヘッジの材料を提示するシステムです。 以前BigQuery + Argo Workflowsを利用した計算処理基盤の構築で紹介した通りeScanのリスク計算の基盤としてBigQueryを使っています。 BigQueryは完全マネージド型のデータハウスであり、eScanで用いるような大規模なデータに対しても高速なクエリ実行が可能です。そのため、スケーラビリティとスピードという観点で恩恵を受けられています。 他方で、GoogleSQL(BigQueryで使われるSQL)はSQLなので、テスタビリティやモジュール化、クエリやテーブルの依存関係の管理に課題があります。 BigQueryの恩恵を受けつつ、現在抱えている課題を解決できないかと考えていました。そこで、今回は新しいフレームワークであるbigframes.pandasを検証し、実運用を想定した検討を行いました。

現在のバッチ処理の記述方法

はじめに現在のeScanにおける計算のバッチ処理の記述方法を紹介し、それに対する課題点を列挙していきます。

リスク計算や粗利益計算をするためには様々なバッチ処理が必要です。 下記は我々がよく書くバッチ処理の型です。ユーザ定義関数や一時テーブルを用意して、WITH句でステップバイステップに結果セットを作り、最終的に結果を中間テーブルや結果テーブルに格納するという流れになります。

-- 関数の定義
CREATE TEMP FUNCTION awesomeFunction(args: DATE) RETURNS BIGNUMERIC AS (
    CAST(... AS BIGNUMERIC)
);

-- 一時テーブル
CREATE
    OR REPLACE TEMP TABLE awesomeMatrix AS (
    SELECT
        [snippet]
         IF(
             -- 0除算を避けるためのチェック
             SQRT(1 - EXP(-2 * 10 * args3)) = 0
             OR SQRT(1 - EXP(-2 * 10 * args4)) = 0,
             0,
             (
                 -- 複雑な計算
                 (EXP(-10 * (args3 + args1 - 2 * LEAST(args1, args2))) - EXP(-10 * (args2 * args3)))
                 / SQRT(1 - EXP(-2 * 10 * args1))
                 / SQRT(1 - EXP(-2 * 10 * args4))
             )
         ) AS complex_value
    FROM (
        SELECT
            [snippet]
        FROM awesomeTable1
        CROSS JOIN awesomeTable2
        ) t
    );


BEGIN
    DELETE FROM dataset.TARGET_TABLE;
    INSERT INTO
        dataset.TARGET_TABLE ( blah blah blah )
    WITH nyaan AS (
        SELECT
            COALESCE(awesomeParameter, 0),
            GREATEST(
                    DATE_DIFF(awesomeDay, @date, DAY) / 365,
                    0
                )
        FROM table1
        LEFT OUTER JOIN table2
            ON ...
        LEFT OUTER JOIN table3
            ON ...
        ),
         ...
        )
    SELECT (blah blah blah) FROM result GROUP BY ...;

現時点(2024-03)での上記のようなSQLのバッチ処理ファイルの総数が 103 あり、以下のような統計です。

  • 最小行数: 9
  • 最大行数: 779
  • 総行数の中央値: 92
  • 総行数の期待値: 135.8

BigQueryにロジックを移行してからもバッチ処理ファイル数は増え続けています。しかし、それに伴い下記のような課題を感じるようになりました。

1. SQLの複雑さと機能不足

プログラミング言語(特に命令型)はアルゴリズムの実装やデータ処理が得意であるのに対して、SQL(宣言型)はデータの抽出、更新、挿入、削除をするために設計されています。このような設計の違いから可読性、再利用性の違いが生まれます。

可読性: ベクトル・行列計算や数値計算に特化した関数が最近の数値計算ライブラリ(例:NumPy、pandasなど)には豊富に用意されていますが、SQLではそれらの関数が少ないため、上述のバッチ処理の例のように複雑な実装が必要になります。その結果、SQLの可読性は低くなると言えます。

再利用性:あるバッチで記述された計算を別のバッチでも再利用したいケースがあります。特に複雑な計算であれば、より安全に再利用したいです。 しかし、SQLにモダンなプログラミング言語が備えるようなモジュールシステムと同等の機能は有していません。

2. クエリーの単体テストができない

責務の分離: SQLは言語仕様上、データの取得ロジックと加工ロジックが密接に結びついています。故に、一般的なプログラミング言語のように加工ロジックを切り出して単体テストを書くことが難しいです。

WITH句の分離: WITH句を用いた複雑なSQLクエリでは、中間計算のエラー特定が難しい課題があります。 各WITH句のテストを網羅的に行うには、大量のテストデータセットが必要で、これは手間と実行時間の増大を招きます。 しかし、SQLを細分化して多数のテーブルに分けると、パイプラインが複雑化し、管理が煩雑になるというジレンマが存在します。

bigframes.pandas

現状のeScanのバッチ処理の課題を端的にいうと、データ取得とデータ加工の結合度を下げて、テスタブルな単位で凝集度を上げたいというものでしたが、その解決策としてbigframes.pandasを検討しています。

bigframes.pandasはBigQuery DataFramesの一部として提供されるpandas互換のAPIです。このAPIを利用することで、pandasのDataFrameと同じ手触りで、BigQueryのデータをBigQuery上で簡単に扱うことができます。BigQueryのデータセットを直接、あるいはSQLクエリを通じてpandasのDataFrameとして読み込むことができます。

bigframes.pandasの仕組みは、呼び出したメソッドをクエリーに変換して、BigQuery APIを裏側で実行しています。その各メソッドを実行するとBigQueryにジョブを発行して計算されます。

cloud.google.com

BigQuery DataFramesの発表は、BigQueryの2023-08-29のリリースノートに記載されており、現在(2024-03時点)でも、プレビュー段階です。

pandasとの比較

BigQuery を Pandas のように操作する に記載されいてる説明がわかりやすかったため、引用させていただきます。

通常の Pandas は、CSV ファイルなどのデータをメモリに展開した上で処理を行います。 一方で、bigframes.pandas は、Python コードはローカル(もしくはクラウド)のコンピュータで実行しますが、データを BigQuery に保持したまま BigQuery 上でデータを処理します。 bigframes.pandas のコードを実行すると、BigQuery に「セッション」が作成され、セッション内の一時テーブルを使用してデータを処理します。

つまり、pandasはそれぞれが管理するサーバで処理されるのに対して、bigframes.pandasはBigQuery上で処理されるため、本記事の冒頭で説明したように、スケーラビリティとスピードの恩恵を受けつつ、pandasのように柔軟に計算ロジックを記述できるようになります。

しかしエコシステム観点で、pandasにあってbigframes.pandasに対応してないものもあります。その一つが、panderaです。panderaを用いることによって、DataFrameの実行時バリデーションが可能になり、より堅牢なバッチ処理が可能になります。

pandera.readthedocs.io

GoogleSQL vs bigframes.pandas

bigframes.pandasを使ったバッチ処理が課題を解決できるか検証するために、GoogleSQLとbigframes.pandasを比較してみます。

WITH / SELECT / JOIN

GoogleSQL

以下のようなクエリーで、ローカルとBigQuery Studioで実行環境別の実行速度を計測します。今回は簡単な実験のため、すべてのクエリーの実行の試行数は2回の平均とします。

WITH table1 AS (
    SELECT
        id,
        yearmonth,
        price
    FROM dataset.table1
    WHERE
            id = '{id}'
    ),
    table2 AS (
        SELECT
            id,
            yearmonth,
            price
        FROM dataset.table2
        WHERE
            id = '{id}'
        )
SELECT
    id,
    yearmonth,
    table1.price + table2.price AS price
FROM table1
FULL OUTER JOIN table2
    USING (id, yearmonth)

bigframes.pandas

GoogleSQLに記述したクエリーをbigframes.pandasで記述すると以下のようになります。実行環境は、ローカルとColab Enterpriseです。

# テーブルtable1を取得
table1 = bpd.read_gbq(f'{dataset}.table1',
                     columns=['id', 'yearMonth', 'price'],
                     filters=[("id", "==", id)])
# テーブルtable2を取得                                           
table2 = bpd.read_gbq(f'{dataset}.table2',
                      columns=['id', 'yearMonth', 'price'],
                      filters=[("id", "==", id)])
# 2つのテーブルをFULL OUTER JOIN                                    
joined_df = table1.merge(
    table2, on=['id', 'yearMonth'],
    how='outer', sort=False,
    suffixes=('table1', 'table2'))

# priceの計算    
joined_df['price'] \
    = (joined_df['price_table1'].fillna(0) 
       + joined_df['price_table2'].fillna(0))

計算されたbigframes.DataFrameは内部でBlock(2次元のラベル付きデータ構造)を保持しています。このBlockが今まで加工したときの式を保持しています。その式から、SQLにコンパイルすることができ、 <data_frame>.sql で表現されます。このSQLも読み込んだ場合どうなるか比較してみましょう。

計測時間

GoogleSQL bigframe.pandas コンパイルされたSQL
ローカル avg 3.18[sec] avg 8.53[sec] avg 5.21 [sec]
BigQuery Studio avg 0.93[sec] avg 0.97[sec]
Colab Enterprise avg 6.36[sec]

書き心地

結果からいうと、この程度のクエリーであればGoogleSQLで記述したほうがわかりやすいです。しかし、コメント内の「priceの計算」のように、データ取得とデータ加工の結合度を下げるという目的は達成できそうです。

INSERT

GoogleSQL

先述した通り、INSERTするときは必ずSELECTがセットになるため、分離しづらいという課題がありました。

INSERT INTO dataset.Awesome_Table
(
    id,
    yearMonth,
    price,
)
WITH ...
SELECT ...

bigframes.pandas

翻ってbigframes.pandasは取得と計算が分離できるだけでなく、DataFrameに対して to_gbq をすることでBigQueryに保存できるので分離できます。

joined_df.to_gbq('dataset.awesome_table', project_id='your_project_id', if_exists='replace')

bigframes.pandasを使ったバッチ処理の課題

期待するメソッドがない

先ほどのGoogleSQLとbigframes.pandasの比較の例示として、簡単なSELECTとJOINをしていました。しかし、もう少し複雑な処理ではどうでしょうか。

  • DatetimeMethods型に加工するメソッドがない。 e.g. DATE_TRUNCができない
  • DateFrame 型にapplyメソッドはありますが、DataFrameGroupBy 型にはapplyメソッドがないため、group byした後にデータを加工するというようなことができません(pandasにはあります)

やっぱり遅い

計測時間から分かる通り、GoogleSQLで呼び出すより約2倍以上の時間がかかっています。原因は、read_gbqメソッドが原因です。GoogleSQLに記述した場合、バッチ処理を一気通貫で処理しきれるのに対して、read_gbq は毎度BigQuery APIを呼び出してBigQueryとのセッション通信が発生しています。結果として、オーバーヘッドが大きくなってしまいます。

まとめ

現状のeScanの計算のバッチ処理の背景から代替となりそうなbigframes.pandasの紹介をしてきました。そして、GoogleSQLと比較してみて課題点の解決になりそうなことが期待できましたが、まだpandasほど柔軟にデータ操作ができないこともわかりました。よって、現時点ではGoogleSQLから挿げ替えることは難しいですが、今後のBigQuery DataFramesの進化を追いつつ、より良い開発体験を得られるようにしていこうと考えています。

enechain では、プロダクトを一緒に創っていく仲間を募集しています。少しでもご興味・ご関心がございましたら、ぜひお気軽に以下からご応募よろしくお願いします。

herp.careers