Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RabbitMQ Routing

rabbitmq_04_routing.livemd

RabbitMQ Routing

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • Receiving messages selectively
  • subscribing only to a subset of the messages

Resources

Bindings

  • The relationship between an exchange and a queue
  • Enables a queue to receive messages from a given exchange

binding key

  • A binding can take an extra routing_key parameter (binding key)
  • The meaning of a binding key depends on the exchange type

Direct exchange

  • More flexible than a fanout exchange, which is only capable of mindless broadcasting
  • A message goes to the queues whose binding key exactly matches the routing key of the message

Multiple bindings

  • It is legal to bind multiple queues with the same binding key

Info

Worker

defmodule MessageReceiver do
  @moduledoc """
  Receive loop for a process whose mailbox is fed with consumer messages
  (`{:basic_deliver, payload, meta}` and `{:basic_consume_ok, _}` tuples).
  """

  @doc """
  Prints every message arriving in the calling process's mailbox until the
  mailbox stays empty for `timeout` milliseconds, then invokes `after_callback`.

  ## Parameters

    * `channel` - carried through the loop unchanged; this function does not
      read it.
    * `after_callback` - zero-arity function called once when the idle timeout
      fires. Defaults to a no-op returning `nil`.
    * `timeout` - idle period in milliseconds. Defaults to `10_000`.

  Returns whatever `after_callback` returns.
  """
  def wait_for_messages(channel, after_callback \\ fn -> nil end, timeout \\ 10_000) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts(" [x] #{inspect(self())} received [#{meta.routing_key}] #{payload}")
        # The callback and timeout must be passed along on every iteration;
        # recursing with the channel alone would silently fall back to the
        # default no-op callback and the caller's cleanup would never run.
        wait_for_messages(channel, after_callback, timeout)

      {:basic_consume_ok, %{consumer_tag: _}} ->
        wait_for_messages(channel, after_callback, timeout)

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("unexpected message: #{inspect(other)}")
        wait_for_messages(channel, after_callback, timeout)
    after
      timeout ->
        IO.puts("#{inspect(self())} no message in #{describe_timeout(timeout)}")
        after_callback.()
    end
  end

  # Renders the idle period for the log line; whole seconds keep the original
  # "10 seconds" wording for the default timeout.
  defp describe_timeout(ms) when is_integer(ms) and rem(ms, 1000) == 0 do
    "#{div(ms, 1000)} seconds"
  end

  defp describe_timeout(ms), do: "#{ms} ms"
end

Main

defmodule Main do
  @moduledoc """
  Routing demo on a direct exchange: starts one consumer task per severity
  ("info" and "error"), then publishes a handful of log messages with
  different routing keys so that each consumer prints only the messages whose
  routing key matches its binding key.
  """

  @exchange_name "direct_logs_exchange"

  # Opens a connection (default options) and a channel on it. Returns the
  # channel together with a zero-arity function that closes the connection.
  defp open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end

    {:ok, {channel, close}}
  end

  # Declares the demo exchange with type :direct.
  defp declare_direct_exchange(channel) do
    AMQP.Exchange.declare(channel, @exchange_name, :direct)
  end

  # Declares an exclusive queue with an empty name, leaving the actual queue
  # name to be read from the declare result.
  defp declare_temporary_queue(channel) do
    AMQP.Queue.declare(channel, "", exclusive: true)
  end

  # Binds `queue_name` to the demo exchange using `severity` as binding key.
  defp subscribe_log(channel, queue_name, severity) do
    AMQP.Queue.bind(channel, queue_name, @exchange_name, routing_key: severity)
  end

  # Publishes `message` with `severity` as routing key and logs the send.
  defp publish_log(channel, severity, message) do
    routing_key = severity
    # Match on :ok so the "sent" line is only printed for an accepted publish.
    :ok = AMQP.Basic.publish(channel, @exchange_name, routing_key, message)
    IO.puts(" [x] #{inspect(self())} sent '[#{severity}] #{message}' to '#{@exchange_name}'")
  end

  # Consumer side: binds a temporary queue to `severity` and prints incoming
  # messages until MessageReceiver.wait_for_messages/1 returns.
  defp receive_log(severity) do
    {:ok, {channel, close}} = open_connection()

    # The connection is closed here, both on normal return and when setup
    # fails, rather than by handing `close` to the receive loop.
    try do
      :ok = declare_direct_exchange(channel)

      {:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)

      binding_key = severity
      :ok = subscribe_log(channel, tmp_queue_name, binding_key)

      {:ok, _consumer_tag} = AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)
      IO.puts(" [*] #{inspect(self())} waiting for #{severity} messages")

      MessageReceiver.wait_for_messages(channel)
    after
      close.()
    end
  end

  @doc """
  Runs the demo: starts the "info" and "error" consumers, waits five seconds,
  then publishes five messages, two of which use the unbound routing key "foo".

  The consumer tasks are linked to the calling process and are still running
  when this function returns.
  """
  def main do
    for severity <- ["info", "error"] do
      Task.start_link(fn ->
        receive_log(severity)
      end)
    end

    # Give the consumer tasks time to declare and bind their queues before
    # anything is published.
    Process.sleep(5000)

    {:ok, {channel, close}} = open_connection()

    try do
      :ok = declare_direct_exchange(channel)

      publish_log(channel, "foo", "random news 1")
      publish_log(channel, "info", "good news 1")
      publish_log(channel, "error", "bad news 1")
      publish_log(channel, "info", "good news 2")
      publish_log(channel, "foo", "random news 2")
    after
      # Close the publisher connection even if a publish raised.
      close.()
    end
  end
end

Main.main()