RabbitMQ Work Queues
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- Distribute time-consuming tasks among multiple workers
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
Work Queues (aka: Task Queues)
- To avoid doing a resource-intensive task immediately and having to wait for it to complete
- We encapsulate a task as a message and send it to the queue
- A worker process running in the background will pop the tasks and eventually execute the job
- When you run many workers the tasks will be shared between them
Round-robin dispatching
- By default, RabbitMQ will send each message to the next consumer, in sequence
- On average every consumer will get the same number of messages
Message acknowledgment (ack)
- Acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it
- Acknowledgement must be sent on the same channel that received the delivery
- All unacknowledged messages are redelivered
Checking unacknowledged messages
rabbitmqctl list_queues name messages_ready messages_unacknowledged
Message durability
-
AMQP.Queue.declare(channel, queue, durable: true)
- When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to
- Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable
Message persistence
-
AMQP.Basic.publish(channel, exchange, queue, message, persistent: true)
- Marking messages as persistent doesn’t fully guarantee that a message won’t be lost
- Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet
- If you need a stronger guarantee then you can use publisher confirms
Changing parameters
- RabbitMQ doesn’t allow you to redefine an existing queue with different parameters
- A quick workaround is to declare a queue with a different name
Fair dispatch
-
AMQP.Basic.qos(channel, prefetch_count: 1)
- By default, RabbitMQ just dispatches a message when the message enters the queue, without looking at the number of unacknowledged messages for a consumer
Preparation
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Connection
-
By default,
AMQP.Connection.open
connects to localhost
defmodule Mnishiguchi.Connection do
  @moduledoc """
  Opens an AMQP connection and channel using the client's default settings.
  """

  @doc """
  Opens a connection and a channel on it.

  Returns `{:ok, {channel, close}}`, where `close` is a zero-arity function
  that closes the channel first and then the connection.

  Raises a `MatchError` if the connection or the channel cannot be opened.
  """
  def open do
    {:ok, connection} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(connection)

    {:ok, {channel, fn -> shutdown(connection, channel) end}}
  end

  # Closes the channel, then the connection that owns it.
  defp shutdown(connection, channel) do
    AMQP.Channel.close(channel)
    AMQP.Connection.close(connection)
  end
end
Worker
defmodule Mnishiguchi.Worker do
  @moduledoc """
  A consumer loop that handles deliveries received in the calling process's mailbox.
  """

  @doc """
  Waits for `{:basic_deliver, payload, meta}` messages and processes them one
  at a time.

  For each delivery the worker sleeps one second per `.` character in
  `payload` (simulated work), acknowledges the delivery on `channel` using
  `meta.delivery_tag`, and then waits for the next message.

  If no delivery arrives within 10 seconds, `after_callback` is invoked and
  its result is returned. The callback defaults to a no-op returning `nil`.
  """
  def wait_for_messages(channel, after_callback \\ fn -> nil end) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts(" [x] #{inspect(self())} received #{payload}")

        # Pretend to be busy: one second of work per "." in the payload
        dot_count = payload |> to_charlist |> Enum.count(&(&1 == ?.))
        :timer.sleep(:timer.seconds(dot_count))

        IO.puts(" [x] #{inspect(self())} done #{payload}")
        AMQP.Basic.ack(channel, meta.delivery_tag)

        # Pass the callback along explicitly; calling wait_for_messages/1 here
        # would silently replace it with the default no-op after the first
        # delivery, so the caller's cleanup would never run.
        wait_for_messages(channel, after_callback)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end
end
Main
defmodule Mnishiguchi.Main do
  @moduledoc """
  Work-queue demo: publishes a batch of tasks to a durable queue and then
  starts several worker processes that consume from it.
  """

  alias Mnishiguchi.Connection
  alias Mnishiguchi.Worker

  @exchange ""
  @queue_name "task_queue"
  @routing_key "task_queue"
  @worker_count 3

  # Sample tasks, in publish order.
  @messages [
    "First message.",
    "Second message..",
    "Third message...",
    "Fourth message....",
    "Fifth message....."
  ]

  @doc """
  Publishes `message` as a persistent message to the task queue's routing key
  and prints a confirmation line.
  """
  def publish(channel, message) do
    AMQP.Basic.publish(channel, @exchange, @routing_key, message, persistent: true)
    IO.puts(" [x] #{inspect(self())} sent '#{message}'")
  end

  @doc """
  Declares the durable queue, publishes the sample messages, closes the
  publishing connection, and starts `@worker_count` linked tasks that each
  consume from the queue.

  Returns the list of `Task.start_link/1` results.
  """
  def main do
    {:ok, {publisher_channel, close_publisher}} = Connection.open()
    {:ok, _queue} = AMQP.Queue.declare(publisher_channel, @queue_name, durable: true)

    Enum.each(@messages, fn message -> publish(publisher_channel, message) end)

    close_publisher.()

    # Consume in different processes
    Enum.map(1..@worker_count, fn _ -> Task.start_link(&consume/0) end)
  end

  # Opens a dedicated connection, subscribes to the task queue, and hands the
  # connection's close function to the worker as its idle-timeout callback.
  defp consume do
    {:ok, {consumer_channel, close_consumer}} = Connection.open()
    {:ok, _queue} = AMQP.Queue.declare(consumer_channel, @queue_name, durable: true)

    # Don't dispatch a new message to a worker until the previous one is acknowledged
    AMQP.Basic.qos(consumer_channel, prefetch_count: 1)
    AMQP.Basic.consume(consumer_channel, @queue_name)

    IO.puts(" [*] #{inspect(self())} waiting for messages")
    Worker.wait_for_messages(consumer_channel, close_consumer)
  end
end
# Run the demo: publish the sample tasks, then start the consumer tasks.
Mnishiguchi.Main.main()