フリーランチ食べたい

機械学習/データ解析/フロントエンド/バックエンド/インフラ

SageMakerをFargateで定期実行する環境をServerless Frameworkで一発でつくる

機械学習のモデルを学習する、あるいは推論APIを作成するにあたってSageMakerはとても便利ですが、 定期実行する機能をSageMaker自身では持っていない という問題があります。

このエントリでは、Serverless Framework(以降、Serverless)を使って素早くSageMakerの定期実行環境を構築する方法を紹介します。

以下が構成図です。

f:id:ikedaosushi:20190804213225p:plain

この環境を構築は、下記を前提としているのでご留意ください。

  • VPC、サブネットは既に作成しているものを利用する
  • 学習用のデータはこの環境以外からアップロードされる

またコードはすべて以下のリポジトリにアップロードしており、これをベースに説明をしていきます。

github.com

環境

  • serverless framework: 1.49.0
  • sagemaker(Python SDK): 1.35.0
  • boto3: 1.9.199

CFでSageMaker、Fargate、S3環境を作成

Serverlessの resources にCloudFormationを書いてそれぞれの環境を作成します。 それぞれの命名などは custom に記載しています。

serverless.yml

custom:
  resource_base: ${self:service}-${self:custom.stage}
  fargate:
    dockerfile: train.dockerfile
    ecr_uri: ${self:custom.config.account_id}.dkr.ecr.ap-northeast-1.amazonaws.com/${self:custom.fargate.ecr_repo}
    tag: ${self:custom.stage}
    cluster_name: ${self:custom.resource_base}-ecs-cluster
    container_name: ${self:custom.resource_base}-ecs-container
    ecr_repo: ${self:service}-ecr-image
  s3:
    bucket: ${self:custom.resource_base}-bucket
  sagemaker:
    endpoint_name: ${self:custom.resource_base}-endpoint

前述したようにVPCとサブネットは事前に生成されている前提なので、 env.yml に記載してください。

env.yml

account_id: xxxxxxxxxxxxxx
vpc_id: vpc-xxxxxxxx
subnet_ids:
  - subnet-xxxxxxx
  - subnet-xxxxxxx

serverless.yml

custom:
  config: # ここだけ設定すればOK
    account_id: ${file(./env.yml):account_id}
    vpc_id: ${file(./env.yml):vpc_id}
    subnet_ids: ${file(./env.yml):subnet_ids}

CloudFormationでの環境構築に関しては、下のようにファイルを分けて記載しています。

serverless.yml

resources:
  - ${file(./cloudformations/sagemaker.yml)}
  - ${file(./cloudformations/fargate.yml)}
  - ${file(./cloudformations/s3.yml)}

それぞれのファイルをここに記載すると、かなり長くなってしまうのでGitHubをご参照ください。

scheduled-sagemaker-sample/cloudformations at master · ikedaosushi/scheduled-sagemaker-sample · GitHub

リソース構築はデプロイすればOKです。

sls deploy
# => 構築OK

うまくいけば下記のように作成されます。(ロールはスクショ省略してます)

S3

f:id:ikedaosushi:20190804222142p:plain

Fargate Cluster, Service

f:id:ikedaosushi:20190804222238p:plain

Fargate Task Definition

f:id:ikedaosushi:20190804222302p:plain

ECR Repository

f:id:ikedaosushi:20190804222359p:plain

データアップロード

f:id:ikedaosushi:20190804224729p:plain

S3リポジトリが作成されたのでデータをアップロードします。 今回はサンプルとしてUCI Machine Learning RepositoryのWine Qualityを用います。

archive.ics.uci.edu

前述したように、このデータは外部からアップロードされる前提なので、今回は適当なシェルスクリプトでアップロードします。

wget https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv
aws s3 cp winequality-red.csv s3://scheduled-sagemaker-app-dev-bucket/inputs/raw/data.csv

データがアップロードできました。

f:id:ikedaosushi:20190804222102p:plain

ECR Repositoryにpush

f:id:ikedaosushi:20190804224822p:plain

次にFargateで使うDocker Imageをpushします。

