コネヒト開発者ブログ

コネヒト開発者ブログ

AWS Glueを用いてETL環境を構築したお話(RDS for MySQL → S3)

はじめに

こんにちは。2019年3月にMLエンジニアとしてJOINした野澤(@takapy0210)です。

最近はThe Mentalistという海外ドラマにお熱です。犯罪コンサルタントとして活躍する主人公の歯に衣着せぬ物言いやテンポの良さなど、見ていて爽快ですし一つ一つの作品が短いので気軽に楽しめます。(心理学に興味があると楽しさ倍増です)

前置きが長くなりましたが、初めてコネヒト開発者ブログに登場です。テンポ良くいきたいと思いますので、どうぞよろしくお願いします!

今回は機械学習基盤アップデートの一環としてAWS Glueを用いてETLしてみた話について、苦労したポイントなどを中心にお話できればと思います。

導入背景

コネヒトではママリというコミュニティサービスを運営しており、ユーザが投稿する質問に対する検閲フィルタとして機械学習が用いられています。

詳細は弊社CTOのスライドをご覧ください。

現在、モデルの更新が継続的にできていない(≒検閲フィルタのアップデートができていない)という課題があり、機械学習基盤のアップデートを実施しています。
その一環として「RDS for MySQLからデータ抽出 → 機械学習しやすい形へ加工 → S3に保存」のように良い感じでETLしたいよね!という話になり、AWS Glueに白羽の矢が立った次第です。

結論

AWS Glueを用いることでRDSに保存されているデータを抽出・加工し、それをtsv形式でS3に保存することができました。

以下その内訳です。

  • データ件数:約700万件
  • Job実行時間:5分
  • 出力tsvデータ:約3GB

結果としてはやりたいことが実現できた訳ですが、思っていたより複雑で学習コストがかかるな〜という印象を受けました。
(初見が難しいだけで、一度設定できれば以降はスムーズにいけそう感!はありました)

次項から、Glueの説明や設定方法、MySQLからのデータ抽出方法などについて述べていきます。

AWS Glueとは

AWSが提供する完全マネージド型 ETL (抽出、変換、ロード) サービスです。
公式ドキュメントはこちら

f:id:taxa_program:20190417115301p:plain
AWS Glueのアーキテクチャ図

言葉の定義について

上図を見ても分かるようにいろんなワードが出てきます。それぞれの定義が曖昧になり兼ねないので、一度整理しておきます。

  • Data Stores
    データを永続的に保存する(している)リポジトリ。S3バケットやRDS。本記事ではRDSを示す。

  • Data Source
    Job(Script)への入力として使用されるData Stores。 本記事ではRDSを示す。

  • Data Target
    データの書き込み先のData Stores。本記事ではS3を示す。

  • Data Catalog
    AWS Glue 環境を管理するためのテーブル定義、Crawler、接続情報などを管理するサービスの総称。

  • Crawler
    Data Storesに接続しデータのスキーマを判断し、Data Catalog にメタデータテーブルを作成するサービス。

  • Job
    ETL 作業を実行するために必要なビジネスロジック。プロパティ(Scriptのファイル名や保存先、接続情報)で構成される。このJobの作成時にData SourceとData Targetを指定する。 JobはApache Spark ETL ジョブPython シェルジョブの2種類から選択することができ、スケジュール起動/イベント起動が可能。

  • Script
    Jobに含まれるETLの実行処理スクリプト。S3に保存される。

構築したアーキテクチャ

今回構築したアーキテクチャの概要図です。

冒頭でも述べましたが、今回の目的はRDSに保存されている情報をGlueを使って抽出・加工し、それを.tsv形式でS3に保存することです。

f:id:taxa_program:20190417123456p:plain
今回構築したアーキテクチャ概要図

構築手順

今回は下記手順で構築しました。

  1. サブネットの作成
  2. セキュリティグループの作成
  3. Glue接続情報のIAMロール作成
  4. Glue接続情報の作成&接続テスト
  5. CrawlerのIAMロール作成
  6. Crawlerの作成&実行
  7. S3バケットの作成
  8. JobのIAMロール作成
  9. Jobの作成&実行

Glue接続情報とはData Storesへ接続するためのプロパティ情報です。これを元にClawlerを作成していきます。 そしてこのClawlerを実行してRDSのメタ情報を取得し、取得したメタ情報を元にJobでクエリを発行する、という流れです。

ここで押さえておきたい事としては、

  • Glueを利用する際には、最低限として「接続情報」「Crawler」「Job」の3つが必要だということ
  • そしてそれぞれに対してIAMロールが必要(接続情報用、Crawler用、Job用)だということ

だと思います。

構築する際のポイント

上記手順の中でも苦労したポイントについてお話します。大別すると2点です。

  • Glueには自己参照ルールが必要だった(セキュリティグループの話)
  • RDSを用いたユースケースがなかった(Jobの話)

Glueには自己参照ルールが必要だった件

手順「4. Glue接続情報の作成&接続テスト」で接続情報を作成しようとしたときに、適切なセキュリティグループを確認できません。JDBCの接続タイプを変更し、接続の追加を再試行しますというエラーメッセージが表示されました。

f:id:taxa_program:20190417150236p:plain
接続情報作成時のエラー

上記を解消するために、セキュリティグループに関しては下記のように設定する必要があります。

  • Glue 側のセキュリティグループ
    自己参照ルールにて TCP の全ポートからのインバウンドアクセスを許可する必要がある

  • Data Stores(RDS)側のセキュリティグループ
    Glueから3306ポートにアクセスできるようにする必要がある

