はじめに
AWSを使っていて、なにか非同期な処理を実行したい場合、SQSからLambdaをキックする方法が最も一般的な方法の一つではないかと思います。
しかし、何らかの事情(15分以内に終わらないなど)により、LambdaではなくECSタスクとして実行したいケースも存在するのではないでしょうか。
そのようなケースでは、EventBridge Pipesを使うことで容易に実現することができますので、EventBridge PipesとSQSとECSを組み合わせた場合の基本的な構成について紹介します。
EventBridge Pipesとは
EventBridge Pipesは、Producer/Consumer型のメッセージング処理を提供するAWSのサービスで、SQSやDynamoDBなどの様々なサービスとLambda、ECS、API Gatewayなどの様々なサービスを結びつけることができます。
EventBridge Pipesを構成する要素としては、以下のようなものがあります。
- ソース
- フィルタリング
- エンリッチメント(プリプロセス)
- ターゲット
EventBridge Pipesは、様々なソースからイベントデータを受け取り、必要に応じてそのデータをフィルタリングおよびエンリッチメントして、ターゲットに送信します。
フィルタリングとエンリッチメントについてはオプションとなっており、最低限、ソースとターゲットを指定する必要があります。
ソース
ソースは一連の処理の起点となるサービスのことです。
ソースとして指定できるサービスは以下のとおりです。
- Kinesis
- SQS
- DynamoDB
- MQ
- Apache Kafka
- MSK
フィルタリング
指定したフィルター条件を満たさないイベントデータを除外することができます。
フィルタリングで指定できるイベントパターンには以下のようなものがあります。
- プレフィックスマッチング
- サフィックスマッチング
- 「以外」マッチング(ルールで指定されているもの以外のすべて)
- 数値マッチング(特定の範囲内)
- IPアドレスマッチング(CIDRで指定)
- 存在マッチング(リーフノードに特定のフィールドを持たないすべて)
- 大文字小文字を区別しないマッチング
- 複数のマッチング(複数のマッチングルールの組み合わせ)
- ORマッチング
イベントパターンはJSON形式で指定します。例えば、
detail.status
が
stopped
および
overloade
以外のイベントデータにマッチさせるイベントパターンの指定は以下のようになります。
1 2 3 4 5 6 7 | { "detail": { "state": [{ "anything-but": ["stopped", "overloaded"] }] } } |
ソースがSQSの場合、フィルター条件を満たさない場合は自動的にキューから削除されます。
また、メッセージのbodyの形式が文字列かJSONかによってイベントパターンの指定の仕方が変わるので、正しく指定しないと予期せぬ動作をする場合があります。
詳しくは公式ドキュメントをご覧ください。
エンリッチメント
エンリッチメントでは、ソースからのイベントデータをターゲットに送信する前に任意の処理を挟むことができます。
エンリッチメントとして指定できるサービスは以下のとおりです。
- Lambda
- Step Function
- API Gateway
- API送信先
エンリッチメントは同期的に呼び出されます。また、レスポンスサイズは最大で6MBまでに制限されています。
ターゲット
ターゲットはイベントデータを最終的に処理するサービスで、以下のようなサービスを指定することができます(一部抜粋)
- API Gateway
- バッチジョブのキュー
- CloudWatchのロググループ
- Lambda
- ESCタスク
ソースがSQSのように順番を強制しない場合、EventBridgeでは、ターゲットを同期的、または非同期的に呼び出すことができます(デフォルトでは同期的に呼び出される)。
ECSタスクを起動する際にパラメータを渡したい場合、
ContainerOverrides
の
Command
でコンテナに渡すことができます。
この設定に関しては、現時点ではコンソールからは行えないので、AWS CLIなどで設定を行います。
EventBridge Pipesを使うメリット
EventBridge Pipesを使うと、サービス間を疎に保つことができ、IAMで許可する権限を絞ることができます。
また、SQSとLambdaを組み合わせて非同期処理を実現する場合でも、EventBridge Pipesを使うことで、Lambdaの処理の中でSQSの操作をしなくて良くなる利点があります。
EventBridge Pipesの設定
それでは、実際のコンソールを使って構築していきます。
パイプの作成
AWSのコンソールでEventBridgeのページを開き、「パイプの作成」をクリックします。
「パイプ名」を入力し、「ソース」から「SQS」を選択し、「SQSキュー」を選択します(SQSキューは予め作成しておきます)。
「ターゲット」をクリックし、「ターゲットサービス」から「ECSクラスター」を選択し、「クラスター」や「タスク定義」などECSでタスクを立ち上げる際に設定するような項目を設定していきます(ECSの設定については、以前の記事をご覧ください)。
また、サブネットとセキュリティグループについては、カンマ区切りで入力します。
「ターゲット入力トランスフォーマー」を画像のように設定します。これはオプショナルなのでやらなくても大丈夫です。
設定が終わったら「パイプを作成」をクリックするとパイプが作成されます。
AWS CLIでパイプを更新する
先程説明した通り、SQSからのメッセージ中のパラメータをECSタスクに渡すには、
ContainerOverrides
の
Command
にパラメータを入れる必要があります。これはコンソールからは設定できないので、以下のようにAWS CLIで設定を行います。
1 | % aws pipes update-pipe --cli-input-json file://update.json |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | { "Name": "ecs-pipe", "RoleArn": "arn:aws:iam::048414909692:role/service-role/Amazon_EventBridge_Pipe_ecs-pipe_bd32b832", "TargetParameters": { "EcsTaskParameters": { "EnableECSManagedTags": true, "EnableExecuteCommand": false, "LaunchType": "FARGATE", "NetworkConfiguration": { "awsvpcConfiguration": { "AssignPublicIp": "ENABLED", "SecurityGroups": [ "sg-08a80c3f9a0cfdd77" ], "Subnets": [ "subnet-079116e8cbcd0a640", "subnet-065b2b720b039523a" ] } }, "Overrides": { "ContainerOverrides": [ { "Command": [ "$.body" ], "Name": "go" } ] }, "TaskCount": 1, "TaskDefinitionArn": "arn:aws:ecs:ap-northeast-1:048414909692:task-definition/pipsSample" }, "InputTemplate": "{\n \"body\": <$.body>\n}" } } |
SQSからECSタスクを起動する
それでは、実際にSQSにメッセージを送信して、ECSタスクが起動するか確認します。
SQSの画面を開いて、作成したキューの画面の「キューの送受信」をクリックします。
「メッセージ本文」に入力して「メッセージ送信」をクリックします。
設定がうまくできていると、ECSの「クラスター」の「タスク」タブにタスクが表示されます。
タスクのログを見てみると、SQSから送信したメッセージ「foo」が出力されています。
ちなみに、ここで起動したタスクは、コマンド引数を表示するシンプルなコードです。
1 2 3 4 5 6 7 8 9 10 11 | package main import ( "fmt" "os" ) func main() { args := os.Args[1:] fmt.Println(args) } |
さいごに
EventBridge PipesとSQSとECSを使ったメッセージング処理の基本的な構築について紹介しました。
実際のサービスでは、同時に起動するタスクの上限を設定したり、フィルタリングやエンリッチメントなどを活用するなど、より高度な使い方をすることになるのではと思います。
フィルタリングとエンリッチメントについても少し触ってみましたが、うまく行かないところが多く、なれるまでもう少し触ってみる必要がありそうです。
また、以前はEventBridge Pipesのログを直接出力することができなかったので、デバッグに苦労していたようですが、最近の更新でCloudWatch Logsに出力できるようになりました。
EventBridge Pipesは、まだまだ発展途上のサービスだと思いますので、これからもっと便利に使い易くなっていくと嬉しいです。