はじめに
enechain データサイエンスデスク エンジニアの藤村です。
enechainでは市場活性化を目的として、機械学習や最適化アルゴリズムを用いて電力や燃料などの商品に関する指標を算出し、社内外に提供しています。本稿では、これらを算出するモデルの構築・運用を効率化するために作成した、タスクランナーinvokeとパラメータ管理ツールhydraを一体化したシステムを紹介します。
背景
タスクランナーを導入するモチベーション
機械学習モデルの構築・運用において、データサイエンティストがモデル開発を、MLエンジニアがパイプライン構築や処理効率化を担当するなど、複数の開発者が適切に役割分担して並列で作業を進めることで、高速な価値提供が期待できます。しかし、開発者それぞれの間で何の取り決めもなく並列で開発を進めてしまうと、お互いの成果物の受け渡しが煩雑になったり、環境の違いによる想定外の不具合が発生したりと、円滑な連携が難しい場合があります。
これらの問題を解消するため、MLパイプラインのステップ単位での「タスク」としてインターフェイスを予め定義しておきます。タスクにはモデルのロジックは記載せずに、処理の並列化や依存関係、パイプラインエンジン特有の定義などを記載します。データサイエンティストとMLエンジニアはそれぞれの開発に集中しながら、異なる環境(ローカル・クラウド)でも 「タスク」を叩くだけでMLパイプラインを実行できることが望ましいです。
パラメータ管理ツールを導入するモチベーション
モデルのハイパーパラメータや前処理・評価指標の設定など、実験や運用には多くのパラメータ管理が伴います。単純なテキストファイルや環境変数でもある程度は管理できますが、パラメータファイルの階層化・分割や、他パラメータの再利用、環境ごとの値の切り替えといった高度な機能を持ったツールを導入することで、効率的かつ見通し良くパラメータを管理することができます。
実現したいこと
本稿では、タスクランナーとパラメータ管理ツールを統合することで、次のような機能の実現を目指します。
モデルや環境に依存しないタスクによるパイプラインの操作
モデルの「学習」や「推論」などのタスクをCLIから呼び出すことでパイプラインを操作します。モデルや環境の名前、実行時にしか与えられないパラメータ(対象日次など)はコマンドの引数として与えますが、それ以外のモデル・環境別の差異は全てコンフィグファイルに吸収させます。
共通部分と環境特有部分を分離したパラメータ定義
localやdevelop, productionなどの複数の環境でモデルを動作させる場合、パラメータの大部分は共通である一方で、環境ごとに異なるパラメータも一部存在します。同じようなパラメータを環境ごとに繰り返し定義するのは冗長であり、またパラメータの変更漏れや環境間の差異が生じるリスクがあります。そこで、ベースのパラメータは1つとし、環境特有の部分は差分として定義することで、パラメータの再利用性を高めます。この考え方はKustomizeのpatchと似たものです。
パラメータ定義の構造化
パラメータファイルにはモデルのハイパーパラメータや前処理・評価の設定など、多くのパラメータを定義する必要があります。これらのパラメータを効率的に管理するため、パラメータをトピック単位でグループ化し、ディレクトリやYAMLのマッピング形式でネスト構造を作ります。この階層構造により、パラメータ群を論理的に分割でき、可読性と保守性が向上します。
さらに、ある箇所で定義されたパラメータ値を、別の箇所から参照できる機能を活用します。これにより定義の冗長性を排除でき、一か所の変更で関連パラメータを一括で更新できます。
一方で、コード内からはそれらがマージされた単一のコンフィグオブジェクトにアクセスします。パラメータファイルの分割や参照元をコード側で意識することなく、実装をシンプルに保ちます。
実装方法
利用するツール
タスクランナーにはinvokeを採用しました。invokeはPython製のタスクランナーであり、Pythonの関数をタスクとして登録し、CLIから呼び出すことができます。似たような手段としてsetupによる自作cliやdoit、clickなどが挙げられますが、invokeは拡張機能が豊富で、subcommandや依存関係の定義、yamlでの設定ファイルの記述ができる、などのメリットがあります。
invokeにもパラメータ管理の機能がありますが、パラメータの構造化や参照機能が不足しているため、パラメータ管理にはhydra(とそのバックエンドであるomegaconf)を採用しました。hydraは機械学習モデルのハイパーパラメータ管理を目的としたツールですが、パラメータの構造化や参照機能が強力であり、環境ごとのパラメータの差分管理や、パラメータの再利用性を高めることができます。
invokeとhydraのそれぞれがタスクランナーとパラメータ管理の機能の両方を持っていますが、それぞれ強みがあり、それらを統合して良いとこどりをすることで、効率的なタスクランナーとパラメータ管理システムを構築することができました。
パラメータファイル
パラメータファイルはyaml形式で、次のようなディレクトリ構造で定義します。
. ├── configs │ ├── pipelines │ │ ├── pipeline_1.yaml │ │ └── pipeline_2.yaml │ ├── pipelines.yaml │ ├── prod.yaml │ └── sandbox.yaml └── invoke.yaml
invoke.yaml
には全モデル・環境のベースとなるパラメータを定義します。gcp project名やMLflow Trackingのエンドポイントは環境ごとに切り替えられるように定義しておきます。
# invoke.yaml defaults: - _self_ - configs@pipelines: pipelines version: ${rc.pyproject_value:tool.poetry.version} # poetry package version env: dev user: ${oc.env:USER, runner} gcp: project: xxx-${env} train: mlflow: use: True tracking_uri: https://mlflow.${env}.xxx.com output_dir: trained_models
configs/
ディレクトリ下には、モデル(パイプライン)特有のパラメータと環境特有のパラメータを定義します。
# configs/prod.yaml env: prod
prod.yaml
ではinvoke.yaml
で定義したパラメータの${env}
の部分をprod
に上書きするだけです。sandbox環境(開発者個人ごとに用意した実験用環境)のgcp projectは${env}
の置き換えだけでは対応できないため、あらためて定義しています。
# configs/sandbox.yaml gcp: project: xxx-sandbox-${user} train: mlflow: use: False
パイプラインごとのパラメータはconfigs/pipelines/
ディレクトリ下に定義します。それぞれのモデルに必要なパラメータを定義する想定ですが、空でも大丈夫です。
# configs/pipelines.yaml defaults: - pipelines@pipeline_1: pipeline_1 - pipelines@pipeline_2: pipeline_2
# configs/pipelines/pipeline1.yaml param1: xxx
# configs/pipelines/pipeline2.yaml
これらのパラメータファイルを用意したところで、configを表示する簡単なタスクをinvokeで定義してみます。
# tasks.py from collections.abc import Iterable import invoke import yaml from invoke.config import DataProxy def config_to_dict(config: DataProxy, keys: Iterable[str]) -> dict: """invokeのconfigをdictに変換する""" return { key: config_to_dict(val, val.keys()) if isinstance((val := config.get(key)), DataProxy) else val for key in keys } @invoke.task def print_config(c: invoke.Context) -> None: print( yaml.dump( config_to_dict( c.config, ("version", "env", "user", "gcp", "train", "pipelines"), ) ) )
このタスクを実行してみると次のような結果が得られます。${env}
が残っていたり、pipelines
がnull
になっていたりと、invokeだけの機能では他パラメータ値の参照や、複数ファイルのマージができないことがわかります。
$ inv print-config env: dev gcp: project: xxx-${env} pipelines: null train: mlflow: tracking_uri: https://mlflow.${env}.xxx.com use: true output_dir: trained_models user: ${oc.env:USER, runner} version: ${rc.pyproject_value:tool.poetry.version} $ inv -f configs/sandbox.yaml print-config env: dev gcp: project: xxx-sandbox-${user} pipelines: null train: mlflow: tracking_uri: https://mlflow.${env}.xxx.com use: false output_dir: trained_models user: ${oc.env:USER, runner} version: ${rc.pyproject_value:tool.poetry.version}
構造化パラメータのマージ処理の実装
最初に、pyproject.toml
に記載された任意の値を取得する処理をomegaconfのresolverとしてtasks.py
に定義します。実装の詳細は割愛しますが、これによってパラメータファイル内の記述からomegaconfがpyproject.toml
の値を自動的に解決し、参照することができます。
import os from typing import Any import toml from omegaconf import OmegaConf def get_pyproject_value( project_dir: str, args: str | None = None ) -> Any | None: """pyprojectで管理している設定値を返す Args: project_dir: Project Directory Path args: Pyproject args string (ex. tool.poetry.version) Returns: pyproject value """ pyproject_path = f"{project_dir}/pyproject.toml" if not args: return None if not os.path.exists(pyproject_path): return None with open(pyproject_path) as f: pyproject_toml = toml.load(f) value = pyproject_toml try: for arg in args.split("."): value = value[arg] return value except KeyError: return None def _register_omegaconf_resolver(project_dir: str) -> None: """OmegaConf Custom Resolverを登録 Args: project_dir: Project Directory Path """ def _get_pyproject_value(args: str | None = None) -> str | None: """pyprojectで管理している設定値を返す Args: args: Pyproject args (ex. tool.poetry.version) Returns: pyproject value """ return get_pyproject_value(project_dir, args) # Register OmegaConf OmegaConf.register_new_resolver( "rc.pyproject_value", _get_pyproject_value, replace=True )
次に、invokeのconfigをhydraで読み込んだconfigで上書きする処理をtasks.py
に追加します。一度hydra (omegaconf) でyamlを読み込んでresolverや階層化コンフィグのマージ処理を行ってから、その結果をinvokeのconfigにマージします。
from hydra import compose, initialize_config_dir from omegaconf import DictConfig def _merge_config( invoke_config: invoke.Config, hydra_config: DictConfig ) -> None: """Invoke configをHydra configの内容で上書きする Args: invoke_config: Invoke config hydra_config: Hydra config """ for key, val in hydra_config.items(): if isinstance(val, DictConfig): if key in invoke_config: _merge_config( invoke_config[key], OmegaConf.to_container(val, resolve=True), ) else: invoke_config.update( {key: OmegaConf.to_container(val, resolve=True)} ) else: invoke_config.update({key: val}) @invoke.task def update_config(c: invoke.Context) -> None: """c.configの中身をHydraで読み込んだもので上書きするtask Args: c: invoke.Context """ project_dir = getattr(c.config, "_project_prefix") runtime_path = getattr(c.config, "_runtime_path") _register_omegaconf_resolver(project_dir) if os.path.exists(os.path.join(project_dir, "invoke.yaml")): # projectのrootにinvoke.yamlがある場合 with initialize_config_dir(config_dir=project_dir, version_base=None): hydra_config = compose("invoke") else: hydra_config = None if runtime_path is not None: # -fオプションで異なるyamlが指定された場合はinvoke.yamlの内容を上書きする runtime_dir = os.path.dirname(runtime_path) runtime_name = os.path.basename(runtime_path) with initialize_config_dir( config_dir=os.path.join(project_dir, runtime_dir), version_base=None ): override_config = compose(runtime_name) if hydra_config is not None: hydra_config = OmegaConf.merge(hydra_config, override_config) else: hydra_config = override_config if hydra_config is not None: _merge_config(c.config, hydra_config)
最後に、update_config
タスクをprint_config
タスクの前に実行するようにinvokeのタスク定義を変更すれば完成です。
@invoke.task(pre=[update_config]) def print_config(c: invoke.Context) -> None: print( yaml.dump( config_to_dict( c.config, ("version", "env", "user", "gcp", "train", "pipelines"), ) ) )
$ inv print-config env: dev gcp: project: xxx-dev pipelines: pipeline_1: param1: xxx pipeline_2: {} train: mlflow: tracking_uri: https://mlflow.dev.xxx.com use: true output_dir: trained_models user: fujimura version: 0.1.0 $ inv -f configs/prod.yaml print-config env: prod gcp: project: xxx-prod pipelines: pipeline_1: param1: xxx pipeline_2: {} train: mlflow: tracking_uri: https://mlflow.prod.xxx.com use: true output_dir: trained_models user: fujimura version: 0.1.0 $ inv -f configs/sandbox.yaml print-config env: dev gcp: project: xxx-sandbox-fujimura pipelines: pipeline_1: param1: xxx pipeline_2: {} train: mlflow: tracking_uri: https://mlflow.dev.xxx.com use: false output_dir: trained_models user: fujimura version: 0.1.0
実際の動作としてはこれで十分ですが、taskを定義する際に@invoke.task(pre=[update_config])
を毎回書くのは面倒かもしれません。その場合は、invoke.task
デコレータをカスタマイズして、デフォルトでupdate_config
を前提処理として実行するようにすることができます。
def task(*args, **kwargs) -> Callable[[...], Callable]: """Hydraを利用するためのカスタムinvoke task @taskを用いたときにdefaultでupdate_configを事前に実行する """ pre = kwargs.pop("pre", []) pre.append(update_config) return invoke.task(*args, pre=pre, **kwargs) @task def print_config(c: invoke.Context) -> None: print( yaml.dump( config_to_dict( c.config, ("version", "env", "user", "gcp", "train", "pipelines"), ) ) )
おわりに
本記事では、タスクランナーとパラメータ管理を統合することで、モデルの構築・運用を効率化する方法を紹介しました。タスクランナーにはinvoke、パラメータ管理にはhydraを採用し、それぞれの強みを生かしてシステムを構築しました。これにより、モデルや環境に依存しないタスクによるパイプラインの操作、共通部分と環境特有部分を分離したパラメータ定義、パラメータ定義の構造化といった機能を実現しました。
enechainのデータサイエンスデスクでは、データサイエンスを活用してエネルギーのマーケットを作り上げていく仲間を募集しています。