コネヒト開発者ブログ

コネヒト開発者ブログ

家族ノートを支えるBigQuery+StepFunctionsで作るデータレイク

こんにちは。インフラエンジニアの永井(shnagai)です。

最近、家族ノートという「ママリ」内の検索データとQ&Aデータ(現在開発中)を可視化したデータ分析サービスの立ち上げに携わっています。

info-kazokunote.mamari.jp

今回は、家族ノートで使っているデータ基盤の一部であるBigQuery+StepFunctionsで作ったデータレイクの仕組みについてご紹介します。

内容は、ざっくりとこんな話を書こうと思います。

  • データ基盤作りに至った経緯
  • AWS→BigQueryにデータ移送するアーキテクチャのpros&cons
  • StepFunctions+Embulk(Fargate)を利用したデータレイクの仕組み

データ基盤作りに至った経緯

コネヒトには大きく分けると2つのデータセットがあります。

  • DB(Aurora)にあるアプリケーションのデータ(業務データやマスターデータ)
  • BigQueryにある行動ログや集計データ

今回家族ノートで使うデータとしてこの2つのデータセットをかけ合わせる必要があり、 データ結合の手段としては、下記2つの方法を検討しました。

  • ①バッチ処理を使いアプリケーション側でデータの結合を行い新しいデータセットを作るパターン
  • ②データマートを用意しアプリケーションはデータ結合は意識せずにデータを利用するパターン

MLプロダクト等で、①のパターンの処理がいくつか動いているのですが、同じようなデータセットを作るバッチ処理が各所で動くのは望ましい状態ではないという課題があり、これを機にデータ基盤を作ることにしました。

と言っても、いきなり完璧なデータ基盤を作るのは現実的ではないのでスコープを絞り小さくデータ基盤を作ることから始めました。 こんな要件を定義しています。

  • 家族ノートのMVPリリースに必要なデータマートを用意するのがゴール
    • BigQuery上にデータレイク/データウェアハウス/データマートを用意
    • データマートとしてはBigQueryのViewを利用しアプリケーションから参照する
  • データマートに必要なデータレイクを最小限でBigQuery上に整えていく

この構成を考えるにあたり下記の書籍が非常に参考になりました。

データマネジメントが30分でわかる本 | ゆずたそ, はせりょ, ゆずたそ | 経営情報システム | Kindleストア | Amazon

Viewでデータマートを作るには、BigQuery上に必要なデータが揃っている(データレイク)必要があります。 以降は、Aurora(AWS上)にある業務データをBigQueryに移送するデータレイク作成の仕組みについて検討及び実装したの内容を紹介していきます。

AWS→BigQueryにデータ移送するアーキテクチャのpros&cons

AWS(Aurora)からBigQueryにデータを移送する手段ですが、AWS純正のものがあればそれを使うのが一番筋がいいだろうと考え、BigQueryにインテグレートできるような機能を探しましたが今のところはなさそうでした。

そこで、データの抽出/変換/移送を行うETLツールとそれを動かすワークフローについて、AWSで構成を作るにはどんなパターンがあるかをざっくりpros&cons形式で出し比較検討しました。

※個人の主観が入ってますので参考程度に

目指す状態

アーキテクチャ選定においては、下記観点を満たせる構成をゴールに設定しました。

  • 開発者もしくは必要な人が誰でも新しいETL処理を追加・削除出来る
    • コード化されておりかつ簡単なDSLで記載出来る方がよりよい
    • インフラ or データエンジニアしか管理・更新出来ない状態にはしない
  • できるだけワークフローの運用コストが低い
    • マネージドサービスもしくはそれに親しい運用コストが理想
    • 専属で面倒見る人まだいないので、オレオレの運用が発生しないという部分のウエイトを上げる

※構成比較の全体像 f:id:nagais:20210517095316p:plain

ETLツール

コード管理の容易さとプラガブルな構成による柔軟性からEmbulkを採用しました。

Embulk 

  • pros
    • 豊富なプラグインで柔軟な処理
      • フィルタである程度整形も出来る
    • 社外での活用事例も多数あり信頼出来る
    • yamlで簡単な記述
    • 処理をembulk内で完結出来る
    • 使い慣れてるのはある(うちでも既に運用実績はあり)
  • cons
    • Embulkの運用(バージョン管理,エラー時は調査が必要)
      • 重要な意味を持ち出した時にメンテ続けるのは結構大変ではある
    • Embulk実行リソース(Fargateにすることでサーバレスには出来る)

