コネヒト開発者ブログ

コネヒト開発者ブログ

ニアリアルタイムで同期される検索基盤を構築 ~AWS Glueによるデータ同期編~

皆さん,こんにちは!MLエンジニアの柏木(@asteriam)です.

ここ最近は検索エンジン内製化プロジェクトに携わっていて,検索エンジニアとして,検索基盤の主にデータ連携・同期の実装を1から構築したりしていました.7月中旬にABテストまで持っていくことが出来たので,ひとまず安心しているところです.ここからはユーザーの検索体験向上のために検索品質の改善に力を入れていく予定です!

はじめに

今回新しく検索基盤をAWSのマネージドサービスを活用して構築しました!本エントリーでは,タイトルにもあるように,検索基盤の肝であるDBから検索エンジンへのデータ同期をAWS Glueを用いてニアリアルタイムで実施したお話になります.我々は以下の構成で今回の検索基盤を構築しています.

  • 検索エンジン:Amazon OpenSearch Service
  • データベース:Amazon Aurora
  • データ同期(ETL):AWS Glue
  • ワークフロー・パイプライン:AWS Step Functions

また,Step Functionsを使ったワークフロー・データ同期パイプラインに関する内容も別ブログで紹介したいと思います.全量データ同期に伴うreindexの仕組みや運用を意識したシンプルなワークフロー構成の紹介などをしようと思っています.

今回は以下の内容を紹介していこうと思います.

  • AWS Glueで実現するデータ同期とは
  • OpenSearchへのデータ連携でハマったところ
  • 今後の取り組み

※ 今回はGlueのDBやOpenSearchとの細かい設定方法や検索エンジン内製化プロジェクトの発足した背景などに関しては,触れないのでご了承下さい🙏


目次


AWS Glueで実現するデータ同期とは

ここからは実際にGlueを用いて検索エンジンへのデータ同期をどのようにして構築したかを紹介していきます.

全体アーキテクチャ

まずは今回の検索基盤の全体アーキテクチャを紹介しておこうと思います.

  • 太線矢印が「データの流れ
  • 点線矢印が「制御の流れ

検索基盤全体のアーキテクチャー概略図

中央にあるOpenSearch(検索エンジン)に対して,2つのパイプラインを流し込んでデータを同期しています.

  1. 全量データ用の同期パイプライン
    • 全量データの同期は,差分データ同期で取り逃がしたデータを拾ったり,バックアップ的な意味合いで定期的に実行しています.ただ1回の同期に時間がかかるので,頻繁には実施できません.
    • 特定のテーブルの全データを対象に総入れ替え(洗い替え)を行います.
  2. 差分データ用の用の同期パイプライン
    • 差分データの同期は,よりリアルタイムにアプリ上で生成されたデータを検索できるようにするために構築しています.
    • 直近x分以内に更新があったデータのみを対象に同期を行います.

全量データ用のパイプラインはさらに上段で実行スケジュールを管理するワークフローが存在しています.この辺りのワークフローやパイプラインの話は別ブログで紹介します.

AWS Glueによるデータ同期

Glueは様々なデータソースからターゲットソースに向けてサーバーレスでETLができるマネージドサービスになります.バックエンドでSparkが動いているので,高速なデータ処理が可能となっています.Glue Studioという新しいUIがあり,見やすく簡単に設定できてめっちゃ使いやすいです!また,使い勝手も悪くないと思います.

今回AWS Glueを選択した理由

AuroraからOpenSearchへのデータ同期の方法はGlueの他にも,AWS Database Migration Service (AWS DMS) を使う方法がありますが,今回の差分データ抽出や今後発生しうる複雑なデータ抽出が発生した場合に,柔軟に対応できるというところでGlueを選択しました.

前提としてマネージドサービスで運用したいという気持ちがあります.これは運用していく人数も限られている中で,データ同期を行うという本筋以外の周辺の管理運用に時間やコストを割けないというのもあります.

Glueはコネクターを使って簡単にOpenSearchへデータを同期できたり,組み込みの変換処理・フィルターが用意されていたりと便利な機能が色々とあるので,AWSでETL処理をするなら最初の選択肢としてありだと思います.

データ同期の流れ

さてさて今回は上記理由からGlueを選択することにし,データソースのDBであるAuroraからデータを抽出し,Glueを用いてターゲットソースの検索エンジンであるOpenSearchにデータを同期しています.

DB-Glue-検索エンジンの繋がり

Glueを使った流れは以下のようになります.

  1. データベースからデータをクロールする
  2. クロールしたらデータカタログにメタデータを管理する
  3. データカタログのメタデータを元にデータソースからデータを抽出する
  4. ジョブを実行してターゲットにデータを同期する

手動やスケジュールなどでGlue Jobを実行したタイミングで3, 4が実行される

  • Auroraへの接続
    • JDBC接続
    • インスタンス名・データベース名・ユーザー名・パスワードを設定
  • OpenSearchへの接続

