はじめに
前回の記事ではLambdaとAPI Gatewayを組み合わせる方法を紹介しました。今回の記事では、引き続きLambdaの掘り下げで、Amazon SQSと組み合わせてキューを処理する方法を紹介します。
今回紹介する内容
今回の記事では以下の内容について紹介します。
- Amazon SQSでキューを作成する
- Lambda関数でキューにメッセージを送る
- キューの受信をトリガーにLambda関数を実行する
- IAM周り
Amazon SQSとは
Amazon SQSはフルマネージドなメッセージキューイングサービスです。自分でサーバやミドルウェアなどを準備することなくすぐに使い始めることができ、他のAWSサービスとの連携が容易に行えるなどのメリットがあります。
Amazon SQSについてより詳しく知りたい方は、こちらなどをご覧ください。
実際に動かしてみる
それでは、AWS上に環境を構築してキューにメッセージを送ってみましょう。
SQSでキューを作成する
まず初めに、SQSにキューを作成します。
マネジメントコンソールのAmazon SQSのページを開き、「キューを作成」をクリックします。
キューの作成では、最初にキューのタイプを選択します。キューには、「標準」と「FIFO」の2種類のタイプがあります。
簡単に説明すると、「FIFO」タイプではメッセージの順番が保証され、メッセージの配信も厳密に1回のみ行われます。それに比べて、標準タイプではこのあたりが保証されませんが、スループットや料金面で優れています。
今回は順番などに制限はありませんので、標準を選択します。
「設定」の項目では、キューの制御について様々な設定を行います。
- 可視性タイムアウト:メッセージを取得してから再度同じメッセージを取得できるようになるまでの時間
- 配信遅延:コンシューマ(メッセージの受信者)の処理が完了する前にメッセージがキューに入った場合に配信を遅延させる時間
- メッセージ受信待機時間:0の場合はショートポーリング、1以上の場合はロングポーリング
「許可ポリシーの再実行」と「デッドレターキュー」では、デッドレターキューに関する設定を行います。
デッドレターキューは、メッセージの配信が失敗し処理に失敗した場合にそのメッセージを再利用するために使用されます。
今回は特に設定は行いません。
これでキューの設定は完了です。「キューを作成」を押して作成します。
LambdaでMessage送信関数を作成する
今回は2種類のLambda関数を作成します。
1つ目の関数では、先ほど作成したキューにメッセージを送ります。
2つ目の関数では、キューにメッセージが送られるのをトリガーに起動し、そのメッセージをログに出力します。
sendMessage関数を作成する
関数の作成については、前回紹介したため割愛します。
関数のコードは以下のとおりです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package main import ( "os" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" ) var ( sqsSvc sqsiface.SQSAPI ) func init() { sess := session.Must(session.NewSession()) sqsSvc = sqs.New(sess, aws.NewConfig().WithRegion("ap-northeast-1")) } func handler() error { queueURL := os.Getenv("QUEUE_URL") _, err := sqsSvc.SendMessage(&sqs.SendMessageInput{ QueueUrl: aws.String(queueURL), MessageBody: aws.String("Hello World"), }) if err != nil { return err } return nil } func main() { lambda.Start(handler) } |
環境変数に設定したキューのURLに向けて
SendMessage()
関数でメッセージを送ります。
このコードをビルドしてZIP化したものをLambda(sendMessage)にアップロードします。
IAMを修正する
このままだとキューにアクセスする権限がないので関数がエラーになります。そこで、Lambda関数作成時に作られるIAMロールにキューへのアクセス権限を追加します。
追加する権限は以下のとおりです。
1 2 3 4 5 6 7 8 9 10 11 | { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "arn:aws:sqs:ap-northeast-1:XXXX:TestQueue" } ] } |
権限修正後にテスト実行すると、先程作ったキューにメッセージが送られます。
LambdaでMessage受信関数を作成する
2つ目のLambdaを作成し、キューにLambda関数トリガーを設定します。
getMessage関数を作成する
関数のコードは以下のとおりです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) error { for _, message := range sqsEvent.Records { fmt.Printf("Message ID: %s\n", message.MessageId) fmt.Printf("Message Body: %s\n", message.Body) } return nil } func main() { lambda.Start(handler) } |
ハンドラの引数として
SQSEvent型
を指定します。
SQSEvent.Records
が送信したメッセージにあたり(今回は1件のみ)、それぞれ
ID
と
Body
を持ちます。
こちらもLambda(getMessage)にアップロードします。
IAMを修正する
sendMessage関数と同様に、キューへのアクセス権限を付与する必要があります。この関数にアタッチされているIAMロールに以下の権限を追加します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes" ], "Resource": "arn:aws:sqs:ap-northeast-1:XXXX:TestQueue" } ] } |
キューにLambda関数トリガーを設定する
このままだとキューにメッセージが送られてもgetMessage関数が実行されません。そこで、キューにLambda関数トリガーを設定します。
作成したキューのページを開いて、「Lambdaトリガー」のタブにある「Lambdaトリガーを設定」を押します。
「Lambda関数」getMessage関数を選択して「保存」を押します。
これでキューごgetMessage関数が関連付けれられるので、getMessage関数のページに戻ってトリガーを確認します。以下のようになっているはずです。
これ以降、sendMessage関数を実行すると、キューにメッセージが送られてそれをトリガーにgetMessage関数が実行されます。
CloudWatchでログを見てみると、このようなログが表示されるはずです。
さいごに
前回に引き続きLambdaについて深掘りして、Amazon SQSと連携させる方法を紹介しました。