Dockerファイルは前処理が行えて、Sagemakerを操作できるようなイメージにしておきます。

train.dockerfile

FROM python:3.7

RUN pip install -U pip boto3 sagemaker pandas numpy s3fs sklearn

WORKDIR /root/workspace
ADD train.py train.py
CMD ["python", "train.py"]

train.py は前処理と学習、デプロイを行うスクリプトで後述します。

ECRへのpushはServerless Puginを使って行う方法もありますが、複雑になるので、今回はコンソールでコマンドを確認しながらシェルスクリプトを書きました。

f:id:ikedaosushi:20190804223945p:plain

./continainer/push_ecr.sh

#!/bin/bash

# Variables
IMAGE=scheduled-sagemaker-app-ecr-image
TAG=dev # stageによって切り替える
DOCKERFILE=train.dockerfile
ECR_URI=xxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/scheduled-sagemaker-app-ecr-image

# Build image
docker build -t "${IMAGE}:${TAG}" -f "${DOCKERFILE}" .

# Docker login
$(aws ecr get-login --no-include-email --region ap-northeast-1)

# Tag that image
docker tag "${IMAGE}:${TAG}" "${ECR_URI}:${TAG}"

# Push
docker push "${ECR_URI}:${TAG}"
cd container
chmod +x push_ecr.sh
./push_ecr.sh

コンソールでECRにデプロイできていることが確認できます。

f:id:ikedaosushi:20190804231555p:plain

前処理、学習、デプロイの詳細

f:id:ikedaosushi:20190804224851p:plain

前処理、学習、デプロイは前述したDocker Image内の train.py で行っています。

前処理はtrainデータとvalidationデータを分割し、SageMakerのデータ形式に合わせるだけ、学習はXGBoostで回帰モデルを作るのみになっています。

train.py

def preprocess():
    # load
    df = pd.read_csv(S3_INPUT_PATH, sep=';')
    # transform
    y_col = 'quality'
    x_cols = [c for c in df.columns if c != y_col]
    df = df[[y_col] + x_cols]
    # split
    train_data, val_data = train_test_split(df, test_size=(VAL_SIZE), random_state=RANDOM_STATE)
    # upload
    train_data.to_csv(S3_TRAIN_PATH, header=False, index=False)
    val_data.to_csv(S3_VALID_PATH, header=False, index=False)

def train():
    boto_session = boto3.Session(region_name=REGION_NAME)
    sagemaker_session = sagemaker.Session(boto_session=boto_session)
    xgb = sagemaker.estimator.Estimator(
        image_name=TRAIN_IMAGE_NAME,
        role=SM_ROLE,
        train_instance_count=TRAIN_INSTANCE_COUNT,
        train_instance_type=TRAIN_INSTANCE_TYPE,
        output_path=S3_OUTPUT_PATH,
        sagemaker_session=sagemaker_session
    )

    # train
    train_params = {
        "eta": 0.2,
        "objective":"reg:linear",
        "max_depth":"5",
        "eval_metric": 'rmse',
        "num_round":100
    }

    xgb.set_hyperparameters(**train_params)
    xgb.fit({
        'train': s3_input(s3_data=S3_TRAIN_PATH, content_type="csv"),
        'validation': s3_input(s3_data=S3_VALID_PATH, content_type="csv")
    })
    # deploy
    deploy_params = {
        "initial_instance_count": DEPLOY_INSTANCE_COUNT,
        "instance_type": DEPLOY_INSTANCE_TYPE,
        "endpoint_name": DEPLOY_ENDPOINT_NAME,
        "update_endpoint": True
    }
    xgb.deploy(**deploy_params)

if __name__ == "__main__":
    preprocess()
    train()

ハイパーパラメータは環境変数として渡しました。スクリプト全体はGitHubをご参照ください。

scheduled-sagemaker-sample/train.py at master · ikedaosushi/scheduled-sagemaker-sample · GitHub

Lambdaからの呼び出し

f:id:ikedaosushi:20190804225840p:plain

準備したFargateをLambdaから呼び出します。必要な情報は provider.environment で環境変数として渡します。

