カテゴリー: Server

EventBridge Pipesを使ってSQSからECSタスクを起動する

はじめに

AWSを使っていて、なにか非同期な処理を実行したい場合、SQSからLambdaをキックする方法が最も一般的な方法の一つではないかと思います。

しかし、何らかの事情(15分以内に終わらないなど)により、LambdaではなくECSタスクとして実行したいケースも存在するのではないでしょうか。

そのようなケースでは、EventBridge Pipesを使うことで容易に実現することができますので、EventBridge PipesとSQSとECSを組み合わせた場合の基本的な構成について紹介します。

EventBridge Pipesとは

EventBridge Pipesは、Producer/Consumer型のメッセージング処理を提供するAWSのサービスで、SQSやDynamoDBなどの様々なサービスとLambda、ECS、API Gatewayなどの様々なサービスを結びつけることができます。

EventBridge Pipesを構成する要素としては、以下のようなものがあります。

  • ソース
  • フィルタリング
  • エンリッチメント(プリプロセス)
  • ターゲット

EventBridge Pipesは、様々なソースからイベントデータを受け取り、必要に応じてそのデータをフィルタリングおよびエンリッチメントして、ターゲットに送信します。

フィルタリングとエンリッチメントについてはオプションとなっており、最低限、ソースとターゲットを指定する必要があります。

ソース

ソースは一連の処理の起点となるサービスのことです。

ソースとして指定できるサービスは以下のとおりです。

  1. Kinesis
  2. SQS
  3. DynamoDB
  4. MQ
  5. Apache Kafka
  6. MSK

フィルタリング

指定したフィルター条件を満たさないイベントデータを除外することができます。

フィルタリングで指定できるイベントパターンには以下のようなものがあります。

  • プレフィックスマッチング
  • サフィックスマッチング
  • 「以外」マッチング(ルールで指定されているもの以外のすべて)
  • 数値マッチング(特定の範囲内)
  • IPアドレスマッチング(CIDRで指定)
  • 存在マッチング(リーフノードに特定のフィールドを持たないすべて)
  • 大文字小文字を区別しないマッチング
  • 複数のマッチング(複数のマッチングルールの組み合わせ)
  • ORマッチング

イベントパターンはJSON形式で指定します。例えば、detail.statusstoppedおよびoverloade以外のイベントデータにマッチさせるイベントパターンの指定は以下のようになります。

{
  "detail": {
    "state": [{
      "anything-but": ["stopped", "overloaded"]
    }]
  }
}

ソースがSQSの場合、フィルター条件を満たさない場合は自動的にキューから削除されます。

また、メッセージのbodyの形式が文字列かJSONかによってイベントパターンの指定の仕方が変わるので、正しく指定しないと予期せぬ動作をする場合があります。

詳しくは公式ドキュメントをご覧ください。

エンリッチメント

エンリッチメントでは、ソースからのイベントデータをターゲットに送信する前に任意の処理を挟むことができます。

エンリッチメントとして指定できるサービスは以下のとおりです。

  • Lambda
  • Step Function
  • API Gateway
  • API送信先

エンリッチメントは同期的に呼び出されます。また、レスポンスサイズは最大で6MBまでに制限されています。

ターゲット

ターゲットはイベントデータを最終的に処理するサービスで、以下のようなサービスを指定することができます(一部抜粋)

  • API Gateway
  • バッチジョブのキュー
  • CloudWatchのロググループ
  • Lambda
  • ESCタスク

ソースがSQSのように順番を強制しない場合、EventBridgeでは、ターゲットを同期的、または非同期的に呼び出すことができます(デフォルトでは同期的に呼び出される)。

ECSタスクを起動する際にパラメータを渡したい場合、ContainerOverridesCommandでコンテナに渡すことができます。

この設定に関しては、現時点ではコンソールからは行えないので、AWS CLIなどで設定を行います。

EventBridge Pipesを使うメリット

EventBridge Pipesを使うと、サービス間を疎に保つことができ、IAMで許可する権限を絞ることができます。

また、SQSとLambdaを組み合わせて非同期処理を実現する場合でも、EventBridge Pipesを使うことで、Lambdaの処理の中でSQSの操作をしなくて良くなる利点があります。

EventBridge Pipesの設定

