この記事は enechain Advent Calendar 2023 の6日目の記事です。 昨日は@YamatoのNode.js上でのBigQueryのスキーマ管理でした。
はじめに
enechain データサイエンスデスク エンジニアの藤村です。
enechainでは市場活性化を目的として、機械学習や最適化アルゴリズムを用いて電力などの商品に関する指標を算出し、社内外に提供しています。その中でも特に正確さが求められる数値については、社外に公開する前に社内のドメインエキスパートによるチェックを必須としています。本稿では、そのような運用を効率化するためにSlackを利用して構築した「Human in the Loop (HITL)」機構について紹介します。
背景
なぜHuman in the Loopが必要か
我々は、電力や燃料などのエネルギーに関するマーケットの指標を自動算出する複数のシステムを構築・運用しています。その中には、人間の介在を必要としない自動のシステムを組むのが難しいものもあります。その理由の1つとして、扱う商品のボラティリティが高く、過去傾向からは説明できない規模で急変しうることが挙げられます。例えば、国内の代表的な電力価格指標であるJEPXスポット市場約定価格は次に示すように2021年1月に急騰し、その後も世界的な燃料価格高騰などの影響を受け、足元と5年前とでは価格の相場や変動の傾向が大きく異なっています*1。
加えて、システムで算出する数値には社内外のビジネスの根幹で利用されているものもあり、数値の可用性や正確性(妥当さ)には高い要求があります。そのため、算出した数値の品質を自動でチェックする機構を設けていますが、前述のような理由から、単純なルールベースの方法や過去傾向との比較では完璧な異常値の判定は実現できません。例えば、「過去n年間の平均・分散で決めたレンジから逸脱していないか」のような異常値判定の方法が考えられますが、この方法だと正当な指標の急変を異常値として判定してしまったり、逆にそれを防ぐために基準を緩めると本当の異常値を検出し逃してしまうかもしれません。「常に正確な数値を出し続けたい」という要件を満たすためには、このチェックの部分でドメインエキスパートに協力していただくことが不可欠だと判断し、HITLシステムを構築することにしました。
HITLとは、機械学習などの自動化プロセスにおいて、完全な自動化が困難な課題に対処するために人間がシステムに介入し、必要に応じて判断や修正を行う仕組みを指します。本稿で紹介するシステムでは、自動で算出された指標価格に対して「公開可否の判断」と、公開不可の場合は「修正値の作成」の2つの部分でドメインエキスパートが介入しています。
なぜSlackで構築するのか
今回、HITLのインターフェースとしてSlackを採用しました。数値のチェックや修正に関するやり取りは、メッセージにExcelを添付する形で行っています。この選択にはいくつかの理由があります。まず、enechain社内のコミュニケーションがSlackに一本化されていることが挙げられます。これにより、ドメインエキスパートによるHITL運用の学習コストが低くなり、日々の他の業務への影響を最小限に抑えることができます。また、開発面では相応のスピードが求められる中、UIを作ることが専門ではないデータサイエンスデスクのエンジニアでも、後述するSlack Boltを利用することで手軽にインタラクティブなシステムを構築できる点が魅力的でした。
構築したシステムの説明
今回構築したHITLシステムの構成と主な動きを次のシーケンス図に示しています。
- パイプラインは定時実行により指標を算出し、データベースの算出結果一次保存用テーブルにその結果を保存します。この部分の具体的なシステム構成についての詳細は過去の記事を参照してください。
- 算出した指標(数十レコードのテーブルデータ)を格納したExcelファイルを添付したボタン付きのメッセージをSlackに投稿します。このExcelファイルは、ドメインエキスパートが確認しやすいようにデータを整形して表示し、修正値のアップロード時の雛形としての役割も果たします。
- ドメインエキスパートはSlackメッセージに添付されたExcelファイルを開き、算出された指標を外部に公開してよいかを判断します。公開が適切であれば
Accept
、修正が必要であればReject
ボタンをクリックします。 - Bolt Appはボタンクリックイベントを受け取り、押されたボタンがAcceptであればデータベースから①で保存した結果を読み出し、公開用のテーブルにコピーします。
- 押されたボタンが
Reject
の場合は、Bolt AppはSlackへの返答メッセージを投稿し、次のイベントを待ち受けます。ドメインエキスパートは②で投稿されたExcelファイルの数値を修正し、それと特定のコマンドメッセージ(ここでは!publish-product1
とします)をSlackに投稿します。ここではSlashコマンドを利用することも考えられましたが、添付ファイルを処理できなかったため、自前でコマンドを実装しています。 - Bolt Appは
!publish-product1
が含まれたメッセージを検知すると、添付Excelファイルをパースしてデータベースの公開用テーブルに保存します。
詳細な実装の説明
本節では、前節で登場した算出パイプラインとBolt Appについて、コードを交えてより詳細に説明します。
まず、算出パイプラインとBolt App共通で使うClassを定義します。
import datetime import enum from pydantic import BaseModel @enum.unique class Product(enum.Enum): PRODUCT1 = "product1" PRODUCT2 = "product2" class InquiryPayload(BaseModel): date: datetime.date run_id: str @enum.unique class InquiryAction(enum.Enum): ACCEPT = "Accept" REJECT = "Reject"
Product
は我々が複数扱う指標それぞれを識別するためのEnumです。
InquiryPayload
はデータベースから1実行単位での算出結果を特定するためのキーです。date
は指標の算出日、run_id
はKubeFlowパイプラインにおける実行ごとのユニークidです。これらの情報をSlackアクションのpayloadに乗せて算出パイプラインからBolt Appに伝えることで、④の公開処理において、③でドメインエキスパートがチェックしたものと同じレコードを確実に特定できるようにしています。また、このペイロードはAPIを通じてやり取りするので、JSONとの相互変換がしやすいようにpydanticのModelとして定義しています。
算出パイプラインの実装
算出パイプラインでは前述のように指標を算出・保存し、ドメインエキスパートに見せるためのExcelファイルを作成しますが、ここでは省略してメッセージングの部分に着目します。この関数は、生成したExcelファイルのpath、InquiryPayload、Slackメッセージを投稿する宛先のチャンネルIDを受け取り、ドメインエキスパート宛に算出結果やアクションボタンを含んだメッセージを送信します。
from pathlib import Path from slack_sdk.web import WebClient def send_inquiry_message( product: Product, file_path: Path, inquiry_payload: InquiryPayload, channel: str, ) -> None: blocks = [ { "type": "section", "text": { "type": "mrkdwn", "text": "上に投稿された指標をチェックし、 `Accept` または `Reject` を押してください。", }, }, { "type": "actions", "block_id": f"inquiry_{product.value}_actions", "elements": [ { "type": "button", "action_id": action.value, "text": { "type": "plain_text", "text": action.value, }, "confirm": { "title": { "type": "plain_text", "text": "Are you sure?", }, "text": { "type": "mrkdwn", "text": f"本当にこの指標を{action.value}しますか?", }, "confirm": { "type": "plain_text", "text": action.value, }, "deny": { "type": "plain_text", "text": "Cancel", }, }, "style": { InquiryAction.ACCEPT: "primary", InquiryAction.REJECT: "danger", }[action], "value": inquiry_payload.json(), } for action in InquiryAction ], }, ] slack_client = WebClient() slack_client.files_upload( file=file_path.as_posix(), channels=channel ) slack_client.chat_postMessage(channel=channel, blocks=blocks)
ここで、SlackメッセージはBlock Kitを使って構築しています。テキストやボタンの装飾・配置を宣言的に記述できるため、手軽にUIを作れます。下図のように、モバイルでも視認性・操作性を損なわずに表示してくれます。
blocks
には確認ダイアログの定義も含めています。これはミスクリック1回で公開処理が走ってしまうことを防ぐための措置です。次の画像のように、ボタンを押すと確認ダイアログが現れるため、計2回ボタンを押さないとアクションを発行できないようになっています。
ボタンが押されたときのvalueとしてInquiryPayload
をJSON化して渡しています。Bolt App側でボタンイベントを処理するときに、どの実行単位で算出された数値を処理するのかを特定するための情報です。
Bolt Appの実装
Bolt Appの各部分の実装の前に、イベントのハンドリングに必要なClassを定義します。
import enum from typing import List, Optional from pydantic import BaseModel, Extra from typing_extensions import Literal class SlackUser(BaseModel): class Config: extra = Extra.allow id: Optional[str] = None class SlackActionContentText(BaseModel): class Config: extra = Extra.allow type: Optional[str] = None text: Optional[str] = None class SlackActionContent(BaseModel): class Config: extra = Extra.allow block_id: Optional[str] = None action_id: Optional[str] = None value: Optional[str] = None text: Optional[SlackActionContentText] = None class SlackActionPayload(BaseModel): type: Literal["block_actions"] token: str user: Optional[SlackUser] = None actions: Optional[List[SlackActionContent]] = None @enum.unique class SlackKeywordCommand(enum.Enum): PUBLISH = "publish"
SlackUser
, SlackActionContentText
, SlackActionContent
, SlackActionPayload
はSlack 側のデータモデルの定義に合わせて、処理に必要なフィールドを抜粋して定義しています。ここでもpydanticのModelとして定義することで、後述のようにパース時の手間が少し省けます。
SlackKeywordCommand
は修正値のアップロードに使うコマンドメッセージのキーワードです。主にメッセージイベントのハンドリングの部分で使用します。
Socket Modeでの立ち上げ
Bolt AppとSlackの通信にはHTTPではなくSocket Modeを利用しています。Socket Modeでは、Bolt AppがSlackとWebSocket接続を確立し、その接続を介してリアルタイムにイベントを受信できます。HTTP方式の場合はエンドポイントを作成しpublicに公開する必要がありますが、Socket Modeだとpublicなエンドポイントを公開する必要がないため、よりセキュアかつ手軽に構築できます。
from slack_bolt import App from slack_bolt.adapter.socket_mode import SocketModeHandler app = App() handler = SocketModeHandler(app)
ボタンイベントのハンドリング
Slackでのblock actionのうち、block_id
がinquiry_product1_actions
のようなものを対象に処理をします。
import json import re from typing import Any from slack_bolt import Ack, Respond, Say @app.block_action( { "block_id": re.compile( f"inquiry_({'|'.join(product.value for product in Product)})_actions" ), "action_id": re.compile("[a-zA-Z0-9_+]"), } ) def handle_block_action( ack: Ack, say: Say, respond: Respond, body: dict[str, Any], action: dict[str, Any], ) -> None: ack() action_form = SlackActionPayload(**body) action_content = SlackActionContent(**action) inquiry_action = InquiryAction(action_content.action_id) respond( f"<@{action_form.user.id}> が {action_content.text.text} をクリックしました。" ) if inquiry_action == InquiryAction.ACCEPT: inquiry_payload = InquiryPayload.parse_raw( action_content.value ) # inquiry_payloadから特定されるデータを読み込んでpublishする(省略) elif inquiry_action == InquiryAction.REJECT: say( f"{action_content.block_id}がRejectされました。" f"`!{SlackKeywordCommand.PUBLISH}-{action_content.block_id}` " "コマンドを用いて修正したExcelファイルをアップロードしてください。" ) else: raise ValueError( f"Invalid InquiryAction ({inquiry_action})." )
- actionを処理する場合、必ず3秒以内に確認応答を返す必要があります。特に理由がなければお約束的に関数の先頭で
ack()
を呼びましょう。 SlackActionPayload(**body)
で、ネストされたdictをネストされたpydantic modelに一括変換しています。このためにpydanticを使っていると言っても過言ではありません。- Slack上から誰が何のボタンを押したのかわかるように「@dareka が {Accept|Reject} をクリックしました。」というメッセージを投稿しています。
- Acceptボタンが押された場合は、公開対象の指標を特定する情報(
inquiry_payload
)をパースしてから、公開処理に進みます。 - Rejectボタンが押された場合は、基本的には何もしません。わかりやすさの観点から、「product1がRejectされました。
!publish-product1
コマンドを用いて修正したExcelファイルをアップロードしてください。」というメッセージをSlackに投稿します。
メッセージイベントのハンドリング
Slackでのメッセージのうち、自前で定義した特定のキーワードを含むものを対象に処理をします。特定のキーワードとは、SlackKeywordCommand
とProduct
の組み合わせで表現され、本稿の例の場合は!publish-product1
と!publish-product2
の2通りです。
@app.message( re.compile( # !((command1|command2)-(product1|product2)) f"!(({'|'.join(command.value for command in SlackKeywordCommand)})" f"-({'|'.join(product.value for product in Product)}))", ) ) def handle_publish_and_fix_message( say: Say, message: dict, context: dict ) -> None: _validate_message_context(say, context) command_name = SlackKeywordCommand(context["matches"][0][1]) product = Product(context["matches"][0][2]) say(f"{product}の{command_name}を開始します。") for file_path in _download_and_get_file_path( say, message ): # file_pathのデータを読み込んでpublishする(省略) pass say(f":ok: {product}の{command_name}が完了しました。")
メッセージのバリデーションを行う _validate_message_context()
、およびメッセージに添付されたファイルの取得を行う _download_and_get_file_path()
は次のように実装しています。
キーワードを含んだメッセージのバリデーションでは次の2つのことを確認し、必要に応じてエラーをraiseして処理を止めます。
- メッセージの投稿者がbotでないこと
- 前述のように、Bolt Appから
!publish-product1
を含むメッセージを投稿することがあるため、その投稿に対して処理をすることを防いでいます。
- 前述のように、Bolt Appから
- メッセージ中に複数種類のキーワードが含まれていないこと
- 「
!publish-product1
!publish-product2
」のようなメッセージに対してはエラーを返します。- 「
!publish-product1
!publish-product1
」のように、複数のキーワードが含まれているが1種類だけの場合はOKとします。
- 「
context["matches"]
には、メッセージに含まれるキーワードが全て格納されているので、そのユニークな数で判定しています。
- 「
def _validate_message_context( say: Say, context: dict[str, Any] ) -> None: if context["user_id"] in ("Uxxxxxx", "Uyyyyyy"): raise ValueError( "Messages from bot users are skipped." ) if len(set(context["matches"])) > 1: say(":warning: 一回のメッセージに複数のキーワード処理を実行できません。") raise ValueError("Multiple keywords extracted.")
メッセージに添付されたファイルの取得には認証が必要です*2。ファイルを正常にダウンロードできたら、一時ファイルとして保存してからそのパスを返します。
from typing import Generator def _download_and_get_file_path( say: Say, message: dict[str, Any] ) -> Generator[str, None, None]: for file in message["files"]: file_url = file["url_private_download"] response = requests.get( file_url, headers={ "Authorization": "Bearer xxxx-xxxx-xxxx-xxxx" }, ) if response.status_code != 200: say(":warning: Excelファイルのダウンロードに失敗しました。") raise ValueError( f"Excel download error. status_code: {response.status_code}." ) file_path = os.path.join( "/tmp", os.path.basename(file_url) ) with open(file_path, mode="wb") as f: f.write(response.content) yield file_path
おわりに
今回の記事では、我々がSlack Boltを利用して構築したHuman in the Loop機構について紹介しました。
明日の記事の担当は @26takafuji さんです。乞うご期待!
enechainのデータサイエンスデスクでは、データサイエンスを活用してエネルギーのマーケットを作り上げていく仲間を募集しています。