Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RabbitMQ Work Queues

rabbitmq_02_work_queues.livemd

RabbitMQ Work Queues

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • Distribute time-consuming tasks among multiple workers

Resources

Work Queues (aka: Task Queues)

  • To avoid doing a resource-intensive task immediately and having to wait for it to complete
  • We encapsulate a task as a message and send it to the queue
  • A worker process running in the background will pop the tasks and eventually execute the job
  • When you run many workers the tasks will be shared between them

Round-robin dispatching

  • By default, RabbitMQ will send each message to the next consumer, in sequence
  • On average every consumer will get the same number of messages

Message acknowledgment (ack)

  • Acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it
  • Acknowledgement must be sent on the same channel that received the delivery
  • All unacknowledged messages are redelivered

Checking unacknowledged messages

rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability

  • AMQP.Queue.declare(channel, queue, durable: true)
  • When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to
  • Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable

Message persistence

  • AMQP.Basic.publish(channel, exchange, queue, message, persistent: true)
  • Marking messages as persistent doesn’t fully guarantee that a message won’t be lost
  • Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet
  • If you need a stronger guarantee then you can use publisher confirms

Changing parameters

  • RabbitMQ doesn’t allow you to redefine an existing queue with different parameters
  • There is a quick workaround of declaring a queue with different name

Fair dispatch

  • AMQP.Basic.qos(channel, prefetch_count: 1)
  • By default, RabbitMQ just dispatches a message when the message enters the queue, witout looking at the number of unacknowledged messages for a consumer

Preparation

Connection

  • By default, AMQP.Connection.open connects to localhost
defmodule Mnishiguchi.Connection do
  def open do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)

    close = fn ->
      AMQP.Channel.close(channel)
      AMQP.Connection.close(conn)
    end

    {:ok, {channel, close}}
  end
end

Worker

defmodule Mnishiguchi.Worker do
  def wait_for_messages(channel, after_callback \\ fn -> nil end) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts(" [x] #{inspect(self())} received #{payload}")

        # Pretend to be busy
        dot_count = payload |> to_charlist |> Enum.count(&(&1 == ?.))
        :timer.sleep(:timer.seconds(dot_count))

        IO.puts(" [x] #{inspect(self())} done #{payload}")
        AMQP.Basic.ack(channel, meta.delivery_tag)

        wait_for_messages(channel)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end
end

Main

defmodule Mnishiguchi.Main do
  @exchange ""
  @queue_name "task_queue"
  @routing_key "task_queue"
  @worker_count 3

  def publish(channel, message) do
    AMQP.Basic.publish(channel, @exchange, @routing_key, message, persistent: true)

    IO.puts(" [x] #{inspect(self())} sent '#{message}'")
  end

  defp consume do
    {:ok, {channel, close}} = Mnishiguchi.Connection.open()
    {:ok, _queue} = AMQP.Queue.declare(channel, @queue_name, durable: true)

    # Dont dispatch a new message to a worker until the previous one is acknowledged
    AMQP.Basic.qos(channel, prefetch_count: 1)

    AMQP.Basic.consume(channel, @queue_name)
    IO.puts(" [*] #{inspect(self())} waiting for messages")

    Mnishiguchi.Worker.wait_for_messages(channel, close)
  end

  def main do
    {:ok, {channel, close}} = Mnishiguchi.Connection.open()
    {:ok, _queue} = AMQP.Queue.declare(channel, @queue_name, durable: true)

    publish(channel, "First message.")
    publish(channel, "Second message..")
    publish(channel, "Third message...")
    publish(channel, "Fourth message....")
    publish(channel, "Fifth message.....")

    close.()

    # Consume in different processes
    for _ <- 1..@worker_count do
      Task.start_link(fn -> consume() end)
    end
  end
end

Mnishiguchi.Main.main()