フリーランチ食べたい

機械学習/データ解析/フロントエンド/バックエンド/インフラ

AWS BatchとServerlessを組み合わせて高速に定期実行バッチやバッチAPIを作る

AWS Lambdaで行えないような重い処理はAWS Batchを使うと簡単に行うことができますが、定期実行したりエンドポイントを作る仕組みはAWS BatchにはまだなくCloudWatchやAPI Gatewayと組み合わせる必要がありますがServerlessと組み合わせることで簡単に実現できるので紹介します。

f:id:ikedaosushi:20190427105206p:plain

つくるもの

今回は次のサンプルを作ってみます。

  • 毎日AWS Batchで実行する定期バッチ
  • AWS Batchを実行するAPI

環境

  • serverless: 1.41.1
  • aws-cli: 1.16.144

ファイル構成

最終的なファイル構成は次のようになります。 AWS Batchに関するファイルだけフォルダを分けています。

 tree -L 2
├── batch
│   ├── app.dockerfile
│   ├── ecr_deploy.sh
│   └── sample.py
├── handler_.py
└── serverless.yml

動かすバッチ

今回はサンプルとして引数を表示するスクリプトをバッチとして実行します。

batch/samle.py

import sys

if __name__ == "__main__":
    print(sys.argv)

事前準備: ECRにDockerイメージの登録

事前準備として、ECRにDockerイメージを登録しておきます。

ECRリポジトリを作成します。

aws ecr create-repository --repository-name serverless-batch-example
# {
#     "repository": {
#         "repositoryArn": "arn:aws:ecr:ap-northeast-1:xxxxxxxxxxxxx:repository/serverless-batch-example",
#         "registryId": "xxxxxxxxxxxxx",
#         "repositoryName": "serverless-batch-example",
#          # これをメモ
#         "repositoryUri": "xxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example",
#         "createdAt": 1556336732.0
#     }
# }

作成できました。repositoryUriは使うのでメモっておきましょう。 それではDockerイメージを作って登録します。

次のようにDockerfileとデプロイ用のshell scriptを作成します。

batch/app.docker

FROM python:3.7
ADD sample.py sample.py

batch/ecr_push.sh

#!/bin/bash

# Variables
IMAGE=serverless-batch-example
TAG=dev
DOCKERFILE=app.dockerfile
## さっきメモしたrepositoryUriを貼り付け
ECR_URI=xxxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example

# Build image
docker build -t "${IMAGE}:${TAG}" -f "${DOCKERFILE}" .

# Docker login
$(aws ecr get-login --no-include-email --region ap-northeast-1)

# Tag that image
docker tag "${IMAGE}:${TAG}" "${ECR_URI}:${TAG}"

# Push
docker push "${ECR_URI}:${TAG}"

あとはこのshell scriptを実行します。

cd batch
chmod +x ecr_push.sh
./ecr_push.sh
# => ecrにDockerイメージのデプロイが行われる

コンソールから登録されていることを確認できます。

f:id:ikedaosushi:20190427191513p:plain

AWS Batch/Lambda実装

それでは前置きが長くなりましたが、Lambdaのアプリケーションを作成していきます。

sls create -t aws-python3 -n serverless-batch-example

CloudFormation部分

ServerlessではresourceフィールドでCloudFormationを使ってAWS BatchなどのStackを作成できるので活用しました。 設定にあたって以下の記事を参考にさせてもらいました。

ちょっと長くなりますが、serverless.ymlは次のようになります。前提として VPC・Subnet・Security Groupは既存のものを使う という設計にしています。 クラスメソッドさんのブログではVPCから作っていましたが、どちらかというとVPCなどはすでにあるものを使うほうがユースケースとして多いと思ったので、今回そうしました。 長いですが、皆さんの環境に合わせないといけない部分はcustomフィールドにすべて集約させているので、そこだけ編集してもらえればあとは使い回せると思います。

serverless.yml

service: serverless-batch-example

# 設定はここだけ
custom:
  default_stage: dev
  region: ap-northeast-1
  stage: ${opt:stage, self:custom.default_stage}
  batch:
    compute_env: ${self:service}-compute-env-${self:custom.stage}
    job_queue: ${self:service}-job-queue-${self:custom.stage}
    job_queue_arn: !Ref SlsJobQueue
    job_definition: ${self:service}-job-definition-${self:custom.stage}
    commands:
      - python
      - sample.py
      - Ref::first_arg # 引数のプレースホルダを設定
      - Ref::second_arg
  subnet_ids: subnet-xxxxxxxx,subnet-xxxxxxx,subnet-xxxxxxxxx # 設定したいsubnetを記載
  security_groups: sg-xxxxxxxxxxxxxxx # 設定したいsecurity groupを記載
  # メモっておいたrepositoryUriを設定(タグがある場合は :TAG で追加)
  repo_uri: xxxxxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/serverless-batch-example

provider:
  name: aws
  runtime: python3.7
  stage: ${self:custom.stage}
  region: ${self:custom.region}
  environment:
    BATCH_JOB_QUEUE_ARN: ${self:custom.batch.job_queue_arn}
    BATCH_JOB_DEFINITION: ${self:custom.batch.job_definition}
  iamRoleStatements:
    - Effect: Allow
      Action: batch:*
      Resource: arn:aws:batch:*