Glue+S3+DataTransferService(BigQuery)

  • pros
    • 全てマネージドサービスなので運用コストが低い
      • どんな処理をするかを定義すれば後はおまかせ
  • cons
    • 各マネージドサービスのコード管理が必要(terraformでやれそう)
    • Glue, DataTransferServiceそれぞれの制約と向き合う必要
    • DataTransferServiceの知見がなく事例も薄い(ちょい検証して実地で知見をためてくアプローチ)
    • DataTransferServiceはスケジュールベースでしか動かないかも(リトライ考えるとトリガ形式にしたい)

Glue+S3+Lambda+GCS+CloudFunctions

  • pros
    • GCSにcsvを置くとBQにデータ流すという構図が作れる
    • s3に置くまでとGCS後の世界が分けれるので応用は効く
      • s3にファイルを置けばデータソースが何であれ同じ仕組みでBQに取り込めるみたいな
  • cons
    • ピタゴラスイッチになるので、構成を把握するのが大変
    • 扱う要素が多くなるので管理コストは高い
      • 全部terraformで賄わないと変更箇所多数で運用破綻するかもな

ワークフロー

運用コストの低さをワークフローエンジンに求めたかったのが大きくStepFunctionsを採用しました。 DigDagでもFargateをキックする構成で近い構成を取ることはできそうだったのですが。DigDag自体の運用が残る点がネックとなりました。

DigDag

  • pros
    • 全てDigDagのDSLでコード管理
    • Embulkと好相性(Fargateキックパターンもあり)
    • BigQueryのジョブも管理出来る可能性ある(やってみないとわからないけど)
    • 管理画面もある
  • cons
    • 学習コスト
    • DigDag自体は常に存在している必要がある(ECSサービス化)
    • DigDagデプロイするときには注意が必要なのかな??(検証してみる必要がある)
  • 検討の中で追記
  • デプロイにしても何にしてもワークフロー自体をうまく回すことを意識して運用しないといけないよな・・・
  • コンテナでやる場合には、セッションやデータを管理する必要がある(イミュータブル対応)
  • 永続ディスクが必要(ECSでボリュームマウント)
  • PostgresSQLをRDSで立ててそこにデータを保存するようにする

StepFunctions

  • pros
    • ワークフローの運用は不要
    • 使い慣れている
    • AWSコンソール上からだがリトライやジョブ可視化可能
    • AWSリソースとフレンドリ
      • AWSサービス使うとかなった際に楽
  • cons
    • コード管理しないと運用破綻する
      • DWH構想用のコード化
  • 検討の中で追記
  • AWS SAMでStepFunctionsとEventBridgeを管理すればコード管理出来る
  • 共通Dockerイメージ作って、SFで Iterator を使うことで、呼び出し時にテーブル名を指定する形にすれば * 取得の時に簡素に書くことも可能かもを検証

StepFunctions+Embulk(ECS×Fargate)を利用したデータレイクの仕組み

f:id:nagais:20210517102541p:plain

コード管理

誰でも簡単にデータレイクに手をいれれる構成にこだわっていたので、データレイクに関わる必要なリソースはすべて1リポジトリで管理する構成にしました。

  • AWSリソース(StepFunctions,EventBridge等)はSAM(AWS サーバーレスアプリケーションモデル)を使って管理
  • データレイク処理追加時に1リポジトリの修正で済むようにembulkのコードと同じGitHubリポジトリで管理しています。
  • 基本的に新しいテーブルをデータレイクに追加したい時は、コピペベースでPR作ることで追加ができるようにしています。
  • デプロイも自動化しており、embulkのDockerイメージのbuild&pushとSAMのデプロイがmainマージ時に実行されます。

ディレクトリ構成はこんな感じです。

embulk(embulkの定義)
 |--conf/ (embulkの定義)★
sam(AWS SAMの定義)
 |--statemachine/ (StepFunctionsの定義)★
 |--functions/ (Lambdaの定義)
 |--env
   |--dev(dev環境用の定義)
     |--samconfig.toml (SAMデプロイ時の設定)
     |--template.yaml (SAMの定義 ※CloudFormationにジェネレートされる)
   |--prd(prd環境用の定義)
     |--samconfig.toml (SAMデプロイ時の設定)
     |--template.yaml (SAMの定義 ※CloudFormationにジェネレートされる)

