Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Triggering a Lambda function from SQS

broadway_sqs_lambda.livemd

Triggering a Lambda function from SQS

Mix.install([
  {:aws, "~> 0.13"},
  {:ex_aws, "~> 2.5"},
  {:ex_aws_lambda, "~> 2.1"},
  {:ex_aws_s3, "~> 2.5"},
  {:ex_aws_sqs, "~> 3.4"},
  {:ex_aws_sts, "~> 2.3"},
  {:kino, "~> 0.12.0"},
  {:hackney, "~> 1.20"},
  {:jason, "~> 1.4"},
  {:sweet_xml, "~> 0.7.4"},
  {:broadway, "~> 1.0"},
  {:broadway_sqs, "~> 0.7.3"}
])

認証情報の設定

# Credential inputs. The key ID and secret use password-type inputs; the region is plain text.
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Render the three inputs in a three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)
# Read each credential input exactly once and keep the values in one place, so the
# `AWS.Client` and the ExAws request options always use the same credentials.
auth_config = [
  access_key_id: Kino.Input.read(access_key_id_input),
  secret_access_key: Kino.Input.read(secret_access_key_input),
  region: Kino.Input.read(region_input)
]

# Client for the `aws` package; it is used for the IAM calls in this notebook.
client =
  AWS.Client.create(
    Keyword.fetch!(auth_config, :access_key_id),
    Keyword.fetch!(auth_config, :secret_access_key),
    Keyword.fetch!(auth_config, :region)
  )

# Evaluate to nothing so the credentials are not shown as the cell result.
Kino.nothing()
# Ask STS who the configured credentials belong to; the account ID is
# interpolated into the IAM and Lambda ARNs built later in the notebook.
caller_identity =
  ExAws.request!(ExAws.STS.get_caller_identity(), auth_config)

account_id = caller_identity.body.account

Lambda 関数の作成

# Create the execution role for the Lambda function.
# The trust policy lets the Lambda service (lambda.amazonaws.com) assume the role.
client
|> AWS.IAM.create_role(%{
  "RoleName" => "sample-lambda-role",
  "AssumeRolePolicyDocument" =>
    Jason.encode!(%{
      # State the policy language version explicitly, matching the permissions
      # policy created for this role, instead of relying on the service default.
      "Version" => "2012-10-17",
      "Statement" => [
        %{
          "Sid" => "STS202201051440",
          "Effect" => "Allow",
          "Principal" => %{
            "Service" => ["lambda.amazonaws.com"]
          },
          "Action" => "sts:AssumeRole"
        }
      ]
    })
})
# Actions the Lambda role may perform: publishing CloudWatch metrics and logs,
# and receiving, deleting and sending SQS messages.
allowed_actions = ~w(
  cloudwatch:PutMetricData
  logs:CreateLogStream
  logs:PutLogEvents
  logs:CreateLogGroup
  logs:DescribeLogStreams
  sqs:DeleteMessage
  sqs:GetQueueAttributes
  sqs:ReceiveMessage
  sqs:SendMessage
)

# A single "Allow" statement covering those actions on every resource.
policy_document =
  Jason.encode!(%{
    "Version" => "2012-10-17",
    "Statement" => [
      %{
        "Effect" => "Allow",
        "Action" => allowed_actions,
        "Resource" => ["*"]
      }
    ]
  })

AWS.IAM.create_policy(client, %{
  "PolicyName" => "sample-lambda-role-policy",
  "PolicyDocument" => policy_document
})
# Attach the permissions policy to the Lambda execution role.
policy_arn = "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"

