この記事はenechain Advent Calendar 2024の10日目の記事です。
はじめに
enechain データサイエンスデスク エンジニアの藤村です。
我々データサイエンスデスクは、電力や燃料に関するデータ分析や予測モデルの構築などの他に、enechainの様々なビジネスをサポートする社内向けツールの開発・運用も行っています。機械学習や数理最適化を活用したアプローチを中心に、最近ではLLMの活用にも取り組んでいます。
本稿では、この取り組みでStreamlitアプリケーションをGKEでホストするに至った経緯や、その運用について紹介します。
なぜ Streamlit なのか
社内の業務を支援するツールを構築する際、常に課題となるのが「人間の判断をどのように介在させるか」という点です。ドメインやアルゴリズムの性質上、自動化が難しい部分も多く、人間の判断を介在させる必要のある場面が多々あります。
これまではSlack botを活用したHuman in the Loopの仕組みを構築してきました。Slackという身近なインターフェースを使うことで、利用者の学習コストを抑え、Python SDKを利用することで開発コストも抑えられました。しかし、複雑な入力フォームやリアルタイムなデータ可視化など、より高度なインタラクティブ性を実現しようとすると限界がありました。例えば、対話的なインターフェイスやグラフの動的更新といった機能は、Slack botでは実装が困難でした。
そこで我々は「データサイエンティストが素早く(= Pythonで)インタラクティブなツールを自作できる環境」の必要性を感じ、その要件を満たすフレームワークとしてStreamlitを選択しました。選定の理由としては、(1) グラフやdataframeなどの描画機能がリッチであること、(2) 歴史が長く、利用者も多いため情報が豊富にあることです。
構築するアプリケーションの要件
現在、我々は複数の社内向けアプリケーションを管理しています。最近の例では「まとめるくん」というシンプルな音声文字起こしサービスを開発しました。このツールは、動画・音声ファイルをアップロードすると要約と全文文字起こしを生成してくれます。裏側では次のような流れで処理をしています。
- (拡張子次第で)動画ファイルから音声を抽出
- 音声が長ければ分割
- GCSにアップロード
- Vertex AI Gemini APIで要約と文字起こしを生成
これ以外にも様々なサービスを開発・運用しています。サービスごとに要件はそれぞれで異なりますが、社内向けサービス共通の要件として次のようなものがあります。
- アクセス制御
- 基本的に社員のみにアクセスを制限
- 特定業務に紐づくアプリケーションは、関連メンバーのみにアクセス権を付与
- センシティブな情報を扱うケースでは、さらにIPアドレスによる制限も追加(オフィスWi-FiまたはVPNからのアクセスのみ許可)
- 開発・運用効率
- シンプルなアプリケーションを複数作るため、1つ1つのアプリケーションの立ち上げが容易であること
- 少人数チームで運用しているため、インフラやセキュリティなど、アプリケーションの本質とは関係ない部分の管理コストを低く抑えたい
システム構成
今回の対象はすべて社内向けのツールですが、他の社外向けプロダクトと共通のGKEクラスタ上で運用しています。 この選択には次のようなメリットがあります。
- 開発速度の向上
- 社内のInstant Universeという仕組みを活用することで、開発速度を向上させています。共通のボイラープレートを使用することで、新規アプリケーションの立ち上げがスムーズで、かつ統一感のあるものになります。
- セキュリティ
- Cloud Armorを利用してIPアドレスによるアクセス制限を実装しています。共通基盤側であらかじめ定義された「オフィスまたはVPNからのアクセスのみを許可する」というSecurityPolicyを利用することで、本番プロダクトと同等のセキュリティ管理を簡単に適用できます。
- 運用負荷の軽さ
- クラスタ全体の管理は専属チームが担当しているため、こちらでインフラの面倒を見る必要がありません。また、他の社内外向けサービスやjobと同じ仕組み・体制でモニタリングができ、後述するログ基盤との連携も容易です。
アクセス制御にはCloud IAPを採用しました。社内の他プロダクトではAuth0による認証とprotobuf製の共通基盤による認可の組み合わせによる実装が一般的ですが、Streamlitで同様の実装をすることは工数と信頼性の観点から避けたかったため、この選択に至りました。
Cloud IAPは、GCPのIdentity-Aware Proxyを利用して、GCPのリソースに対するアクセスを認証・認可するサービスです。GKEのIngressリソースに対して設定することで、GCPの認証機構を利用してアクセス制御を行うことができます。BackendServiceを紐づけるだけで設定できるので、アプリケーション側での実装は不要です。アクセス許可設定はBackendService × google groupの単位で設定できるので、「特定のアプリケーションに対して、特定の部署の社員のみからのアクセスのみ許可」といった設定を実現できます。
Cloud IAP × Streamlitでのロギング実装
Cloud IAPはアクセス制御のために導入したものですが、副次的な効果としてアクセス者の情報をrequest headerから取得できます。社内ツールなので必須ではありませんが、利用実態の把握や簡易的な監査ログのため、各ページへのアクセス情報(誰が・いつ・どのページへ)を記録することにしました。
ロギングの実装
Streamlitアプリケーション内でアクセス者の情報を取得し、ログに記録するための関数を次のように実装しました。
from logging import getLogger import streamlit as st from streamlit import config from streamlit.runtime.scriptrunner_utils.script_run_context import get_script_run_ctx logger = getLogger(__name__) def get_session_id() -> str: """Get the session ID of the current websocket connection""" return get_script_run_ctx().session_id def get_host() -> str | None: """Get the host of the current websocket connection""" return st.context.headers.get("Host") def get_user_id() -> str | None: """Get the user ID of the current websocket connection""" return st.context.headers.get("X-Goog-Authenticated-User-Id") def get_user_email() -> str | None: """Get the user email of the current websocket connection""" return st.context.headers.get("X-Goog-Authenticated-User-Email") def get_base_url_path() -> str | None: """Get the base URL path of the current server options""" return config.get_options_for_section("server").get("baseUrlPath") uri = f"{get_base_url_path()}/matomeru_kun" log_format = { "host": get_host(), "uri": uri, "user_id": get_user_id(), "user_email": get_user_email(), } logger.info(f"accessed streamlit page: {uri}", extra=log_format)
ロギング実装の省力化
我々はStreamlitのmulti page appを構築しており、前述のロギング処理は各ページファイルから呼び出す必要があります。しかし、この処理を毎回記述するのは面倒ですし、記述が漏れるリスクもあります。そこで、次のスクリプトをsitecustomize moduleとして配置しました。streamlitのbackend通信ごとに実行されるscript runnerのget_bytecodeをwrapしてlogging処理を追加しています。sitecustomize moduleとして定義することで、このwrapping処理もアプリの起動時に自動的に適用されます。
from collections.abc import Callable from functools import wraps from logging import getLogger from pathlib import Path from typing import Any from streamlit.runtime.scriptrunner.script_cache import ScriptCache from frontend.utils import server_config logger = getLogger(__name__) def access_log(func: Callable) -> Callable: """Wrapping script runner to write access log""" @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: try: _, script_path = args script_name = Path(script_path).stem _access_log(script_name) return func(*args, **kwargs) except Exception as e: logger.error(f"Application error: {e}", exc_info=True) return wrapper def _access_log(script_name: str | None) -> None: """Write access log to logger""" uri = f"{server_config.get_base_url_path()}/{script_name}" log_format = { "host": server_config.get_host(), "uri": uri, "user_id": server_config.get_user_id(), "user_email": server_config.get_user_email(), } logger.info(f"accessed streamlit page: {uri}", extra=log_format) original = ScriptCache.get_bytecode ScriptCache.get_bytecode = access_log(original)
ログの活用例
GKEのコンテナログは、共通ログ基盤によってBigQueryに永続化されています。ここに蓄積されているアクセスログをBIツールで可視化することで、アプリケーションの利用状況を把握し、改善の方針を立てることができます。
1日1回、特定の業務で利用されているアプリケーションの利用状況をモニタリングしているダッシュボードの一例です。アプリケーションの操作性やロジックが適切でないと作業時間が伸びるはずだという仮説に基づき、作業時間のトレンドを改善要否の判断材料として見ています。
おわりに
本稿では、StreamlitアプリケーションをGKEでホストする際のアクセス制御とロギングの実装について紹介しました。社内向けツールの開発・運用において、アクセス制御やログの取得は重要な要素です。これらの実装を通じて、アプリケーションの利用状況を把握し、改善の方針を立てることができるようになりました。本ブログの内容が、社内向けStreamlitアプリケーションの開発に携わる方々の参考になれば幸いです。
enechainでは、事業拡大のために随時仲間を募集しています。興味がある方はぜひお声がけください!