データパイプラインをArgo Workflowsに移行した話

ogp

はじめに

enechainのデータプラットフォームデスクでエンジニアをしているakizhouです。

我々データプラットフォームデスクは業務の一部として、enechainが提供するeCompassと呼ばれる「エネルギー関連データ提供SaaS」で使用されるデータ収集および集計を行うデータパイプラインを開発運用しています。

以前の記事でも触れたように、このデータパイプラインは複数の課題を抱えており、現在はアーキテクチャから根本的再設計と移行を進めています。

今回は実際に決定したアーキテクチャとそれが解決する課題に関して、まだ移行途中ではありますが実行の基盤部分について紹介します。

そもそも何を移行しているのか

冒頭でも触れましたが、弊社にはeCompassというデータダッシュボードサービスがあります。 このサービスが提供するデータは主にETLパイプラインが外部からデータを収集したり処理しています。 またパイプラインにより処理されたデータはeCompass以外のサービスや社内の分析用として複数の利用先が存在します。

このパイプラインが扱うデータのフォーマットはテーブルデータのみで、現状データのパースと変換処理はすべてPythonで実装されており、実行はGoogle Cloud Functions上で行い、Pub/SubでCloud Functions間のトリガーを行うことによりパイプラインを構成しています。

old_gamma_architecture 現行アーキテクチャ

なぜ移行しているのかについて

次になぜ、現在パイプラインを移行しているのかについてです。

一般的にETLパイプラインにはデータの入口と出口の仕様が前提としてあり、その上でパイプラインの中身の処理としてパイプラインの両端のデータ仕様の変換を行うものだと思います。

今回のETLパイプラインの場合、出口の仕様が変わる頻度は高くない一方、外部のデータを取り込んでいる性質上、入口の仕様が変わることは頻繁にあります。 加えて仕様通りのデータが取得できる保証はなく、パイプラインがデリバーするデータの品質を担保するために入口のデータ品質にもそれなりに気を配る必要があります。

事実、外部起因による入口の仕様変更やデータの不具合、もしくは外部システム自体の不具合によりパイプラインの処理が正常に終了せずに復旧対応を行うことは数多く、社内の障害対応の大部分はこのETLパイプラインのものと言っても過言ではないです。

そういった背景の中障害の再発防止を含め現状のパイプラインの課題を整理したところ、実行環境/基盤に関しては以下のような課題を解決することが必要だと分かりました。

  • パイプラインの細かい実行制御ができない
  • 計算リソースに変えられない制限がある
  • 環境構築のコストが高い
  • 処理の状態を断片的にしかモニタリングできてない

以上の課題以外にも処理の構成やその他考慮したことがありますが、また別の機会に紹介したいと思います。

新 実行環境/基盤について

Argo Workflowsの起用

パイプラインの細かい実行制御の課題はCloud FunctionsをPub/Subで繋げていた構成から処理をワークフローエンジンに載せ替えることにしました。

ワークフローエンジンの選択としてはAirflowなども選択肢としてはありましたが、現行のパイプラインの運用がすでにかなり負荷になっていたため更に検証コストをかけるのではなく、社内の他サービスですでに採用されており立ち上げのコストを抑えられる点と実行制御という面では十分な機能を持っている点でArgo Workflowsにしました。

enechainにおけるArgo Workflowsの他システムでの採用例についてご興味があればこちらもご一読ください。

Cloud FunctionsからGKE+コンテナ化

Argo Workflowsを使用するにあたり実行環境をk8sに移すこととなったので計算リソースのコントロールは以前と比べては飛躍的に柔軟になりました。

また、Argo Workflowsで実行するためにアプリケーションのコンテナ化を行ったこと、k8sマニフェストを使用してワークフローなどのリソースを作成するようになったことによりアプリケーションレイヤーの変更に要する時間が大幅に短縮されて高頻度に変更ができるようになり新環境の構築も容易になりました。

実際に現行パイプラインでは40以上あるCloud Functionsの設定をterraformで管理しているためデプロイに少なくとも約20分かかりますが、Argo Workflowsではアプリケーションイメージのビルドを含め5-6分でデプロイを行うことができるようになっています。 その際にマニフェストの再利用を目的としたテンプレート作成を行っており、この後の実装パートで紹介します。

モニタリングシステムの統一

上記の現行アーキテクチャ図にあるようにOpsgenieとDatadogの2サービスをアラート送出とメトリクスモニタリング用途で導入しています。

これは初期の段階でパイプラインの実行時に異常が発生したらOpsgenieのAPIを叩くようにすでに実装された後にDatadog全社導入されたことが混在している事情としてあるのですが、今回の移行ではそれをDatadogに統一しています。

