Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Broadway SQS

livebooks/broadway/broadway_sqs.livemd

Broadway SQS

# Notebook dependencies.
# - ex_aws / ex_aws_sqs: provide the ExAws.request!/2 and ExAws.SQS.* calls used below.
# - kino: provides the Kino.Input / Kino.Layout widgets used to collect credentials.
# - broadway / broadway_sqs: provide Broadway and BroadwaySQS.Producer for the pipeline.
# - hackney, jason, sweet_xml: not referenced directly in this notebook; presumably the
#   HTTP client, JSON codec and XML parser that ex_aws relies on at runtime (TODO confirm).
Mix.install([
  {:ex_aws, "~> 2.5"},
  {:ex_aws_sqs, "~> 3.4"},
  {:kino, "~> 0.12.0"},
  {:hackney, "~> 1.20"},
  {:jason, "~> 1.4"},
  {:sweet_xml, "~> 0.7.4"},
  {:broadway, "~> 1.0"},
  {:broadway_sqs, "~> 0.7.3"}
])

Amazon SQS でキューを作成する

# Password-type inputs for the AWS key pair and a plain text input for the region.
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Render the three inputs in a three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)
# Read the current value of each input and collect the results into the keyword list
# that is passed as the second argument of every ExAws.request!/2 call below.
auth_config =
  for {key, input} <- [
        access_key_id: access_key_id_input,
        secret_access_key: secret_access_key_input,
        region: region_input
      ] do
    {key, Kino.Input.read(input)}
  end

# Finish with Kino.nothing(), presumably so the credentials are not rendered as the
# cell result (confirm against the Kino documentation).
Kino.nothing()
# Create the queue named "sample_queue" and keep the URL reported in the response body;
# the following calls address the queue through `queue_url`.
sqs_res = ExAws.request!(ExAws.SQS.create_queue("sample_queue"), auth_config)
queue_url = sqs_res.body.queue_url

キューにメッセージを送信する

# Send three messages with the bodies "1", "2" and "3", one request per message.
Enum.each(~w(1 2 3), fn body ->
  request = ExAws.SQS.send_message(queue_url, body)
  ExAws.request!(request, auth_config)
end)
# Requests the attributes of the queue at `url` using `config` and returns the
# :attributes entry found under the response's :body.
show_queue_status = fn url, config ->
  response = ExAws.request!(ExAws.SQS.get_queue_attributes(url), config)

  body = Map.get(response, :body)
  Map.get(body, :attributes)
end

show_queue_status.(queue_url, auth_config)

キューからメッセージを受信する

# Issue four receive requests (at most one message each) and collect the :messages
# list of every response. Nothing is deleted here.
for _attempt <- 1..4 do
  response =
    ExAws.request!(
      ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1),
      auth_config
    )

  Map.get(Map.get(response, :body), :messages)
end

show_queue_status.(queue_url, auth_config)
# Issue four receive requests (at most one message each). Whenever a message comes
# back it is deleted from the queue; each iteration yields the message or nil.
for _attempt <- 1..4 do
  response =
    ExAws.request!(
      ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1),
      auth_config
    )

  messages = Map.get(Map.get(response, :body), :messages)

  case Enum.at(messages, 0) do
    nil ->
      nil

    message ->
      # Delete the message that was just received.
      ExAws.request!(
        ExAws.SQS.delete_message(queue_url, message.receipt_handle),
        auth_config
      )

      message
  end
end

show_queue_status.(queue_url, auth_config)

遅延メッセージを送信する

# Send one message with the body "delayed" and the option delay_seconds: 30, then
# show the queue attributes.
ExAws.request!(
  ExAws.SQS.send_message(queue_url, "delayed", delay_seconds: 30),
  auth_config
)

show_queue_status.(queue_url, auth_config)

メッセージを全て削除する

# Purge all messages from the standard queue, then show its attributes.
# The queue is addressed through `queue_url`, the same binding every other call on
# this queue uses, instead of a separately hard-coded queue name.
queue_url
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)

show_queue_status.(queue_url, auth_config)

Broadway によるメッセージ受信

defmodule SampleBroadway do
  @moduledoc """
  Minimal Broadway pipeline that consumes an SQS queue through `BroadwaySQS.Producer`.

  Every message is handled by sleeping for one second and then printing its data.
  """

  use Broadway

  @doc """
  Starts the pipeline under the fixed name `SamplePipeline`.

    * `queue_url` - passed to the producer as its `:queue_url` option.
    * `config` - passed to the producer as its `:config` option (the notebook passes
      the keyword list holding the AWS credentials and region).
    * `concurrency` - used both as the producer `:concurrency` and as the `:default`
      processor `:concurrency`.

  Returns whatever `Broadway.start_link/2` returns.
  """
  def start_link(queue_url, config, concurrency) do
    Broadway.start_link(__MODULE__,
      name: SamplePipeline,
      producer: [
        module: {
          BroadwaySQS.Producer,
          # Queue to consume and the AWS credentials/settings used to reach it
          queue_url: queue_url,
          config: config
        },
        # How many producers receive in parallel
        concurrency: concurrency
      ],
      processors: [
        default: [
          # How many messages are received per chunk
          max_demand: 1,
          # How many processors handle messages in parallel
          concurrency: concurrency
        ]
      ]
    )
  end

  @doc """
  Handles one received message: waits one second, prints `message.data` and returns
  the message unchanged.
  """
  @impl true
  def handle_message(_processor_name, message, _context) do
    # Wait for one second
    Process.sleep(1000)

    IO.inspect(message.data)

    message
  end
