前回の記事ではLambdaとAPI Gatewayを組み合わせる方法を紹介しました。今回の記事では、引き続きLambdaの掘り下げで、Amazon SQSと組み合わせてキューを処理する方法を紹介します。
今回の記事では以下の内容について紹介します。
Amazon SQSはフルマネージドなメッセージキューイングサービスです。自分でサーバやミドルウェアなどを準備することなくすぐに使い始めることができ、他のAWSサービスとの連携が容易に行えるなどのメリットがあります。
Amazon SQSについてより詳しく知りたい方は、こちらなどをご覧ください。
それでは、AWS上に環境を構築してキューにメッセージを送ってみましょう。
まず初めに、SQSにキューを作成します。
マネジメントコンソールのAmazon SQSのページを開き、「キューを作成」をクリックします。
キューの作成では、最初にキューのタイプを選択します。キューには、「標準」と「FIFO」の2種類のタイプがあります。
簡単に説明すると、「FIFO」タイプではメッセージの順番が保証され、メッセージの配信も厳密に1回のみ行われます。それに比べて、標準タイプではこのあたりが保証されませんが、スループットや料金面で優れています。
今回は順番などに制限はありませんので、標準を選択します。
デッドレターキューは、メッセージの配信が失敗し処理に失敗した場合にそのメッセージを再利用するために使用されます。
今回は特に設定は行いません。
今回は2種類のLambda関数を作成します。
1つ目の関数では、先ほど作成したキューにメッセージを送ります。
2つ目の関数では、キューにメッセージが送られるのをトリガーに起動し、そのメッセージをログに出力します。
関数の作成については、前回紹介したため割愛します。
関数のコードは以下のとおりです。
package main import ( "os" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" ) var ( sqsSvc sqsiface.SQSAPI ) func init() { sess := session.Must(session.NewSession()) sqsSvc = sqs.New(sess, aws.NewConfig().WithRegion("ap-northeast-1")) } func handler() error { queueURL := os.Getenv("QUEUE_URL") _, err := sqsSvc.SendMessage(&sqs.SendMessageInput{ QueueUrl: aws.String(queueURL), MessageBody: aws.String("Hello World"), }) if err != nil { return err } return nil } func main() { lambda.Start(handler) }
環境変数に設定したキューのURLに向けてSendMessage()
関数でメッセージを送ります。
このコードをビルドしてZIP化したものをLambda(sendMessage)にアップロードします。
このままだとキューにアクセスする権限がないので関数がエラーになります。そこで、Lambda関数作成時に作られるIAMロールにキューへのアクセス権限を追加します。
追加する権限は以下のとおりです。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "arn:aws:sqs:ap-northeast-1:XXXX:TestQueue" } ] }
権限修正後にテスト実行すると、先程作ったキューにメッセージが送られます。
2つ目のLambdaを作成し、キューにLambda関数トリガーを設定します。
関数のコードは以下のとおりです。
package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) error { for _, message := range sqsEvent.Records { fmt.Printf("Message ID: %s\n", message.MessageId) fmt.Printf("Message Body: %s\n", message.Body) } return nil } func main() { lambda.Start(handler) }
ハンドラの引数としてSQSEvent型
を指定します。
SQSEvent.Records
が送信したメッセージにあたり(今回は1件のみ)、それぞれID
とBody
を持ちます。
こちらもLambda(getMessage)にアップロードします。
sendMessage関数と同様に、キューへのアクセス権限を付与する必要があります。この関数にアタッチされているIAMロールに以下の権限を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes" ], "Resource": "arn:aws:sqs:ap-northeast-1:XXXX:TestQueue" } ] }
このままだとキューにメッセージが送られてもgetMessage関数が実行されません。そこで、キューにLambda関数トリガーを設定します。
作成したキューのページを開いて、「Lambdaトリガー」のタブにある「Lambdaトリガーを設定」を押します。
「Lambda関数」getMessage関数を選択して「保存」を押します。
CloudWatchでログを見てみると、このようなログが表示されるはずです。
前回に引き続きLambdaについて深掘りして、Amazon SQSと連携させる方法を紹介しました。