現行パイプラインのモニタリング・アラーティングの課題の中で最もクリティカルなものとして、ワークフローごとにモニタリングしづらいことがあります。 原因としては一つのワークフローの処理が複数のCloud Functionsにまたがり実行されているので、個々の処理の異常には気づけるがワークフロー単位でまとめて監視するのは難しく、アラートが発生時の影響範囲特定に時間がかかることが挙げられます。

これを解決するために今回Argo Workflowsではワークフローごとにアラートを仕分けるようにしています。 その際、Argo WorkflowsのメトリクスをDatadogに連携するにあたりある程度工夫が必要だったのでこちらも次の実装パートで詳細を紹介します。

実装

WorkflowTemplate manifestとtaskのテンプレート化

元々40近くのCloud Functionsで行なっていた処理をArgo Workflowsに載せ替えるにはそれなりの数のworkflowとtaskを定義する必要があります。 各々のtaskやworkflowでパラメーターが異なるとメンテナンスしづらくなるので、ワークフロー内のタスクはほとんど共通した環境変数やパラメーターを持つようにアプリケーション側で設計しています。 これによりタスク部分のmanifestを以下のように個別のテンプレートとして切り出して再利用できるようになりました。

task-template-ref.yaml

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: task-template-ref
spec:
  templates:
  - name: task-template
    inputs:
      parameters:
      - name: target_module
    container:
      image: <image>
      command: [python, -m]
      args:
      - my_app.{{inputs.parameters.target_module}}
      env:
      - name: '<environment variable 1 name>'
        value: '{{workflow.parameters.variable1}}'
      - name: '<environment variable 2> name'
        value: '{{workflow.parameters.variable2}}'
  - name: task-template-2
    inputs:
      parameters:
      - name: target_module
    container:
      image: <image2>
      command: [python, -m]
      args:
      - my_app.{{inputs.parameters.target_module}}

上記のtask-templateをDAG内のtaskのテンプレートとして使用したWorkflowTemplateの定義は以下のようになります。

sample-workflow.yaml

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: sample-workflow
spec:
  entrypoint: main

  arguments:
    parameters:
    - name: target_subpackage
      value: sample_module
    - name: variable_1
      value: <some_default_value>
    - name: variable_2
      value: <some_default_value>

  templates:
  - name: main
    dag:
      tasks:
      - name: task1
        templateRef:
          name: task-template-ref
          template: task-template
        arguments:
          parameters:
          - name: target_module
            value: '{{workflow.parameters.target_subpackage}}.task1'
      - name: task2
        dependencies: [task1]
        templateRef:
          name: task-template-ref
          template: task-template
        arguments:
          parameters:
          - name: target_module
            value: '{{workflow.parameters.target_subpackage}}.task2'

task-template-ref.yamlsample-workflow.yamlの紐付けはtemplateRefで行なっており、parameterなどが異なる場合や呼び出しているイメージが異なる場合はtask-template-2など複数のtaskのtemplateも定義できます。

また、workflowで使う変数としてはworkflow.parametersとinputs.parametersの2種類があります。 workflow.parametersの参照スコープは名前の通りworkflow全体になっており、それとは別にtemplateにinputsを設定することでよりスコープが狭いinputs.parametersも作成できます。 上記の例ではworkflow全体 (DAG内全てのタスクで共通) のものはworkflow.parametersに登録して、taskごとに変えたいものはinputs.parametersに登録するようにしています。

Workflowメトリクスの設定とDatadogでのアラート設定

今回の移行で達成したいモニタリングとアラートの必須最低要件は以下です。

  1. Workflowの種類ごとに成功率が監視できる
  2. Workflowの実行時間が監視でき、異常に長い場合に気付ける
  3. Workflowが失敗した際、再実行により復旧できた時に自動でアラートを解消する

Datadogでアラート管理を行い、これらの要件を達成するためには3つのカスタムメトリクスをArgo Workflows側から連携する必要がありました。

最終的にメトリクスの設定を含めたmanifestは以下のようになります。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: sample-module-wf
spec:
  entrypoint: main
  serviceAccountName: <your-service-account-name>
  activeDeadlineSeconds: 5400
  metrics:
    prometheus:
    - name: custom_finished_workflows_count
      help: count of workflows which already finished
      labels:
      - key: template_name
        value: '{{= sprig.regexReplaceAll("(.*)-[^-]*-[^-]*$", workflow.name, "$1") }}'
      - key: status
        value: '{{workflow.status}}'
      counter:
        value: '1'
    - name: custom_current_workflow_duration
      help: duration of current workflow
      labels:
      - key: template_name
        value: '{{= sprig.regexReplaceAll("(.*)-[^-]*-[^-]*$", workflow.name, "$1") }}'
      gauge:
        value: '{{workflow.duration}}'
    - name: custom_last_workflow_result
      help: result of workflow executed lastly
      labels:
      - key: template_name
        value: '{{= sprig.regexReplaceAll("(.*)-[^-]*-[^-]*$", workflow.name, "$1") }}'
      gauge:
        value: "{{= sprig.ternary('0', '1', workflow.status == 'Succeeded')}}"