StepFunctions

できるだけ運用コストを低くするために、

【ポイント】

  • StepFunctionsのステートマシンはデータソース単位で用意
    • 並列(Parallel)でFargateタスク(embulkプロセス)を呼び出すことでエラー時の他への影響を極力小さくする
  • コスト削減のためにFargate Spotを100%利用するクラスタで動かす
  • Fargateタスクなので、処理の干渉が起きない
    • 夜間なので今の所問題になっていないがデータソースに一気に繋ぐとデータソース側がパンクする可能性があるが、その時はステートマシン内のフローを組み替えることで対応予定
  • StepFunctions側でリトライ制御もいれており、単純リトライでもエラーになった際はエラー通知

参考までにSAMで管理している実際のステートマシンのコードの一部をお見せします。 VSCodeのプラグインを使うことでステートマシンを描画できるのはめちゃくちゃ便利でした。

{
    "Comment": "Definition of Embulk StateMachine",
    "StartAt": "Parallel",
    "States": {
      "Parallel": {
        "Type": "Parallel",
        "Next": "Final",
        "Catch": [
          {
            "ErrorEquals": [
              "States.ALL"
            ],
            "Next": "NotifySlackFailure"
          }
        ],
        "Branches": [
          {
            "StartAt": "users",
            "States": {
              "users": {
                "Type": "Task",
                "Resource": "arn:aws:states:::ecs:runTask.sync",
                "Parameters": {
                  "LaunchType": "FARGATE",
                  "Cluster": "${EcsCluster}",
                  "TaskDefinition": "${TaskDefinition}",
                  "NetworkConfiguration": {
                    "AwsvpcConfiguration": {
                      "Subnets": [
                        "${Subnets}"
                      ],
                      "SecurityGroups": [
                        "${SecurityGroups}"
                      ],
                      "AssignPublicIp": "ENABLED"
                    }
                  },
                  "Overrides": {
                    "ContainerOverrides": [
                      {
                        "Name": "${ContainerName}",
                        "Command": [
                          "/embulk/bin/embulk",
                          "run",
                          "conf/hoge/users_bigquery.yml.liquid"
                        ]
                      }
                    ]
                  }
                },
                "End": true,
                "Retry": [
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "IntervalSeconds": 3,
                    "BackoffRate": 2,
                    "MaxAttempts": 1
                  }
                ]
              }
            }
          },
          {
            "StartAt": "children",
            "States": {
              "children": {
                "Type": "Task",
                "Resource": "arn:aws:states:::ecs:runTask.sync",
                "Parameters": {
                  "LaunchType": "FARGATE",
                  "Cluster": "${EcsCluster}",
                  "TaskDefinition": "${TaskDefinition}",
                  "NetworkConfiguration": {
                    "AwsvpcConfiguration": {
                      "Subnets": [
                        "${Subnets}"
                      ],
                      "SecurityGroups": [
                        "${SecurityGroups}"
                      ],
                      "AssignPublicIp": "ENABLED"
                    }
                  },
                  "Overrides": {
                    "ContainerOverrides": [
                      {
                        "Name": "${ContainerName}",
                        "Command": [
                          "/embulk/bin/embulk",
                          "run",
                          "conf/hoge/children_bigquery.yml.liquid"
                        ]
                      }
                    ]
                  }
                },
                "End": true,
                "Retry": [
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "IntervalSeconds": 3,
                    "BackoffRate": 2,
                    "MaxAttempts": 1
                  }
                ]
              }
            }
          },

※embulkについては、すでにwebにも様々な事例があり特殊なことはしていないので詳細は省略します。

おわりに

この構成を作ってから半年以上経過していますが、今のところ特段大きな問題が起きておらず、意図した通りに低い運用コストで毎日動いています。 ※Fargateのエフェメラルストレージが200GBになったのも追い風になっています(20GBだったのでいつか限界に達したら手を加えなければと思っていた..)

StepFunctionsは他の用途でも活用しているのですが、AWSでワークフローを動かす時の選択肢としてかなり優秀だなと感じています。 AWSでワークフローを組む必要がある際には、一番の選択肢として考えていいのではないでしょうか。

最後に、今回紹介したデータを元にして開発している家族ノートを一緒に育てていってくれるエンジニアを募集しています。 少しでも興味をもたれた方は、ぜひ一度お話させてもらえるとうれしいです。

hrmos.co