Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RabbitMQ Topics

notebooks/amqp/rabbitmq_05_topics.livemd

RabbitMQ Topics

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • learn about a topic exchange that enables us to do routing based on multiple criteria

Resources

Topic exchange

  • The logic behind the topic exchange is similar to a direct one, but it can support multiple criteria

routing key

  • must be a list of words, delimited by dots
  • up to the limit of 255 bytes
  • e.g., quick.orange.rabbit

special characters for binding

char
* substitute for exactly one word
# substitute for zero or more words
  • When a queue is bound with #, the exchange will behave like fanout
  • When no special character is used in binding, the exchange will behave like direct

Info

Worker

defmodule MessageReceiver do
  @moduledoc """
  Receive loop for a process that consumes messages from its own mailbox.
  """

  @default_timeout 10_000

  @doc """
  Prints every message found in the calling process's mailbox and keeps
  looping until no message arrives for `timeout` milliseconds. At that point
  `after_callback` is invoked and its result is returned.

  ## Parameters

    * `channel` - passed through the loop unchanged; this function does not
      use it otherwise.
    * `after_callback` - zero-arity function run once the loop goes idle
      (defaults to a no-op returning `nil`).
    * `timeout` - idle time in milliseconds before giving up
      (defaults to `#{@default_timeout}`).

  ## Handled messages

    * `{:basic_deliver, payload, meta}` - prints the payload together with
      `meta.routing_key`.
    * `{:basic_consume_ok, %{consumer_tag: _}}` - silently ignored.
    * anything else - printed as an unexpected message.
  """
  def wait_for_messages(channel, after_callback \\ fn -> nil end, timeout \\ @default_timeout) do
    receive do
      {:basic_deliver, payload, meta} ->
        IO.puts(" [x] #{inspect(self())} received [#{meta.routing_key}] #{payload}")
        # The callback and timeout must be threaded through every recursive
        # call; otherwise the defaults silently replace the caller's values
        # and the cleanup callback never runs once any message has arrived.
        wait_for_messages(channel, after_callback, timeout)

      {:basic_consume_ok, %{consumer_tag: _}} ->
        wait_for_messages(channel, after_callback, timeout)

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("unexpected message: #{inspect(other)}")
        wait_for_messages(channel, after_callback, timeout)
    after
      timeout ->
        IO.puts("#{inspect(self())} no message in #{format_timeout(timeout)}")
        after_callback.()
    end
  end

  # Renders whole-second timeouts as "N seconds" (so the default still prints
  # "10 seconds") and anything else in milliseconds.
  defp format_timeout(ms) when is_integer(ms) and rem(ms, 1000) == 0 do
    "#{div(ms, 1000)} seconds"
  end

  defp format_timeout(ms), do: "#{ms} ms"
end

Main

defmodule Main do
  @moduledoc """
  Topic-exchange demo: starts two subscriber tasks bound with different
  binding keys, then publishes four log messages to the same exchange.
  """

  alias AMQP.{Basic, Channel, Connection, Exchange, Queue}

  @exchange_name "topic_logs_exchange"

  @doc """
  Runs the demo.

  Starts one linked task per binding key (`"user2.info"` and `"*.error"`),
  sleeps five seconds, publishes four messages and finally closes the
  publisher's connection. Returns whatever closing the connection returns.
  """
  def main do
    # subscribing: one linked task per binding key
    for binding_key <- ["user2.info", "*.error"] do
      Task.start_link(fn -> receive_log(binding_key) end)
    end

    # Crude synchronisation: wait before publishing so the subscriber tasks
    # have time to set up.
    Process.sleep(5000)

    {:ok, {channel, close}} = open_connection()

    declare_topic_exchange(channel)

    # publishing; the routing keys used here have the form <user>.<level>
    logs = [
      {"user1.info", "good news for user 1"},
      {"user1.error", "bad news for user 1"},
      {"user2.info", "good news for user 2"},
      {"user2.error", "bad news for user 2"}
    ]

    Enum.each(logs, fn {topic, message} -> publish_log(channel, topic, message) end)

    close.()
  end

  # Subscriber body: opens its own connection, binds a temporary queue to the
  # exchange with `topic` as the binding key, registers as a consumer and then
  # hands over to the receive loop. `close` is given to the loop as its
  # after-callback.
  defp receive_log(topic) do
    {:ok, {channel, close}} = open_connection()

    declare_topic_exchange(channel)

    {:ok, %{queue: queue_name}} = declare_temporary_queue(channel)

    subscribe_log(channel, queue_name, topic)

    Basic.consume(channel, queue_name, nil, no_ack: true)
    IO.puts(" [*] #{inspect(self())} waiting for #{topic} messages")

    MessageReceiver.wait_for_messages(channel, close)
  end

  # Publishes `message` to the exchange using `topic` as the routing key and
  # logs what was sent.
  defp publish_log(channel, topic, message) do
    Basic.publish(channel, @exchange_name, topic, message)
    IO.puts(" [x] #{inspect(self())} sent '[#{topic}] #{message}' to '#{@exchange_name}'")
  end

  # Opens a connection and a channel on it. Returns the channel together with
  # a zero-arity function that closes the connection.
  defp open_connection do
    {:ok, connection} = Connection.open()
    {:ok, channel} = Channel.open(connection)

    {:ok, {channel, fn -> Connection.close(connection) end}}
  end

  # Declares the shared exchange with type :topic.
  defp declare_topic_exchange(channel) do
    Exchange.declare(channel, @exchange_name, :topic)
  end

  # Declares an exclusive queue with an empty name
  # (presumably the broker then generates the name - confirm in AMQP docs).
  defp declare_temporary_queue(channel) do
    Queue.declare(channel, "", exclusive: true)
  end

  # Binds the queue to the exchange using `topic` as the binding key.
  defp subscribe_log(channel, queue_name, topic) do
    Queue.bind(channel, queue_name, @exchange_name, routing_key: topic)
  end
end

# Run the demo: Main.main/0 starts the subscriber tasks and then publishes the log messages.
Main.main()