AWS.IAM.attach_role_policy(client, %{
  "PolicyArn" => policy_arn,
  "RoleName" => "sample-lambda-role"
})
# Write the Python source of the Lambda handler to a temporary file.
# The handler logs the number of SQS records in the event, sleeps for 2 seconds, and then,
# for every record, parses the JSON body and sends its "message" value to the FIFO queue
# named by its "callback_queue_url". It returns the last message, or "no_records".
File.write!("/tmp/sample.py", """
import boto3
import json
import time

from datetime import datetime

def handler(event, context):
    client = boto3.client("sqs")

    records = event.get("Records", [])

    print("records:", len(records))

    time.sleep(2)

    response = "no_records"
    if records:
        for record in records:
            request_body = json.loads(record["body"])
            response = request_body["message"]
            print("response:", response)
            callback_queue_url = request_body["callback_queue_url"]
            client.send_message(
                QueueUrl=callback_queue_url,
                MessageBody=response,
                MessageGroupId="sample",
                MessageDeduplicationId=str(datetime.now().timestamp())
            )

    return response
""")
# Zip the handler file (path resolved relative to /tmp); the archive is uploaded to S3
# below and used as the Lambda function code.
:zip.create("/tmp/sample_lambda.zip", [~c"sample.py"], cwd: "/tmp")
# Name of the S3 bucket that will hold the Lambda deployment package.
bucket_name_input = Kino.Input.text("BUCKET_NAME")
bucket_name = Kino.Input.read(bucket_name_input)

# Stream the zip archive from disk and upload it under the key the
# function definition refers to.
upload_op =
  ExAws.S3.upload(
    ExAws.S3.Upload.stream_file("/tmp/sample_lambda.zip"),
    bucket_name,
    "sample_lambda/sample_lambda.zip"
  )

ExAws.request!(upload_op, auth_config)
# Definition of the Lambda function: Python 3.11 runtime, code taken from the
# uploaded zip, executed with the role created earlier.
function_spec = %{
  "FunctionName" => "sample_lambda",
  "Runtime" => "python3.11",
  "Handler" => "sample.handler",
  "Role" => "arn:aws:iam::#{account_id}:role/sample-lambda-role",
  "Code" => %{
    "S3Bucket" => bucket_name,
    "S3Key" => "sample_lambda/sample_lambda.zip"
  }
}

# The request is assembled by hand as a POST to the Lambda "functions" endpoint.
create_function_op = %ExAws.Operation.RestQuery{
  service: :lambda,
  http_method: :post,
  path: "/2015-03-31/functions",
  body: function_spec
}

ExAws.request!(create_function_op, auth_config)
# Invoke the function once with an empty payload and an empty client context
# to check that it was deployed.
invoke_op = ExAws.Lambda.invoke("sample_lambda", %{}, %{})
ExAws.request!(invoke_op, auth_config)

キューの作成

# Create the FIFO queue whose messages will trigger the Lambda function.
create_queue_op =
  ExAws.SQS.create_queue(
    "sample_queue.fifo",
    fifo_queue: true,
    receive_message_wait_time_seconds: 1
  )

sqs_res = ExAws.request!(create_queue_op, auth_config)
# Register the request queue as an event source of the Lambda function.
# The queue ARN is built from the configured region, because the queue is created
# with the same `auth_config` and therefore lives in that region.
queue_arn =
  "arn:aws:sqs:#{Keyword.fetch!(auth_config, :region)}:#{account_id}:sample_queue.fifo"

%ExAws.Operation.RestQuery{
  service: :lambda,
  http_method: :post,
  path: "/2015-03-31/event-source-mappings/",
  body: %{
    "FunctionName" => "sample_lambda",
    "EventSourceArn" => queue_arn,
    # Hand one message to each invocation.
    "BatchSize" => 1,
    # Cap the number of concurrent invocations started by this event source.
    "ScalingConfig" => %{
      "MaximumConcurrency" => 2
    }
  }
}
|> ExAws.request!(auth_config)
# URL of the request queue, taken from the create-queue response.
queue_url = sqs_res.body.queue_url

# Second FIFO queue; the Lambda handler sends its results here.
callback_create_op =
  ExAws.SQS.create_queue(
    "sample_callback_queue.fifo",
    fifo_queue: true,
    receive_message_wait_time_seconds: 1
  )

call_back_sqs_res = ExAws.request!(callback_create_op, auth_config)
callback_queue_url = call_back_sqs_res.body.queue_url

メッセージの送信

