Broadway SQS
Mix.install([
{:ex_aws, "~> 2.5"},
{:ex_aws_sqs, "~> 3.4"},
{:kino, "~> 0.12.0"},
{:hackney, "~> 1.20"},
{:jason, "~> 1.4"},
{:sweet_xml, "~> 0.7.4"},
{:broadway, "~> 1.0"},
{:broadway_sqs, "~> 0.7.3"}
])
Amazon SQS でキューを作成する
# Form inputs for the AWS credentials. The two keys use password inputs,
# the region uses a plain text input.
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Lay the three inputs out side by side in a three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)
# Read the current value of each input and collect them into the keyword
# list that is later handed to every `ExAws.request!/2` call.
auth_config =
  for {option, input} <- [
        access_key_id: access_key_id_input,
        secret_access_key: secret_access_key_input,
        region: region_input
      ] do
    {option, Kino.Input.read(input)}
  end

# End on `Kino.nothing()` -- presumably so the credentials that were just
# read are not echoed as the result of this cell.
Kino.nothing()
# Create the queue named "sample_queue" and keep the full response.
sqs_res = ExAws.request!(ExAws.SQS.create_queue("sample_queue"), auth_config)

# The queue URL from the response body is what the following cells use to
# address the queue.
queue_url = sqs_res.body.queue_url
キューにメッセージを送信する
["1", "2", "3"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(message)
|> ExAws.request!(auth_config)
end)
# Fetches the attributes of the queue at `queue_url` using the credentials
# in `auth_config` and returns the `:attributes` entry of the response body.
show_queue_status = fn queue_url, auth_config ->
  attributes_op = ExAws.SQS.get_queue_attributes(queue_url)
  response = ExAws.request!(attributes_op, auth_config)
  body = Map.get(response, :body)
  Map.get(body, :attributes)
end

show_queue_status.(queue_url, auth_config)
キューからメッセージを受信する
# Poll the queue four times, asking for at most one message per request,
# and collect the returned message lists. Nothing is deleted here.
for _attempt <- 1..4 do
  receive_op = ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1)
  response = ExAws.request!(receive_op, auth_config)
  body = Map.get(response, :body)
  Map.get(body, :messages)
end

show_queue_status.(queue_url, auth_config)
# Poll the queue four times. Each received message is deleted right away;
# the result is the list of received messages (`nil` for an empty poll).
for _attempt <- 1..4 do
  receive_op = ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1)
  response = ExAws.request!(receive_op, auth_config)

  message =
    response
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # Delete the received message
  if not is_nil(message) do
    delete_op = ExAws.SQS.delete_message(queue_url, message.receipt_handle)
    ExAws.request!(delete_op, auth_config)
  end

  message
end

show_queue_status.(queue_url, auth_config)
遅延メッセージを送信する
# Send one message with `delay_seconds: 30`, then look at the queue
# attributes straight afterwards.
delayed_op = ExAws.SQS.send_message(queue_url, "delayed", delay_seconds: 30)
ExAws.request!(delayed_op, auth_config)

show_queue_status.(queue_url, auth_config)
メッセージを全て削除する
"sample_queue"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)
show_queue_status.(queue_url, auth_config)
Broadway によるメッセージ受信
defmodule SampleBroadway do
  @moduledoc """
  Minimal Broadway pipeline that reads messages from an Amazon SQS queue via
  `BroadwaySQS.Producer` and prints the data of every message it handles.
  """

  use Broadway

  @doc """
  Starts the pipeline under the name `SamplePipeline`.

  ## Parameters

    * `queue_url` - passed to the producer as its `:queue_url` option.
    * `config` - passed to the producer as its `:config` option
      (the notebook supplies the AWS credentials and region here).
    * `concurrency` - used as the `:concurrency` of both the producer and
      the `:default` processors.

  Returns the result of `Broadway.start_link/2`.
  """
  def start_link(queue_url, config, concurrency) do
    Broadway.start_link(__MODULE__,
      name: SamplePipeline,
      producer: [
        # Queue to consume from and the AWS credentials used to reach it
        module: {BroadwaySQS.Producer, queue_url: queue_url, config: config},
        # How many producers receive in parallel
        concurrency: concurrency
      ],
      processors: [
        default: [
          # How many messages are requested at a time
          max_demand: 1,
          # How many processors handle messages in parallel
          concurrency: concurrency
        ]
      ]
    )
  end

  @doc """
  Handles one received message: sleeps for one second, prints
  `message.data`, and returns the message unchanged.
  """
  @impl true
  def handle_message(_processor_name, message, _context) do
    # Wait one second
    Process.sleep(1000)
    IO.inspect(message.data)
    message
  end