...

Workflowの種類ごとに成功率が監視できる

Workflowの種類ごとの成功率を監視するためにtemplate_nameとworkflow.statusをlabelとして付与したcounterを設けています。

- name: custom_finished_workflows_count
      help: count of workflows which already finished
      labels:
      - key: template_name
        value: '{{= sprig.regexReplaceAll("(.*)-[^-]*-[^-]*$", workflow.name, "$1") }}'
      - key: status
        value: '{{workflow.status}}'
      counter:
        value: '1'

このメトリクスを設定することでworkflowの実行が終了した際にカウントがvalueで設定した数値分(上記では1)増えます。 Datadog上でメタデータ(template_nameとworkflow.status)ごとにグルーピングすれば以下のように成功率を算出できるようになります。

ddog_success_rate_config

Workflowの実行時間が監視でき、異常に長い場合に気付ける

Argo Workflowsでは実行中に取得できるリアルタイム実行時間とワークフロー終了時に確定する実行時間の2種類の実行時間関連のメトリクスを取得できます。

リアルタイム実行時間を取得したい場合

- name: custom_realtime_workflow_duration
  help: realtime duration of current workflow
  labels:
  ...
  gauge:
    realtime: true
    value: "{{duration}}"

ワークフロー終了後に確定する実行時間を取得したい場合

- name: custom_workflow_duration
  help: duration of current workflow
  labels:
  ...
  gauge:
    value: '{{workflow.duration}}'

ワークフローの実行時間が異常に長いことを監視したい場合、一見リアルタイム実行時間を監視した方が良いように思えますが、Datadogにこのリアルタイム実行時間を連携した場合、ワークフローが終了した後も一定時間メトリクスが残ってしまうため、メトリクスがDatadogに残留している間はアラートを解消できなくなってしまいます。 そこでリアルタイムの実行時間監視は諦めて、過去に計測した実行時間を参考にしてWorkflowTemplate自体にactiveDeadlineSecondsを設定してワークフローに実行上限時間を設けることにしました。 これにより異常に実行時間が長い場合はworkflowが上限時間超過でFail扱いになり、監視する必要があるものはワークフローの失敗だけとなります。

ただ、頻繁に上限時間を超えてしまうと計測してきた実行時間の統計値に影響してしまうため設定する値をメンテナンスする必要はあります。

Workflowのが失敗した際、再実行により復旧できた時自動でアラートを解消する

Datadogのアラート管理の仕組み上、手動でアラートを解消扱いにしてもメトリクスが異常を示している場合アラートが再送されてしまいます。 そこで失敗したworkflowの復旧が完了したら自動でDatadog側で復旧を検知してアラートを解消するようにする必要がありました。

個々のworkflow実行ごとの成功失敗を計測したい場合、上記先程触れたcustom_finished_workflows_countで事足ります。 しかしworkflow実行単位ではなくworkflowの種類ごとに成功したか否を継続的にヘルスチェックするためにはcounterではなく以下のようなgaugeメトリクスが必要になります。

- name: custom_last_workflow_result
  help: result of workflow executed lastly
  labels:
  - key: template_name
    value: '{{= sprig.regexReplaceAll("(.*)-[^-]*-[^-]*$", workflow.name, "$1") }}'
  gauge:
    value: "{{= sprig.ternary('0', '1', workflow.status == 'Succeeded')}}"

監視したい情報としては普及が必要か否かのバイナリー情報なのでternaryで値が0か1になるようにし、gaugeメトリクスなので個々のworkflow実行が終了した際に値が上書き更新されるようになります。 あとはDatadogの監視設定で以下のようにworkflow.statusがSucceeded以外の場合(上記では数値上1と同義)にアラート条件として指定すればworkflow種別ごとにヘルスチェックを行うことができます。

ddog_alert_config

実際にDatadog monitorのstatusで再実行による復旧が行われた時にメトリクス (紫線) が1から0になりAlertステータスからOKに変化していることが確認できます。

ddog_alert_status

おわりに

今回はデータパイプラインをCloud FunctionsからArgo Workflowsに移行した際に工夫をしたことについて紹介しました。 この他にもアプリケーションレイヤーやデータストアなど実行環境以外のアーキテクチャについても今後記事にしたいと思います。

このようにenechainではモダンな技術への関心を高く持ち、サービスの改善に繋げ、巨大なマーケットを支えるデータ基盤を一緒に構築する仲間を募集しています。もし興味を持っていただけました場合はぜひご応募お待ちしております!

herp.careers

herp.careers