functions:
  batch:
    handler: handler.batch

resources:
  Parameters:
    SubnetIds:
      Type: List<AWS::EC2::Subnet::Id>
      Default: ${self:custom.subnet_ids}
    SecurityGroupIds:
      Type: List<AWS::EC2::SecurityGroup::Id>
      Default: ${self:custom.security_groups}
  Resources:
    BatchServiceRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - batch.amazonaws.com
            Action:
            - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
    ecsInstanceRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - ec2.amazonaws.com
            Action:
            - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
    ecsInstanceProfile:
      Type: AWS::IAM::InstanceProfile
      Properties:
        Roles:
          - !Ref ecsInstanceRole
    SlsComputeEnv: # ComputeEnvironment
      Type: AWS::Batch::ComputeEnvironment
      Properties:
        Type: MANAGED
        ServiceRole: !GetAtt BatchServiceRole.Arn
        ComputeEnvironmentName: ${self:custom.batch.compute_env}
        ComputeResources:
          MaxvCpus: 256 # 最大vCPU数
          MinvCpus: 0 # 最小vCPU数
          SecurityGroupIds: !Ref SecurityGroupIds
          InstanceRole: !GetAtt ecsInstanceProfile.Arn
          Subnets: !Ref SubnetIds
          Type: EC2
          InstanceTypes:
            - optimal
        State: ENABLED
    SlsJobQueue: # JobQueue
      Type: AWS::Batch::JobQueue
      Properties:
        JobQueueName: ${self:custom.batch.job_queue}
        ComputeEnvironmentOrder:
          - Order: 1
            ComputeEnvironment: !Ref SlsComputeEnv
        State: ENABLED
        Priority: 1
    SlsJobDefinition: # JobDefinition
      Type: AWS::Batch::JobDefinition
      Properties:
        Type: container
        JobDefinitionName: ${self:custom.batch.job_definition}
        ContainerProperties:
          Command: ${self:custom.batch.commands}
          Memory: 4048
          Vcpus: 2
          Image: ${self:custom.repo_uri}

ポイントとしてはAWS Batchにjobを送信するにあたって、Job DefinitionのARNが必要なのでCloudFormationの Ref 関数を使って設定しています。 これでとりあえずデプロイするとAWS Batchの環境が作られます。

sls deploy

Compute Environment

f:id:ikedaosushi:20190427215735p:plain

Job Queue

f:id:ikedaosushi:20190427215803p:plain

Job Definition

f:id:ikedaosushi:20190427215831p:plain

function部分

実行する関数は次のように実装しました。環境変数からAWS BatchのJobの情報を読んでboto3を使ってJobを送信するだけの関数です。

handler.py

import json
import os

import boto3

BATCH_JOB_QUEUE_ARN = os.environ.get("BATCH_JOB_QUEUE_ARN")
BATCH_JOB_DEFINITION = os.environ.get("BATCH_JOB_DEFINITION")

def batch(event, context):
    client = boto3.client('batch')
    job_name = "lambda_example_job"

    result = client.submit_job(
        jobName = job_name,
        jobQueue = BATCH_JOB_QUEUE_ARN,
        jobDefinition = BATCH_JOB_DEFINITION,
        parameters = {
            "first_arg": "hello",
            "second_arg": "world"
        }
    )

    response = {
        "statusCode": 200,
        "body": json.dumps(result)
    }

    return response

実装できたので、もう一度デプロイします。

sls deploy

デプロイできたら関数を呼び出してみましょう。

sls invoke -f batch

エラーが起こらなければjobが送信されているはずです。

f:id:ikedaosushi:20190427191926p:plain

定期実行化/API化

ここまでできれば、あとはfunctionsフィールドに追加するだけなので簡単です。 例えば次のように設定します。

serverless.yml

functions:
  batch:
    handler: handler.batch
  batch_daily: # 1日おきに実行
    handler: handler.batch
    events:
      - schedule: rate(1 day)
  batch_api: # APIエンドポイントを作成
    handler: handler.batch
    events:
      - http:
          path: batch
          method: get

定期実行

f:id:ikedaosushi:20190427221115p:plain

APIエンドポイント

f:id:ikedaosushi:20190427221207p:plain

無事できているようです。 さらに細かい設定に関してはServerlessのドキュメントをチェックしてみてください。

serverless.com

さいごに

AWS BatchとServerlessを使って高速に定期実行バッチを作ることができました。 Serverlessの良いところはCloudFormationで作成したResourceとLambdaで実装するアプリケーションの情報共有が簡単にできるところだと思います。 AWS Batchの使い所としては、簡単な処理はLambdaで実装しつつ、複雑なデータ処理など必ずLambdaでは足りない場合が出てくるので、そうしたときにLambdaからプロキシするような形で使っていくのが1つの良い使い方ではないかと思います。

今回使ったコードはすべてGitHubにあげてあります。 github.com

何かコメントや改善点などありましたら教えてください!