end
# Start the pipeline against the current queue with a concurrency of 1.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

# Send three messages while the pipeline is running.
Enum.each(["11", "12", "13"], fn body ->
  send_op = ExAws.SQS.send_message(queue_url, body)
  ExAws.request!(send_op, auth_config)
end)

show_queue_status.(queue_url, auth_config)

# Shut the pipeline down again.
Broadway.stop(pipeline)
FIFOキュー
# Create "sample_queue.fifo" with the `fifo_queue: true` option.
sqs_res =
  ExAws.request!(
    ExAws.SQS.create_queue("sample_queue.fifo", fifo_queue: true),
    auth_config
  )

# From here on `queue_url` refers to the FIFO queue.
queue_url = sqs_res.body.queue_url
FIFOキューへのメッセージ送信
["1", "2", "3"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: "sample",
message_deduplication_id: Integer.to_string(:os.system_time())
)
|> ExAws.request!(auth_config)
end)
show_queue_status.(queue_url, auth_config)
FIFOキューからのメッセージ受信
# Poll the FIFO queue four times, at most one message per request, and
# collect the returned message lists. Nothing is deleted here.
for _attempt <- 1..4 do
  receive_op = ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1)
  response = ExAws.request!(receive_op, auth_config)
  body = Map.get(response, :body)
  Map.get(body, :messages)
end
# Poll the FIFO queue four times. Each received message is deleted right
# away; the result is the list of received messages (`nil` for an empty poll).
for _attempt <- 1..4 do
  receive_op = ExAws.SQS.receive_message(queue_url, max_number_of_messages: 1)
  response = ExAws.request!(receive_op, auth_config)

  message =
    response
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # Delete the received message
  if not is_nil(message) do
    delete_op = ExAws.SQS.delete_message(queue_url, message.receipt_handle)
    ExAws.request!(delete_op, auth_config)
  end

  message
end
グループの動作確認
[{"A", "A1"}, {"B", "B1"}, {"A", "A2"}, {"B", "B2"}, {"A", "A3"}, {"B", "B3"}]
|> Enum.each(fn {group_id, message} ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: group_id,
message_deduplication_id: Integer.to_string(:os.system_time())
)
|> ExAws.request!(auth_config)
end)
# Poll the FIFO queue seven times and return the body of each received
# message (`nil` when a poll returns nothing).
1..7
|> Enum.map(fn _ ->
  message =
    queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  if is_nil(message) do
    nil
  else
    # Delete the received message -- except "A2", which is deliberately left
    # undeleted (presumably to observe how an unacknowledged message affects
    # the rest of its message group).
    if message.body != "A2" do
      queue_url
      |> ExAws.SQS.delete_message(message.receipt_handle)
      |> ExAws.request!(auth_config)
    end

    message.body
  end
end)
"sample_queue.fifo"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)
Broadway による FIFO キューの受信
# Start the pipeline against the FIFO queue with a concurrency of 1.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

# Send three messages in the message group "sample" while it is running.
Enum.each(["11", "12", "13"], fn body ->
  fifo_opts = [
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(:os.system_time())
  ]

  send_op = ExAws.SQS.send_message(queue_url, body, fifo_opts)
  ExAws.request!(send_op, auth_config)
end)

show_queue_status.(queue_url, auth_config)

# Shut the pipeline down again.
Broadway.stop(pipeline)
Broadway による並列受信
# Start the pipeline against the FIFO queue with a concurrency of 2.
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 2)

# Send six messages, all in the message group "sample".
Enum.each(["21", "22", "23", "24", "25", "26"], fn body ->
  fifo_opts = [
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(:os.system_time())
  ]

  send_op = ExAws.SQS.send_message(queue_url, body, fifo_opts)
  ExAws.request!(send_op, auth_config)
end)

# Shut the pipeline down again.
Broadway.stop(pipeline)
キューの削除
"sample_queue"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
"sample_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)