こちらの公式ドキュメントにも設定の仕方が分かりやすく書いてあるので参考にしてみて下さい.

Glue Jobの設定

Glue Jobの設定はGlue Studioという新しいUIを使って簡単に設定することができます. 決めるのはデータソースとターゲットだけです.ここで事前に上に書いたコネクターを有効化しておく必要があります.

  • データソース:Data Catalog
  • ターゲットソース:Elasticsearch Connector

全量と差分データの同期フローのDAGは以下のようになり,差分処理が入るかどうかの違いになります.

全量と差分データの同期フローのDAG

Glue Jobには最初から用意されている組み込みのTransform処理があり,それを繋ぐだけで簡単に処理を追加することができます.また,カスタム処理も作ることができ,今回の差分処理はカスタム処理を使っています.

# 青枠の部分を抜粋しています.
from datetime import datetime, timedelta, timezone

from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
from awsglue.transforms import DropFields, SelectFromCollection


# 現在時刻からX分前の時刻を取得
JST = timezone(timedelta(hours=+9), 'JST')
now = datetime.now(JST)
lastXmin = now + timedelta(minutes=-X)

# 差分データ抽出用のクエリ
query = f'''
select * from myDataSource
where modified > '{lastXmin}'
'''


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """対象のクエリに対して,sparksqlを実行する"""
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)

    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

# Script generated for node Difference Data Transform
def DiffDataTransform(glueContext, dfc) -> DynamicFrameCollection:
    """差分データを抽出し変換する"""
    # DynamicFrameCollectionをDynamicFrameにする
    dyf = dfc.select(list(dfc.keys())[0]).toDF()
    diff_data = DynamicFrame.fromDF(dyf, glueContext, "diff_data")

    return DynamicFrameCollection({"CustomTransform0": diff_data}, glueContext)

SQL_node3 = sparkSqlQuery(
    glueContext,
    query,
    mapping={"myDataSource": DropFields_node2},
    transformation_ctx="SQL_node3"
)

# Script generated for node Difference Data Transform
DifferenceDataTransform_node4 = DiffDataTransform(
    glueContext,
    DynamicFrameCollection(
        {"SQL_node3": SQL_node3}, glueContext
    )
)

# Script generated for node Select From Collection
SelectFromCollection_node5 = SelectFromCollection.apply(
    dfc=DifferenceDataTransform_node4,
    key=list(DifferenceDataTransform_node4.keys())[0],
    transformation_ctx="SelectFromCollection_node5",
)

ニアリアルタイムの同期のために

今回の要件として,差分同期はニアリアルタイムのデータ同期が求められハードルが高いものになっています.これを実現するために,GlueのJob bookmarkというオプション機能を使用しています.

Job bookmark機能とは,ジョブの実行状態を保持する機能で,処理済みデータを再度処理しないようにすることで,重複処理や重複データを防ぐことができます.これによりJobの実行時間が劇的に早くなります!ただ,ここはかなりハマったので次の章でも紹介します.

最終的には,Job bookmark機能とworker数(この辺はインスタンスタイプの調整も必要)を調整することでニアリアルタイムなデータ同期を可能にしています!

数分に1回動かすスケジュール実行は以下の2通り考えましたが,最終的に2番目の方法で実装することにしました.

  1. GlueのJob Schedulerを使う方式
    • メリット
      • オーバーヘッドが少なくGlueのみでデータ同期を完結できる
    • デメリット
      • 最低5分間隔でしか設定できない
        • X分間隔のジョブを複数作ることで回避できるが複雑で管理も大変
      • エラー通知がGlueの設定からはできない(CloudWatch Logsを使えばできそう)
  2. Step FunctionsとEventBridgeを使う方式
    • メリット
      • エラーハンドリングや通知設定がStep Functions内でできる
      • EventBridge経由のスケジューリングになり,柔軟な設定が可能
    • デメリット
      • 複数のサービス間でオーバーヘッドが生じる

OpenSearchへのデータ連携でハマったところ

検証中にハマった問題(初歩的なものもあります笑)を紹介したいと思います.

  • Too Many Requestsの問題
    • これは何も考えずdev環境でOpenSearchにデータを投げたら,発生したエラーです.この原因は単純に同期しようとしているデータ量が多い,またOpenSearch側のインスタンスタイプのスペック不足によるものだったので,dev環境の方もスペックを上げたところ解消されました.
  • Job bookmark機能が言うことを聞かない問題
    • 予想以上に処理時間がかかった
      • 1回のジョブ実行時はブックマークを作ることになるので,通常の実行となります.そのため,それなりに実行時間がかかるということを知らなかったので,効いていないと勘違いしました.差分の同期に関しては,dry-run的な形で一度動かしておく必要があることを学びました.
    • KeysとKeysSortOrderの設定
      • この問題は,データカタログからデータを抽出するところで設定するadditional_optionsのKeysSortOrderをdescにすると意図した通りに上手く動かないという問題です.OpenSearchへデータ同期するのに,重複データを排除するためにupsert形式で入れているのですが,upsertが効いていないことに気づいて発見しました.
      • これはタイムスタンプのように昇順で増加する場合,ascで指定する必要があったみたいです.直感的には,新しい順で並ぶdescにすると思ったのですが,それだと上手く機能しないことが分かりました.(これに全然気づかなくて苦労しました…😅)
        • 正:additional_options={"jobBookmarkKeys": ["id", "modified"], "jobBookmarkKeysSortOrder": "asc"}
          • Keysは複数取ることができ,id(レコードにおける一意のキー)とmodified(更新日)を指定しています.
        • この設定は,Glue Jobの最初のフェーズでデータカタログからデータを抽出する部分でオプションとして設定します(コード例を下に載せておきます).