それでは、実際のコンソールを使って構築していきます。

パイプの作成

AWSのコンソールでEventBridgeのページを開き、「パイプの作成」をクリックします。

「パイプ名」を入力し、「ソース」から「SQS」を選択し、「SQSキュー」を選択します(SQSキューは予め作成しておきます)。

「ターゲット」をクリックし、「ターゲットサービス」から「ECSクラスター」を選択し、「クラスター」や「タスク定義」などECSでタスクを立ち上げる際に設定するような項目を設定していきます(ECSの設定については、以前の記事をご覧ください)。

また、サブネットとセキュリティグループについては、カンマ区切りで入力します。

「ターゲット入力トランスフォーマー」を画像のように設定します。これはオプショナルなのでやらなくても大丈夫です。

設定が終わったら「パイプを作成」をクリックするとパイプが作成されます。

AWS CLIでパイプを更新する

先程説明した通り、SQSからのメッセージ中のパラメータをECSタスクに渡すには、ContainerOverridesCommandにパラメータを入れる必要があります。これはコンソールからは設定できないので、以下のようにAWS CLIで設定を行います。

% aws pipes update-pipe --cli-input-json file://update.json
{
    "Name": "ecs-pipe",
    "RoleArn": "arn:aws:iam::048414909692:role/service-role/Amazon_EventBridge_Pipe_ecs-pipe_bd32b832",
    "TargetParameters": {
        "EcsTaskParameters": {
            "EnableECSManagedTags": true,
            "EnableExecuteCommand": false,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": {
                "awsvpcConfiguration": {
                    "AssignPublicIp": "ENABLED",
                    "SecurityGroups": [
                        "sg-08a80c3f9a0cfdd77"
                    ],
                    "Subnets": [
                        "subnet-079116e8cbcd0a640",
                        "subnet-065b2b720b039523a"
                    ]
                }
            },
            "Overrides": {
                "ContainerOverrides": [
                        {
                                "Command": [
                                        "$.body"
                                ],
                                "Name": "go"
                        }
                ]
            },
            "TaskCount": 1,
            "TaskDefinitionArn": "arn:aws:ecs:ap-northeast-1:048414909692:task-definition/pipsSample"
        },
        "InputTemplate": "{\n  \"body\": <$.body>\n}"
    }
}

SQSからECSタスクを起動する

それでは、実際にSQSにメッセージを送信して、ECSタスクが起動するか確認します。

SQSの画面を開いて、作成したキューの画面の「キューの送受信」をクリックします。

「メッセージ本文」に入力して「メッセージ送信」をクリックします。

設定がうまくできていると、ECSの「クラスター」の「タスク」タブにタスクが表示されます。

タスクのログを見てみると、SQSから送信したメッセージ「foo」が出力されています。

ちなみに、ここで起動したタスクは、コマンド引数を表示するシンプルなコードです。

package main

import (
 "fmt"
 "os"
)

func main() {
 args := os.Args[1:]
 fmt.Println(args)
}

さいごに

EventBridge PipesとSQSとECSを使ったメッセージング処理の基本的な構築について紹介しました。

実際のサービスでは、同時に起動するタスクの上限を設定したり、フィルタリングやエンリッチメントなどを活用するなど、より高度な使い方をすることになるのではと思います。

フィルタリングとエンリッチメントについても少し触ってみましたが、うまく行かないところが多く、なれるまでもう少し触ってみる必要がありそうです。

また、以前はEventBridge Pipesのログを直接出力することができなかったので、デバッグに苦労していたようですが、最近の更新でCloudWatch Logsに出力できるようになりました。

EventBridge Pipesは、まだまだ発展途上のサービスだと思いますので、これからもっと便利に使い易くなっていくと嬉しいです。

おすすめ書籍

Hiroki Ono

シェア
執筆者:
Hiroki Ono
タグ: AWS

最近の投稿

Goの抽象構文木でコードを解析する

はじめに Goでアプリケーショ…

22時間 前

フロントエンドで動画デコレーション&レンダリング

はじめに 今回は、以下のように…

4週間 前

Goのクエリビルダー goqu を使ってみる

はじめに 最近携わっているとあ…

1か月 前

【Xcode15】プライバシーマニフェスト対応に備えて

はじめに こんにちは、suzu…

2か月 前