今回の場合は、

  1. RDSにアタッチするセキュリティグループ(SG-A)とGlueにアタッチするセキュリティグループ(SG-B)を作成する。(新規作成しなくても良い。2つのSGを分けることが重要)
  2. SG-Aのインバウドルールに「TCP、ポート:3306、ソース:SG-B」を追加する。
  3. SG-Bのインバウドルールに「TCP、ポート:0-65536、ソース:SG-B」を追加する。(こちらが自己参照ルール)

という手順で設定しました。

RDS for MySQLを用いたユースケースがなかった件

冒頭でも述べましたが、GlueのJobはApache Spark ETL ジョブPython シェルジョブの2種類から選択することができます。

今回は「学習コストが低い」「分散処理するほどの大量データではない」などの観点から、Pythonシェルジョブを用いることにしました。

知の宝庫であるインターネットを駆使してユースケースを探しましたが、RDSからデータを取得するユースケースは見当たらず・・・。
公式ドキュメントなどにRedShiftを利用した事例はいくつかありました)

AWS サポートの力も借りつつ試行錯誤した結果、MySQL用のライブラリを外部よりインポートすることで実現できたので、次項でその手順をご紹介できればと思います。

PythonShellジョブからMySQLを利用する手順

Python Shellで外部のライブラリを使用したい場合はeggファイル利用する必要があります。
下記が大まかな手順です。

  1. eggファイルを作成
  2. 作成したeggファイルをS3にアップロード
  3. eggファイルをジョブに連携

こうすることで、JobからMySQLが利用可能になります。
(eggファイルの作成は、ローカルでも任意のインスタンス上でも可能ですが、python2.7がインストールされている必要があります)

1. git cloneで下記のPyMySQLライブラリをコピー

これは任意の場所にcloneすればOKです。

$ git clone https://github.com/PyMySQL/PyMySQL 

2. 任意の場所にpackage作成用のディレクトリを作成

$ mkdir package
$ cd package

3. packageを作成(python2.7.15で実行)

$ pip install PasteScript
$ paster create -t basic_package pymysql

設定項目は、全てデフォルトで進めました。

4. ソースコードを配置

1.でgit clone したディレクトリのpymysqlディレクトリの内容を、2.で作成したpackageディレクトリ配下にすべてコピーします。

$ cp -pr <git cloneしたパス>/PyMySQL/pymysql/* <2.でpackageディレクトリを作成したパス>/package/pymysql/pymysql/

※__init__.pyについては、上書き保存します。

この時点で、ディレクトリ構成は下記のようになると思います。

f:id:taxa_program:20190417184915p:plain
ディレクトリ構成

5. eggファイルを作成

packageディレクトリの配下(setup.pyファイルが作成されているディレクトリ)で、以下コマンドを実行します。
これでeggファイルが作成されます。

$ ls
-> pymysql  pymysql.egg-info  setup.cfg  setup.py 

$ python setup.py  bdist_egg
$ ls
-> build  dist  pymysql  pymysql.egg-info  setup.cfg  setup.py 

6. distディレクトリの下に、eggファイルが出来ていることを確認

$ cd dist
$ ls
-> pymysql-0.1.dev0-py2.7.egg

下記のようなeggファイルが作成されていれば問題ありません。

f:id:taxa_program:20190417185007p:plain
ディレクトリ構成

以上でインポートするeggファイルの作成は終了です。

7. 作成したeggファイルをs3にアップロード

CLIを使わずとも、マネジメントコンソール上からアップロードしても問題ありません。

$ aws s3 cp ./pymysql-0.1.dev0-py2.7.egg <s3パス>/pymysql-0.1.dev0-py2.7.egg

8. GlueのPython Shell Jobを設定

マネジメントコンソール上でジョブの作成を行います。 Pythonライブラリパスに 7.でアップロードした.eggファイルを指定してジョブを作成します。

f:id:taxa_program:20190417185200p:plain
Job作成時にPythonライブラリパスを指定

以上でMySQLが利用できるPythonShellジョブの作成は終了です。

スクリプト

参考までに、今回作成したスクリプトを公開します。 (一部、値をマスクしています)

import boto3
import sys
import csv
import pymysql.cursors
import pandas as pd
import re
from io import StringIO, BytesIO

connection = pymysql.connect(
    host='XXXXXXXXXX.rds.amazonaws.com',
    user='user',
    db='dbname',
    password='password',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
    )

sql = "SELECT column1, column2 FROM table WHERE 0 = 0 ORDER BY column1 desc"
df = pd.read_sql(sql=sql, con=connection)

# Delete tab char
df['column1'] = df['column1'].apply(lambda x: x.replace("\t",""))

# Replace CR char
df['column1'] = df['column1'].apply(lambda x: x.replace("\n"," "))

# Convert DF to TSV format
csv_buffer = BytesIO()
df.to_csv(csv_buffer, sep='\t', encoding="UTF8") #TSV出力

# output to S3 as CSV file
bucket = "s3-bucket"
key = "hoge/hogehoge.tsv"
s3 = boto3.resource('s3')
obj = s3.Object(bucket, key)
s3response = obj.put(Body=csv_buffer.getvalue())

最後に

テンポ良くいくつもりが盛りだくさんになってしまいました。

現段階ではGlueを用いてRDSからデータ抽出するユースケースがネット上にないと思うので、この記事が一人でも多くの方の参考になれば幸いです!