end
# Start the pipeline with a concurrency of 1, send three messages to the queue, show
# the queue attributes and finally stop the pipeline.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

Enum.each(~w(11 12 13), fn body ->
  request = ExAws.SQS.send_message(queue_url, body)
  ExAws.request!(request, auth_config)
end)

show_queue_status.(queue_url, auth_config)

Broadway.stop(pipeline)

FIFOキュー

# Create "sample_queue.fifo" with fifo_queue: true and rebind `queue_url` to the URL
# reported in the response body; from here on `queue_url` refers to the FIFO queue.
sqs_res =
  ExAws.request!(
    ExAws.SQS.create_queue("sample_queue.fifo", fifo_queue: true),
    auth_config
  )

queue_url = sqs_res.body.queue_url

FIFOキューへのメッセージ送信

# Send "1", "2" and "3" to the FIFO queue, all in the message group "sample". The
# deduplication id of each message is the current OS time rendered as a string.
Enum.each(~w(1 2 3), fn body ->
  send_opts = [
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(System.os_time())
  ]

  ExAws.request!(ExAws.SQS.send_message(queue_url, body, send_opts), auth_config)
end)

show_queue_status.(queue_url, auth_config)

FIFOキューからのメッセージ受信

# Issue four receive requests (at most one message each) against the FIFO queue and
# collect the :messages list of every response. Nothing is deleted here.
for _attempt <- 1..4 do
  response =
    ExAws.request!(
      ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1),
      auth_config
    )

  Map.get(Map.get(response, :body), :messages)
end
# Issue four receive requests (at most one message each) against the FIFO queue.
# A received message is deleted right away; each iteration yields the message or nil.
for _attempt <- 1..4 do
  response =
    ExAws.request!(
      ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1),
      auth_config
    )

  messages = Map.get(Map.get(response, :body), :messages)

  case Enum.at(messages, 0) do
    nil ->
      nil

    message ->
      # Delete the message that was just received.
      ExAws.request!(
        ExAws.SQS.delete_message(queue_url, message.receipt_handle),
        auth_config
      )

      message
  end
end

グループの動作確認

# Send six messages that alternate between the message groups "A" and "B". The
# deduplication id of each message is the current OS time rendered as a string.
Enum.each(
  [{"A", "A1"}, {"B", "B1"}, {"A", "A2"}, {"B", "B2"}, {"A", "A3"}, {"B", "B3"}],
  fn {group_id, body} ->
    send_opts = [
      message_group_id: group_id,
      message_deduplication_id: Integer.to_string(System.os_time())
    ]

    ExAws.request!(ExAws.SQS.send_message(queue_url, body, send_opts), auth_config)
  end
)
# Issue seven receive requests (at most one message each) and collect the body of each
# received message, or nil when a request returned no message.
1..7
|> Enum.map(fn _ ->
  first_message =
    queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  case first_message do
    nil ->
      nil

    message ->
      # Delete the received message, except the one whose body is "A2": that one is
      # deliberately left in the queue (presumably to show how an undeleted message
      # affects the rest of its group; confirm against the notebook's explanation).
      if message.body != "A2" do
        queue_url
        |> ExAws.SQS.delete_message(message.receipt_handle)
        |> ExAws.request!(auth_config)
      end

      message.body
  end
end)
# Purge all messages from the FIFO queue. `queue_url` is bound to the FIFO queue's URL
# at this point, so the queue is addressed the same way as in the send/receive/delete
# calls above instead of through a separately hard-coded queue name.
queue_url
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)

Broadway による FIFO キューの受信

# Start the pipeline with a concurrency of 1 against the FIFO queue, send three
# messages in the group "sample", show the queue attributes and stop the pipeline.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

Enum.each(~w(11 12 13), fn body ->
  send_opts = [
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(System.os_time())
  ]

  ExAws.request!(ExAws.SQS.send_message(queue_url, body, send_opts), auth_config)
end)

show_queue_status.(queue_url, auth_config)

Broadway.stop(pipeline)

Broadway による並列受信

# Start the pipeline with a concurrency of 2, send six messages in the group "sample"
# and stop the pipeline.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 2)

Enum.each(~w(21 22 23 24 25 26), fn body ->
  send_opts = [
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(System.os_time())
  ]

  ExAws.request!(ExAws.SQS.send_message(queue_url, body, send_opts), auth_config)
end)

Broadway.stop(pipeline)

キューの削除

# Delete both queues created by this notebook.
# NOTE(review): these calls pass the queue *name*, whereas the send/receive/delete
# calls above pass the queue URL. `queue_url` no longer refers to the standard queue
# here, so the names are kept; confirm that delete_queue/1 accepts a bare queue name.
ExAws.request!(ExAws.SQS.delete_queue("sample_queue"), auth_config)

ExAws.request!(ExAws.SQS.delete_queue("sample_queue.fifo"), auth_config)