serverless.yml

provider:
  #...(略)
  environment:
    CLUSTER_NAME: ${self:custom.fargate.cluster_name}
    TASK_DEFINITION: !Ref ECSTaskDefinition
    ECS_CONTAINER_NAME: ${self:custom.fargate.container_name}
    S3_BUCKET: ${self:custom.s3.bucket}
    SM_ROLE: !GetAtt SageMakerServiceRole.Arn
    DEPLOY_ENDPOINT_NAME: ${self:custom.sagemaker.endpoint_name}
    REGION_NAME: ${self:custom.region}
    SUBNET_IDS: !Join
      - ","
      - ${self:custom.config.subnet_ids}

functions:
  train:
    handler: handler.train

handlerではboto3を使ってFargateを呼び出します。

handler.py

def train(event, context):
    fargate = boto3.client("ecs", region_name=REGION_NAME)
    resp = fargate.run_task(
        cluster=CLUSTER_NAME,
        taskDefinition=TASK_DEFINITION,
        launchType='FARGATE',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': SUBNET_IDS.split(",")
                , 'assignPublicIp': 'ENABLED'
            }
        },
        overrides={
            "containerOverrides": [
                {
                    'name': ECS_CONTAINER_NAME,
                    'environment': [
                        {"name": k, "value": v} for k, v in CONTAINER_ENV.items()
                    ]
                }
            ]
        }
    )

    url_base = "https://ap-northeast-1.console.aws.amazon.com/ecs/home?region=ap-northeast-1#/clusters/"
    cluster_name = CLUSTER_NAME
    task_id = resp['tasks'][0]['taskArn'].split("/")[-1]
    url = url_base + cluster_name + "/tasks/" + task_id + "/details"
    message = f"In order to check task, go to {url}"

    return {
        "ok": True, "message": message
    }

デプロイしたあと invoke して動作を確認します。

sls deploy
sls invoke -f train
# {
#    "ok": true,
#    "message": "In order to check task, go to https://ap-northeast-1.console.aws.amazon.com/ecs/home?region=ap-northeast-1#/clusters/scheduled-sagemaker-app-dev-ecs-cluster/tasks/05d35352-7db0-49f1-8d22-c15f2dbb42e1/details"
# }

コンソールからFargateのタスクが生成されて実行されていることが確認できます。

f:id:ikedaosushi:20190804233623p:plain

また同様にコンソールからSageMaker上で学習がされていることとエンドポイントが作成(or更新)されていることが確認できます。

f:id:ikedaosushi:20190804213059p:plain

f:id:ikedaosushi:20190804232517p:plain

定期実行

f:id:ikedaosushi:20190804230313p:plain

若干長い道のりになりましたが、ServerlessからSageMakerが実行できることが確認できたので、あとは functionsevents.schedule[] に設定するだけです。

serverless.yml

functions:
  train:
    handler: handler.train
    events:
      - schedule:
          rate: cron(0 15 * * ? *) # 毎朝AM0:00に実行
sls deploy

これで毎朝AM0:00にSageMakerの学習、デプロイをする定期バッチを作ることができました。

Clean Up

検証が終わって使ったものを消したい方向けです。 Serverlessはコマンドでほとんどのリソースは削除できます。

sls remove

SageMakerのEndpointはコマンドでは削除されないので、コンソールから削除してください。 Endpointは削除しない限り料金が発生するのでご注意ください。

f:id:ikedaosushi:20190804232619p:plain

さいごに

SageMakerの定期実行バッチをServerless FrameworkとFargateでつくる方法を紹介しました。Fargateの部分はAWS Batchに置き換えても同様に実現できます。AWS Batchの場合は以前近い内容のブログを書いたのでそちらなど参考にしていただけると思います。

blog.ikedaosushi.com

AWS BatchではなくFargateを使った理由は、ただ単にFargateの勉強をしたかっただけなので、カスタマイズ要件が特になければAWS Batchを使うかなと思います。

繰り返しになりますが、コードはすべてGitHubで公開しています。

github.com

間違いや質問などあれば気軽にコメントください。