# Script generated for node Data Catalog table
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="<データカタログに登録されているDB名>",
    table_name="<上記DBに登録されているテーブル名>",
    transformation_ctx="DataCatalogtable_node1",
    additional_options={"jobBookmarkKeys": ["id", "modified"], "jobBookmarkKeysSortOrder": "asc"}
)

Glue→OpenSearch間でエラーが発生した場合,なかなか原因を調査して特定するのが難しいなと感じました.CloudWatch Logsに出力するロギングオプションを有効化したりもしていますが,それでも大変です.今後の取り組みでも紹介していますが,Spark UIを立ち上げて詳細を追っていく必要がありそうで,この辺りはもう少し簡単にできるようになると嬉しいなと思います.

今後の取り組み

データ同期部分に関して,今後取り組んでいきたいことを紹介します.

  • Glueのジョブ実行時間を収集して見える化
    • Start-up timeやExecution timeを計算することで,処理時間が遅くなっているかどうかを知ることでサービスへの影響をプロアクティブに把握することができます
  • Glueジョブのモニタリング
  • データ基盤の安定した運用
    • 検索エンジンにとってデータが安定して同期されることが必要不可欠なため,どんな状態でも止まらず安定してデータ同期を続けられる仕組みとそれを計測する方法についても定義していきたいと思っています

今後複数のテーブルを同時に連携していく必要があったり,機械学習によるランキング学習など負荷が発生したり処理時間の問題など色々な課題が生まれてくることが考えられるので,そのためにも状態の把握や監視を進められると良いと思っています.

おわりに

最後に,コネヒトではプロダクトを成長させたいMLエンジニアを募集しています!!(切実に募集しています!)
もっと話を聞いてみたい方や,少しでも興味を持たれた方は,ぜひ一度カジュアルにお話させてもらえると嬉しいです.(僕宛@asteriamにTwitterDM経由でご連絡いただいてもOKです!)

www.wantedly.com

AWS Distributed Load Testingを使うと手軽にAWS内での負荷試験が出来るという話

こんにちは。インフラエンジニアの永井(shnagai)です。

今回は、現在進めているプロジェクトでの負荷試験で、AWS Distributed Load Testing を使って比較的手軽にAWS内での負荷試験を行うことが出来たのでその内容を紹介しようと思います。

内容はざっくり下記3点です。

  • これまで使ってきた負荷試験ツールとその悩み
  • AWS Distributed Load Testingとは
  • 実際の負荷試験の様子

これまで使ってきた負荷試験ツールの悩み

新規システムを開発し、サービスに導入する際には、負荷試験が必要になるケースも多いと思います。

負荷試験は、開発したシステムが想定リクエストに対して性能面で問題なく稼働出来るかをユーザに提供する前にチェックする目的で行うのが一般的です。

内容としては、レイテンシやステータスコードのエラー数等をレポートし、それらが基準として定めたパフォーマンスを満たしているかチェックします。もし想定外の結果が出た際は、負荷試験で露見したボトルネックを潰し、再度計測して基準値を満たせるまでそれを続けます。

負荷試験を行うツールは代表的なものがいくつかあり、ライトなものだとab - Apache HTTP server benchmarking tool、複雑なテストシナリオやレポーティングを求めると、Apache JMeter™ , Tsung ,Gatling - Professional Load Testing Tool等があります。

コネヒトでもケースに応じてこれらのツールを都度検討しつつ負荷試験を実施してきました。

ここでは各ツールの比較はしませんが、個人的な所感だとabは簡単な単一リクエストの負荷試験用途に便利で、その他のツールはそれぞれ特性はありつつも、一度手に馴染んで環境を作ってしまえば負荷試験ツールとして十分に便利だと思っています。

