この記事はenechain Advent Calendar 2024の11日目の記事です。
はじめに
enechainのデータプラットフォームデスクでエンジニアをしている菱沼です。現在はデータエンジニアとしてデータ基盤の構築をしています。
社内で活用できるデータ量が増加し、データ活用の場面が多様化する中で、業務の現場でデータをいかに効率的に活用できるかが重要な課題となっています。 本記事では、Google CloudのiPaaSであるApplication Integrationを活用し、 BigQueryからSalesforceにデータを連携する仕組みをどのように設計・実装したかをご紹介します。
技術選定や設計上の工夫を中心に、実際の構築事例をご紹介します。ぜひ最後までお付き合いください!
背景
enechainのデータプラットフォームチームでは、データを集約するための基盤の整備を進めており、効率的にデータ活用できる環境が着実に構築されています (このあたりのデータ基盤の全体像については、enechain Advent Calendarで明日公開予定の記事で詳細にご紹介するので、こちらも是非ご覧ください)。
同時に、プロダクトのユーザが徐々に増え、日々の利用が活発化する中で、プロダクトからBigQuery (以下「BQ」)に連携されるデータ量も増加していきました。 また、これらの蓄積されたデータを営業活動に活用することへの期待も高まっていました。
このような流れの中、営業チームではSalesforceの導入が開始され、ビジネスサイドの方が閲覧しやすく活用の幅の広いSalesforceにもデータを連携することとなりました。 データプラットフォームデスクとしては、BQに蓄積された顧客データをSalesforceに連携し、リバースETLを実現することが必要になりました。
技術選定
BigQueryのデータをSalesforceに連携するにあたり、技術選定しました。 具体的には、以下の4つの選択肢について比較しました。
1. Application Integration
Google Cloudが提供する軽量なデータ連携ツール (iPaaS)で、Salesforceを始めとしたさまざまなアプリケーションとの統合が可能です。 シンプルなバッチ処理や、複雑なETLが不要な場合に適しています。2023年夏に利用可能となった比較的新しいサービスです。
2. Cloud Data Fusion
Google Cloudが提供するマネージドETLツールで、ノーコード/ローコードで柔軟なデータパイプラインを構築可能です。 オープンソースのCDAPを基盤とし、Salesforceとの双方向連携が可能です。
3. Trocco®
データ統合の自動化が行えるSaaSです。Salesforceを含む多数のデータソースとデータ転送先に対応しています。 ノーコードで設定可能なGUIを通じてデータパイプラインを構築できるため、初期導入のハードルが低いです。
4. 自前実装
SalesforceのREST APIを直接利用してデータ連携の仕組みを構築します。 シンプルなデータ処理から複雑な要件まで、完全にカスタマイズ可能です。
それぞれの技術のメリット・デメリットを比較すると、以下のような表となりました。
技術 | メリット | デメリット |
---|---|---|
Application Integration | - 簡単な操作で連携を構築可能で、エンジニア以外でも扱いやすい - 他のGoogle Cloudサービスと統合しやすい |
- 複雑な処理や高度なカスタマイズには不向き |
Cloud Data Fusion | - 柔軟性が高い - Salesforceとの双方向連携が可能 - マネージドサービスで運用負荷が低い - 他のGoogle Cloudサービスと統合しやすい |
- Cloud Data Fusion インスタンスが作成されてから削除されるまでの時間に対して課金され、高額 |
Trocco® | - 幅広いデータソースに対応 - ビジネスユーザーでも操作可能 - 簡易にデータ連携を構築可能 |
- 月額で課金され、比較的高額(リバースETLコネクタを利用するには上位の料金プランを利用する必要あり - 新規SaaS導入の承認の過程が必要 |
自前実装 | - カスタマイズ性が高い - 必要最低限の機能に絞れるためコストを抑えやすい |
- 開発・運用コストが高い |
今回は以下の理由から、シンプルな連携をGUIベースで構築可能なApplication Integrationを利用することに決定しました。
- BQのデータを集計処理を挟まずSalesforceにそのまま連携することだけが要件であり、豊富な機能やコネクタが不要であったため
- 開発コストを抑えたかったため
設計
BQ → Salesforceへのデータ連携基盤の設計の全体像は以下のようになっています。
- 前提として、この基盤の役割は、BQ上で作成されたデータをSFDCのAPI Requestへの変換することです。Salesforceへ連携するデータの作成者は、別のチーム (Data Analyst) がいました。
- そこでBQ上にInterfaceとなるテーブルを作成し、アナリストの方にScheduled Queryを利用してデータを更新してもらう設計にしました。テーブルのスキーマは、データの変更に対して柔軟に対応可能であり、このあたりの設計はBQ設計で説明します。
- Application Integrationのフローは定時実行され、Salesforceへのデータ連携が行われます。詳しいフローの設計はApplication Integration設計で説明します。
BQ設計
BQのテーブルは、基盤のinterfaceとなるデータ受取用のテーブルと、テーブル内のデータのうち連携すべきデータを抽出するビューを用意しました。
データ投入用Interface table
interfaceテーブルは、上の図の通り、key-value形式でデータを格納できるようなスキーマとしています。 このような設計により、Salesforceに連携したい項目に追加/更新/削除が発生してData Analystの方が作成するデータに変更があった場合でも、テーブルスキーマの変更が不要となっています。 また、データ投入時のtimestamp列を保持することで、連携データを履歴管理しています。
連携データ抽出View
ビューは、interfaceテーブルのうち、最新timestampのデータのみ抽出するようなクエリで構成されています。 このビューを噛ませることで、最後に投入された、Salesforceに連携したいデータのみ抽出しています。
Application Integration設計
Application Integrationのフローは、以下の図のようになっています。
- BQからのデータ取得
- ビューから、連携対象のデータを抽出します。BQとの接続時には、Integration Connectorsというコネクタサービスを利用します。
- データ変形
- 取得したデータを、Salesforce APIに適した形式に変形します。
- 並列処理
- 変形されたデータは、Salesforceのレコードごとに並列で処理されます。これにより、データの処理が効率的に行われ、処理時間の短縮ができます。
- Salesforce APIの呼び出し
- 各Salesforceレコードに対して、Salesforce APIを叩き、データ連携をします。Salesforceへの連携時は、レコードの外部IDを用いた連携が必要でしたが、謹製のIntegration Connectorsでは外部IDを用いた更新が不可能であったため、今回はやむを得ずAPIを直接叩いて更新する方式を採用しました。APIの認証情報は、Application Integrationの認証プロファイルを用いてGoogle Cloud上に保持しました。
Integration Connectorsや認証プロファイルについては、以下のようにTerraform管理し、再利用可能としました。
# BQ用のIntegration Connectorsの定義 resource "google_integration_connectors_connection" "pubsubconnection" { name = "bigquery" location = "asia-northeast1" service_account = local.service_account connector_version = "projects/${local.project_id}/locations/global/providers/gcp/connectors/bigquery/versions/1" description = "Integration Connectors for BigQuery" config_variable { key = "project_id" string_value = local.project_id } config_variable { key = "dataset_id" string_value = local.datasets.dataset_hoge.dataset_id } } # Salesforce API用認証プロファイルの定義 resource "google_integrations_auth_config" "basic_example" { location = "asia-northeast1" display_name = "salesforce" description = "Integrations Auth Config for Salesforce" decrypted_credential { credential_type = "OAUTH2_CLIENT_CREDENTIALS" oauth2_client_credentials { client_id = data.google_secret_manager_secret_version.salesforce_client_id.secret_data client_secret = data.google_secret_manager_secret_version.salesforce_client_secret.secret_data token_endpoint = local.token_endpoint request_type = "REQUEST_BODY" } } }
最終的に、以下のGUIで表されるようなフローが完成しました。
結果
上の画像がInterfaceに投入されたデータ、下の画像がSalesforceの画面です。 最新timestampのデータが想定通り連携されていることが確認できました。
課題点
上記実装でBQのデータをSalesforceに連携できました。 一方で、Application Integrationを用いた構成には以下のような課題点がありました。
- 実行時の制約
- Application Integrationは実行時間やレコード数の制約が厳しく、処理速度も高速ではないため実行時間の制約に引っかかることがありました。そのためフロー自体を物理的に分割し、1つのフローで実行するデータ量を小さくすることで解決しました。
- フローのIaC管理が不可能
- フロー自体はGUI上でのみ編集が可能で、IaC管理ができません。フローを物理的に分割するときに、フローの複製が必要になりましたが、もしフローのIaC管理が可能ならフローの複製がより簡単に達成できたと感じています。
おわりに
今回は、Application Integrationを用いて、Salesforceにデータを連携する手法を紹介しました。 プラットフォーム的に設計することで、より連携対象のデータの変更に対して柔軟な基盤を構築しました。 データ連携は比較的少ない工数で達成できた一方で、課題点がいくつか見えてきました。 今回の取り組みで得られた知見を活かし、将来的なシステム拡張や新たなユースケースへの対応にもつなげていく予定です。
enechainでは、巨大なマーケットを支えるプラットフォームを一緒に構築する仲間を募集しています!要項は以下からご確認ください。