# Send three test messages to the request queue. Each body is a JSON object holding
# the message text and the URL of the queue the Lambda handler should reply to.
["1", "2", "3"]
|> Enum.each(fn message ->
  # Encode with Jason rather than interpolating into a JSON template, so values
  # containing quotes or backslashes still produce valid JSON.
  body =
    Jason.encode!(%{
      "callback_queue_url" => callback_queue_url,
      "message" => message
    })

  queue_url
  |> ExAws.SQS.send_message(
    body,
    # For now, put every message in the same message group.
    message_group_id: "sample",
    # For now, use the current time as the deduplication ID.
    message_deduplication_id: Integer.to_string(:os.system_time())
  )
  |> ExAws.request!(auth_config)
end)
# Fetches the attributes of the queue at `url` and returns the `:attributes`
# entry of the response body.
show_queue_status = fn url, config ->
  response =
    url
    |> ExAws.SQS.get_queue_attributes()
    |> ExAws.request!(config)

  body = Map.get(response, :body)
  Map.get(body, :attributes)
end

# Inspect both queues after the messages have been sent.
show_queue_status.(queue_url, auth_config)
show_queue_status.(callback_queue_url, auth_config)
# Poll the callback queue four times, taking at most one message per poll.
# Yields a list with the body of each received message, or `nil` for a poll
# that returned no message.
1..4
|> Enum.map(fn _ ->
  received =
    callback_queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  case received do
    nil ->
      nil

    message ->
      # Delete the received message.
      callback_queue_url
      |> ExAws.SQS.delete_message(message.receipt_handle)
      |> ExAws.request!(auth_config)

      message.body
  end
end)

Broadway による受信

defmodule SampleBroadway do
  @moduledoc """
  Minimal Broadway pipeline that consumes an SQS queue and prints every message.

  The pipeline is registered as `SamplePipeline` and runs one producer and one
  processor.
  """

  use Broadway

  @doc """
  Starts the pipeline.

    * `queue_url` - URL of the SQS queue to consume.
    * `config` - ExAws configuration (credentials and region) passed to the
      `BroadwaySQS.Producer`.

  Returns the result of `Broadway.start_link/2`.
  """
  @spec start_link(String.t(), keyword()) :: Supervisor.on_start()
  def start_link(queue_url, config) do
    Broadway.start_link(__MODULE__,
      name: SamplePipeline,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: queue_url, config: config
        },
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  # Prints the message payload and returns the message unchanged.
  @impl true
  def handle_message(_processor_name, message, _context) do
    IO.inspect(message.data)
    message
  end
end
# Start the Broadway pipeline on the callback queue.
{:ok, pipeline} = SampleBroadway.start_link(callback_queue_url, auth_config)

# Send six more messages to the request queue while the pipeline is running.
["11", "12", "13", "14", "15", "16"]
|> Enum.each(fn message ->
  # Encode with Jason rather than interpolating into a JSON template, so values
  # containing quotes or backslashes still produce valid JSON.
  body =
    Jason.encode!(%{
      "callback_queue_url" => callback_queue_url,
      "message" => message
    })

  queue_url
  |> ExAws.SQS.send_message(
    body,
    # For now, put every message in the same message group.
    message_group_id: "sample",
    # For now, use the current time as the deduplication ID.
    message_deduplication_id: Integer.to_string(:os.system_time())
  )
  |> ExAws.request!(auth_config)
end)

# Inspect both queues, then shut the pipeline down.
show_queue_status.(queue_url, auth_config)
show_queue_status.(callback_queue_url, auth_config)
Broadway.stop(pipeline)

リソースの削除

# Remove the Lambda function.
delete_function_op = ExAws.Lambda.delete_function("sample_lambda")
ExAws.request!(delete_function_op, auth_config)
# Tear down the IAM resources: detach the policy from the role first,
# then delete the policy and finally the role.
cleanup_policy_arn = "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"

AWS.IAM.detach_role_policy(client, %{
  "PolicyArn" => cleanup_policy_arn,
  "RoleName" => "sample-lambda-role"
})

AWS.IAM.delete_policy(client, %{"PolicyArn" => cleanup_policy_arn})

AWS.IAM.delete_role(client, %{"RoleName" => "sample-lambda-role"})
# Delete the request queue and then the callback queue.
ExAws.request!(ExAws.SQS.delete_queue("sample_queue.fifo"), auth_config)
ExAws.request!(ExAws.SQS.delete_queue("sample_callback_queue.fifo"), auth_config)