ただ、負荷試験環境を作るのに毎回そこそこのコストがかかるのが個人的にはもったいないなといつも思っていて、実際に下記のような悩みポイントを毎回抱えていました。

  • 負荷試験の環境準備には2つあり、負荷をかけられる側とかける側がある。負荷をかける側の準備は本来の開発とは関係ない部分なので、出来るだけ低コストに抑えたいのですがテストの規模やツールによってはその準備に大きなコストがかかるケースがある。負荷をかける側がボトルネックになり期待したパフォーマンスが出ないケースも十分に想定されるのでおざなりには出来ない。 端的にいうと大きめのリクエストをクライアント側をボトルネックにせずに回す環境作るの大変、、、
  • 負荷試験の頻度が年に数回なので、nヶ月後にやろうとした時に環境含めたキャッチアップコストがそこそこ高い

以降ではこれらの負荷試験環境の準備コストにかかる悩みを一定解消してくれた AWSマネージドの負荷試験ツールであるAWS Distributed Load Testingを紹介していきます。

※参考までに、今回のツール選定前にざっくりかかげた負荷試験ツールの要件のキャプチャを貼っておきます。

AWS Distributed Load Testingとは

AWS SAに負荷試験について相談したところ、下記ブログを教えてもらいAWS Distributed Load Testingの存在を知りました。

大規模な負荷テストを実行可能。「Distributed Load Testing on AWS」 を試してみる - builders.flash☆ - 変化を求めるデベロッパーを応援するウェブマガジン | AWS

自分なりの解釈で要約すると、下記の特徴があるサービスかなと思っています。

  • CloudFormation一発実行で負荷試験の実行環境を作ってくれるサービス
    • 負荷試験環境の構成管理不要
  • 作られるサービスにはフロントエンドとバックエンドに分かれる
    • フロントエンドは、負荷試験の設定やレポートを補完閲覧するためのWebアプリケーション
    • バックエンドは、負荷試験でいうところのクライアント環境(負荷掛け機)
  • 単純なエンドポイントへのHTTPリクエストの他、Jmeterのシナリオファイルをインポート出来る
    • Jmeterのシナリオファイル=複雑なテストシナリオを実行可能
  • クライアントはFargateタスクなので、クライアント側のCPU負荷をウォッチすることでリソース不足であれば簡単に設定画面から追加可能
  • クライアント側で計測出来る項目に、 Average response time(s) Requests Per Second err数(ステータスコードあり) Percentileレイテンシ 等があり1テスト毎にレポート
    • 上のレポートを一覧で見ることも可能(項目は固定だが)

これまで都度苦労してきた負荷試験クライアントの準備が、用意されているCloudFormation一発で作成/削除出来るという手軽さが個人的には一番刺さりました。

また、Jmeterのシナリオを使えるので複雑なテストシナリオの実行が可能かつレポートも今回求めているものを満たしていると判断したので、今回の負荷試験ツールとして正式採用することを決めました。

実際の負荷試験の様子

ここでは具体の設定方法には触れず、どのようなフローで負荷試験を回したかを紹介します。

具体の設定方法は、クラメソさんのやってみたシリーズが詳しいのこちらを参照していただくのがよいかなと思います。

AWSの負荷テストソリューションを試してみた | DevelopersIO

負荷試験のフロー

  1. ローカルのJmeterでシナリオを作る
  2. CloudFormationを実行して負荷試験環境を作る(ほぼワンクリック)
    • VPC内にリソース作ることも可能なのが嬉しいポイント
  3. jmxファイルをZIPに固めて、Distributed Load Testing上でアップロード

    • csv等を使う場合は、一緒にzipに固める
      • シナリオで読み込ませるファイルのパスをアップロード時の相対パスにする必要があるので注意
  4. General Settingsにある負荷テストの条件を設定して実行

  5. 実行後の結果を確認し再度設定を変えて基準値を満たすまで実施
    • 負荷がけ機(クライアント)側のリソース状況はECSクラスタ関連のCloudWatchで確認
    • 負荷をかけられるサービス側はそれぞれのAPIやデータソース側のメトリクスを確認
    • 負荷テストの実行結果のレポートはこんな感じでテスト毎に出力されます 実行結果としてレポートされる項目の一覧は下記です。
    • Average response time : テストによって生成されたすべてのリクエストの平均応答時間 (秒)
    • Average latency : テストによって生成されたすべてのリクエストの平均レイテンシー (秒)
    • Average connection time : テストで生成されたすべてのリクエストについて、ホストへの接続にかかった平均時間 (秒)
    • Average bandwidth : テストで生成されたすべてのリクエストの平均帯域幅
    • Total Count : リクエストの総数
    • Success Count : 成功したリクエストの総数
    • Error Count : エラーの総数
    • Requests Per Second : テストで生成されたすべてのリクエストの 1 秒あたりの平均リクエスト数
    • Percentile : テストの応答時間のパーセンタイル値 (最大応答時間は 100 %、最小応答時間は 0 %)
  6. 負荷試験の結果、基準値を満たせたらCloudFormationのスタック削除で環境がまるっと削除されるので後片付け終了

負荷試験のまとめとして、実行結果と付随情報を一覧出来るようにまとめて最終的なレポートは自作しました。どの負荷試験ツールを使ってもこれは一緒で、実際のサービス側のリソース状況(CPU,mem等)と負荷試験レポートの相関関係が一望出来ないと意図しない結果の時にボトルネックがわかりにくくなるため一覧出来るようにしています。

ただ、Distributed Load Testingのレポートベースで作れるので作成コストは非常に小さく済みました。

今回は、AWS Distributed Load Testingを使って比較的簡単にAWS内で負荷試験を実施した事例を紹介しました。

個人的には、負荷試験の環境準備にかかるコストが劇的に減った感覚があるので非常におすすめのサービスです。

最後に宣伝です! コネヒトでは一緒に成長中のサービスを支えるために働く仲間を様々な職種で探しています。 少しでも興味もたれた方は、是非気軽にオンラインでカジュアルにお話出来るとうれしいです。

コネヒト株式会社

hrmos.co

A/Bテスト標準化へ取り組んだ話

みなさんこんにちは!機械学習チームのたかぱいです。

半年ほど前からA/Bテストの標準化に取り組んでいたので、本日はその背景やプロセスについてご紹介しようと思います。

尚、以下メルカリさんの事例を参考にさせていただいています(この場を借りて御礼申し上げます。ありがとうございます!)

note.com

標準化に取り組んだ背景

コネヒトでは日常的にさまざまなチームでA/Bテストが行われていました。

しかし、以下のような課題があると感じていました。

  • A/Bテストに関する知識にバラつきがある
  • チームごとにA/Bテストのドキュメントの書体が異なるので、読み解くのにコストがかかる
  • 「どのような実験」が「どのくらいの期間行われていたか(いるか)」という情報が一目で把握できない

etc...

上記のような課題は時間が経てば経つほど負債が大きくなると判断し、一度テコ入れした方が良いと思い、標準化に取り組みました。(後述する書籍の日本語版が発売になったことも大きなトリガーでした)

ここで言う「標準化」 is 何?

標準化によって得たい効果は、以下のようなものを想定していました。

  • 実験計画書のテンプレートをつくり、レビュー文化も導入することで、人依存による実験設計・検証品質の分散を減らす
  • 書体が揃うことで過去のドキュメントを読み解くコストが下がる
    • いつ、どんなものが誰によって検証されたかを振り返りやすく・見やすくする
  • 実験文化そのものの醸成を促す
  • 他のチームでどんな実験をやっていて、どんなメトリクスが使われているのかを共有知とする
  • 過去に何がうまくいって、何がうまくいかなかったのかを振り返ることができる
    • 未来へKnowledgeリレーを繋いでいきたい

そのため、まずは誰でも使える標準化テンプレートを作成し、それに沿って実験を行うフローにアップデートしようと考えました。

以降で、標準化プロセスでポイントだと感じた以下2点について紹介できればと思います。

  1. さまざまな職種の方と輪読会を行い、A/Bテストに対する視座を揃える
  2. 先んじて活用事例を作成し、今後社内で行われる実験で利用するイメージを沸かせる

最終的に作成したテンプレートは冒頭で述べたメルカリさんのテンプレートに若干アレンジを加えたものになりました。(最後にサンプルを載せています)

1. さまざまな職種の方と輪読会を行い、A/Bテストに対する視座を揃える

まず最初に行ったことは、いろんな職種の方を交えて輪読会をしました。

読んだ本は界隈で有名な A/Bテスト実践ガイド 真のデータドリブンへ至る信用できる実験とは という書籍です。

社内slackで参加者の募集を行い、ML、インフラ、サーバーサイド、ネイティブエンジニアやPdMといったいろいろな専門性を持った方々に参加していただきました。

個人的にはPdMを含めた様々な職種の方に参加してもらえたのはとても良かったなと思っています。

例えば、本を読んでいく中で「インフラでこの辺の数値って計測できるんですか?」といった議論や、「ネイティブでこの辺のログって取得できるんですか?」といった議論をその場で行うことができました。
また、PdMにも参加してもらったので「今のコネヒトだとこの辺の検証はどうやっているのか」や「この辺注意した方が良いよね」といった共通認知を取ることもできました。

本を読み終えたあとは、輪読会で議論した内容をもとにテンプレートに落とし込み、関係者にレビューしてもらいブラッシュアップしていきました。

レビュー時の様子

2. 先んじて活用事例を作成し、今後社内で行われる実験で利用するイメージを沸かせる

無事にテンプレートが作成できても、実際に使うまでにはハードルがあると思います。

そのため、機械学習のプロジェクトで過去に行ったABテストをこのテンプレートに落とし込み、いくつかサンプルを作成しました。

これにより少しでも活用イメージを沸かせてもらい、「自分も使ってみようかな」というアクションを促しました。

作成したテンプレート

冒頭で紹介したメルカリさんの事例にあるように、「Background」「Test setting」「Metrics Details」「Action plan」という項目は、細部は異なりますが概ね同じ項目を採用しました。

大きな差分としては「Result」という項目を追加しています。

この項目は、実験が終了した後に記載する項目で、実験が複数のメトリクスにどの程度の影響を与えたかを記録しておく項目です。
数値が改善した理由(成功した理由)や、数値に変化がなかった or 改悪した理由(失敗した理由)などの考察も記入します。
ここの記載内容をもとに、A/Bテストの成功 or 失敗を判断し、事前に決めておいたAction plan**に則ってNext actionに移行します。

Resultは以下のような項目でテンプレート化しました。

# Result

## OEC metricsの結果
- hoge

## Guardrail metricsの結果
- hoge

## Debugging metricsの結果
- hoge

## 考察
- hoge

## Next action
- hoge

この項目を追加した意図としては、実験計画書とその実験の結果や得られた知見をセットで記録しておくことで、後から振り返りやすくしたい、という想いがあります。

標準化に取り組んでどうだったか?

運用を始めて半年ほど経過しましたが、このテンプレートを使って数十件の実験計画書が作成されています。

実験計画書を作成する際も項目に沿って埋めるだけで良くなったので、準備コストは削減できているなぁと感じています。

また「こんな検証してるんだな〜」ということも分かりやすくなり、各チームの簡易的な活動の可視化にもなっていると感じています。

最後に

前提として、実験の成功率は高くないと思っています。

BingやGoogleでも、アイデアの成功率は約10%〜20%程度、Slackでも30%程度しかポジティブな結果を得られないことが公開されています。

つまり、70%以上は失敗するわけですが「失敗=負け」だとは思っていません。失敗から何も学べない状況こそが本当の敗北だと思っています。

そうならないためにも、失敗前提で標準的なプロセスに則ってサイクルを簡単に回していける仕組みを作っていくはとても大切だと思います。

実験計画書を溜めておくことによるメリットを享受できるのはもう少し先になると思いますが、確実に未来へ資産を残していけていると思うので、形骸化しないように今後もブラッシュアップしていこうと思います!

We Are Hiring !!

コネヒトでは一緒に働く仲間を募集しています!

もしご興味があれば、カジュアルにお話しさせてください!

www.wantedly.com

コネヒトにおける機械学習関連業務の紹介資料を公開します

更新履歴💡
2022-07-25:初版作成

コネヒトでは Tech vision の1つに 1 to 1 AI というStrategyを掲げているように、今後パーソナライズやレコメンデーションなどの分野に積極的にチャレンジしていくため、機械学習エンジニアの採用を強化しています。

それに伴い、コネヒトにおける

  • 機械学習の活用事例
  • 今後どんなチャレンジをしていく予定なのか
  • 機械学習エンジニアの働き方

などについて、より多くの方に知っていただきたいと思い本資料を公開しました。
(※資料中のリンクについては、speakerdeckページの説明欄や本記事の後半にも記載しております)

本資料を読んでより詳しく話を聞きたいと思った方は、ぜひカジュアルにお話しをさせてください!(各メンバーの Twitter DM や Meety経由でお気軽にご連絡ください!)

採用選考にご興味を持っていただいた場合は、下記よりご応募ください。

bit.ly

上記資料にも添付しておりますが、メンバーが公開している登壇資料やブログ記事もぜひご覧ください。

機械学習の活用事例

MLプロジェクトの進め方

tech.connehito.com

tech.connehito.com

サービス内での活用事例

tech.connehito.com

tech.connehito.com

tech.connehito.com

社内での活用事例

tech.connehito.com

機械学習基盤

分析環境

tech.connehito.com

基盤のスコアリング

tech.connehito.com

SageMakerやStep Functionsを用いたMLOps

tech.connehito.com

tech.connehito.com

tech.connehito.com

tech.connehito.com

産学連携

connehito.com

働いている人・チームのインタビュー記事

www.wantedly.com

eh-career.com

www.wantedly.com

www.wantedly.com

テクノロジーで「家族像が実現できる社会をつくる」という価値提供をしていきたい、皆さんのご応募をお待ちしております!!

Firebase AnalyticsからBigQueryへの日次データ同期が突如不規則になった事象に対応した話

こんにちは。インフラエンジニアの永井(shnagai)です。

今回は、Firebase AnalyticsからBigQueryへの日次データ同期処理の時間が大幅にずれた際に取った対策について書こうと思います。

内容はざっくり下記3点です。

  • 背景説明
  • Firebase AnalyticsからBigQueryへのデータ同期
  • 解決策

背景説明

コネヒトでは、家族ノートという「ママリ」内の検索データとQ&Aデータを可視化したデータ分析サービスを運営しています。

info-kazokunote.mamari.jp

家族ノート内で使っている検索データは、FirebaseAnalyticsを使ってトラッキングしている行動ログをベースにしています。

サービス性質上FirebaseAnalyticsの行動ログは大切で、元のデータがうまく出来ていないとデータの欠損等にも繋がってしまいます。

※もちろんそう簡単に欠損を出すわけにはいかないので、後続の集計処理のスケジュールを工夫したりしてデータ欠損が出ない取り組みはしていますが。

Firebase AnalyticsからBigQueryへのデータ同期

データ同期のフロー

今回の事象の説明に入る前に、FirebaseAnalyticsからBigQueryへのデータ同期のフローをざっくり書くと下記のようになっています。

デバイス
↓ ①行動ログを送信
FirebaseAnalytics
↓ ②リアルタイムにデータ同期
BigQuery(events_intraday_yyyymmdd)
↓ ③日次テーブルとして書き出し
BigQuery(events_yyyymmdd)

ちなみにFirebaseAnalytics上の設定は下記のキャプチャのように「毎日」「ストリーミング」同期の両方をオンにしています

今回課題になった点

※再掲

デバイス
↓ ①行動ログを送信
FirebaseAnalytics
↓ ②リアルタイムにデータ同期
BigQuery(events_intraday_yyyymmdd)
↓ ③日次テーブルとして書き出し
BigQuery(events_yyyymmdd)

家族ノートでは、③の日次テーブルを起点に集計データを作って最終的に表示するBigQueryのビューを作成しています。

ただ、③の日次テーブルの書き出し時間は不定期で、BigQueryの events_yyyymmdd テーブルが何時に出来るのかは利用者側ではハンドリング出来ません。

これまでの経験則から、ある程度この時間までにはデータが出来ているというバッファをもったタイミングで後続の集計処理をスケジュールベースで動かしていました。

そんな中、今年の6月1日のタイミングから時間のズレが大きくなり、元のスケジュールではカバーできない状況が発生しました。(これまでの時間帯に日次テーブルが出来る日もあれば大幅にタイミングが遅れる日も散見され始めました)

具体的に言うと、集計テーブルに対するデータのテストでアラートが出る日が増えてきており何かしらの対応が必要になりました。

解決策

解決策は簡単で、集計テーブルの対象を③ events_yyyymmdd のみから、③events_yyyymmdd or ② events_intraday_yyyymmddという形に変えました。

events_intraday テーブルの仕様がよくわからず、結果としてデータ重複等予期せぬことが起きるのは怖かったので、1ヶ月ほど併行して出来上がるデータに差分がないかをチェックしました。

結果、うまく動いた日は同じ数のイベントがあり、旧来の処理がコケた日も新処理ではデータがうまく出来ていることが確認出来たので正式に新方式に切り替えることにしました。

これまでのクエリ:

FROM
    `hogehoge.analytics_99999999.events_*`
WHERE (_TABLE_SUFFIX = FORMAT_DATE("%Y%m%d", DATE_SUB(@run_date, INTERVAL 1 DAY))

新方式のクエリ:

FROM
    `hogehoge.analytics_99999999.events_*`
WHERE (_TABLE_SUFFIX = FORMAT_DATE("%Y%m%d", DATE_SUB(@run_date, INTERVAL 1 DAY))
      OR _TABLE_SUFFIX = CONCAT('intraday_', FORMAT_DATE("%Y%m%d", DATE_SUB(@run_date, INTERVAL 1 DAY))))

どうしようか困っていた時に、下記ツイートのアイデアに非常に助けられました。

この場でお礼を伝えさせていただきます!ありがとうございます。

今回はニッチなネタでしたが、このブログを読んでたまたま同じ悩みに遭遇している誰かの救いになれば幸いです。

CakePHP Fixture Factories を導入しました

こんにちは。プロダクト開発部の @su-kun1899 です。

今回はママリの CakePHP アプリケーションに Fixture Factories を導入した事例を紹介します。

Fixture Factories とは何か

Fixture Factories は、モデルやデータベースに依存するテストコードにおいて、テーブルの作成やデータ初期化を行うためのプラグインです。

github.com

CakePHP には元々 Fixture という仕組みが提供されていますが、 Fixture Factories はより柔軟に扱うことができます。

https://book.cakephp.org/4/en/development/testing.html#test-fixtures

導入したきっかけ

アプリケーションの規模が大きくなり、機能が増えてくると、テーブル(モデル)ごとに一律データを管理する Fixuture は管理や運用の負荷が高まっていきます。

ママリでもテストケースとデータの依存関係が強くなり、テストデータを少し変更すると既存のテストが壊れて修正が必要になるなど、気軽に変更できないことが多くなってきていました。

Fabricate を導入するなど対応は行っていましたが、関連データの生成やシーケンシャルな値の発行等でママリでのユースケースにはマッチせず、十分な解決策には至っていませんでした。

GitHub - sizuhiko/Fabricate: PHP data generator for Testing inspired on Fabrication and factory-girl from the Ruby world.

そこで、Fixture Factories を導入することにしました。

Fixture Factories のいいところ

一部ですが、特に便利だなと思っているところを紹介します。

公式が推している

公式ドキュメントでも明確に Fixture 肥大化時の解決手段として言及されています。

https://book.cakephp.org/4/ja/development/testing.html#id20

github.com

API が直感的かつ柔軟

主観を多分に含みますが、かなり使いやすく読みやすいと思います。

<?php
// Entity を生成するとき
$article = ArticleFactory::make()->getEntity();
<?php
// Entity を永続化するとき
$article = ArticleFactory::make()->persist();
<?php
// フィールドを書き換えるとき (未指定のフィールドはデフォルト値で生成される)
$article = ArticleFactory::make(['title' => 'Foo'])->getEntity();
$article = ArticleFactory::make()->setField('title', 'Foo')->getEntity();
$article = ArticleFactory::make()->patchData(['title' => 'Foo'])->getEntity();
<?php
// テストデータを複数件まとめて作るとき
$articles = ArticleFactory::make(2)->getEntities();
// フィールド書き換えと同時に行うこともできます
$articles = ArticleFactory::make(['title' => 'Foo'], 3)->getEntities();
$articles = ArticleFactory::make(3)->setField('title', 'Foo')->getEntities();

関連データの生成が柔軟

モデルの Association に従って、スムーズに関連データの生成が行なえます。

# Bake コマンドで -m オプションを指定すると、関連データ生成のメソッドも生やしてくれます
bin/cake bake fixture_factory -m Articles
<?php
// 関連データをまとめて作る
$country = CountryFactory::make()->withCities()->persist();
<?php
// 関連データの値や件数も柔軟に変更できる
$country = CountryFactory::make()->withCities(3)->persist();
$country = CountryFactory::make()->withCities(['is_capital' => true])->persist();
$country = CountryFactory::make()->withCities(['is_capital' => true], 3)->persist();

Faker が使える

初期データは Factory の setDefaultTemplate メソッドで定義するのですが、 Faker が使えるのでテストデータ生成がスムーズです。

<?php
class ArticleFactory extends BaseFactory
{
    // ~~~~ 略 ~~~~~
    protected function setDefaultTemplate(): void
    {
          $this->setDefaultData(function(Generator $faker) {
               return [
                    'title' => $faker->text(30),
                    'body'  => $faker->text(1000),
               ];
          })
          ->withAuthors(2);
    }
    // ~~~~ 略 ~~~~~
}

呼び出し元で callback を使って利用することも可能です。

<?php
$article = ArticleFactory::make(
    fn(ArticleFactory $factory, Generator $faker) => [
        'title' => $faker->text,
    ]
)->persist();

ちなみに Faker は CakePHP 側の defaultLocale を参照するので、 ja_JP を指定しておくと日本語のテストデータ生成もできます。

その他 Tips

使用するケースは限られるかもしれませんが、参考までに。

データを組み合わせごと複製する

特定の組み合わせのデータを、そのまま任意の数複製することができます。

下記の例では、 is_admin が true / false で 5 件ずつ、合計 10 件のテストデータが生成されます。

<?php
$users = UserFactory::make(
    [
        ['is_admin' => true],
        ['is_admin' => false],
    ],
    5
)->persist();

テストのときだけ Association を書き換える

あまりないと思いますが、 Factory で getTable をオーバーライドすることで、 テストのときだけ有効な Association を定義することができます。

何らかの事情でプロダクションコードでは Association を定義できないが、テストの場合は Association をあるものとしてテストデータ生成を効率化したいケースなどで利用できます。

<?php
class ArticleFactory extends BaseFactory
{
    // ~~~~ 略 ~~~~~
    public function getTable(): Table
    {
        $table = parent::getTable();

        // 一時的に Association を定義
        $table->hasOne('Authors')->setForeignKey('author_id');

        return $table;
    }
    // ~~~~ 略 ~~~~~
}

おわりに

今回は Fixture Factories の導入と、その便利な機能の一部について紹介しました。

かなり便利さを実感しており、どんどん活用していこうと思います!

PR

コネヒトでは一緒にテストを改善していく仲間を募集しています!

hrmos.co

2022年7月最近のスマイル制度活用事例

コネヒトには「スマイル制度」という制度があります。これは開発組織でのインプットとアウトプットの活性化を促進する制度です。とてもコネヒトらしい制度になっているので、詳しくはスマイル制度 - Connehito Tech Visionや、制度が生まれた当時の記事をご覧ください。

tech.connehito.com

その後、カジュアル面談などで社外の方から「実際どういったものに使われているんですか?」と質問いただくケースがちらほらあるため、どういったインプット・アウトプットが行われたかを発信していければと思います!

インプット事例

アウトプット事例

イベント開催、LT、個人ブログ、OSSコントリビュートと色々な種類のアウトプットがありました。

speakerdeck.com

connehito.connpass.com

zenn.dev

masatakashiwagi.github.io

github.com

以上です!