Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RabbitMQ Publish/Subscribe

rabbitmq_03_pub_sub.livemd

RabbitMQ Publish/Subscribe

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • Sending messages to many consumers at once

Resources

Publish/Subscribe

  • deliver a message to multiple consumers

Exchanges

  • The producer never sends any messages directly to a queue
  • The producer can only send messages to an exchange
  • Often the producer doesn’t even know if a message will be delivered to any queue at all

| Exchange type | Use case |
|---|---|
| direct | simple scenario |
| topic | complex scenarios |
| fanout | broadcasting |
| headers | special filtering |

Default exchange

  • nameless exchange
  • identified by the empty string ("").
  • messages are routed to the queue with the name specified by routing_key, if it exists

Temporary queues

  • We get a fresh empty queue whenever we connect to Rabbit
  • When we provide an empty queue name (""), the Rabbit server will choose a random queue name for us
  • With the exclusive: true option, the queue gets deleted once the consumer connection is closed

Bindings

  • The relationship between an exchange and a queue
  • Enables a queue to receive messages from a given exchange
  • Can take an extra routing_key parameter (binding key), the meaning of which depends on the exchange type

Info

Worker

defmodule MessageReceiver do
  @moduledoc """
  Receive loop for a process whose mailbox is fed with AMQP consumer messages.
  """

  # Idle period (in milliseconds) after which the loop gives up by default.
  @default_timeout_ms 10_000

  @doc """
  Drains the calling process' mailbox until it stays empty for `timeout_ms`.

  Every `{:basic_deliver, payload, meta}` message is printed,
  `{:basic_consume_ok, _}` confirmations are skipped silently and anything else
  is printed as an unexpected message. The idle timer restarts after each
  handled message.

  ## Parameters

    * `channel` - carried through the loop unchanged; the loop itself never
      calls anything on it.
    * `after_callback` - zero-arity function invoked once the mailbox has been
      idle for `timeout_ms`. Defaults to a function returning `nil`.
    * `timeout_ms` - idle timeout in milliseconds. Defaults to `10_000`.

  ## Returns

  Whatever `after_callback` returns.
  """
  @spec wait_for_messages(term(), (-> term()), non_neg_integer()) :: term()
  def wait_for_messages(
        channel,
        after_callback \\ fn -> nil end,
        timeout_ms \\ @default_timeout_ms
      ) do
    receive do
      {:basic_deliver, payload, _meta} ->
        IO.puts(" [x] #{inspect(self())} received #{payload}")
        # Pass the callback and timeout along explicitly: recursing with
        # `channel` only would silently fall back to the no-op default callback.
        wait_for_messages(channel, after_callback, timeout_ms)

      {:basic_consume_ok, %{consumer_tag: _}} ->
        wait_for_messages(channel, after_callback, timeout_ms)

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("unexpected message: #{inspect(other)}")
        wait_for_messages(channel, after_callback, timeout_ms)
    after
      timeout_ms ->
        IO.puts("#{inspect(self())} no message in #{describe_timeout(timeout_ms)}")
        after_callback.()
    end
  end

  # Renders the timeout for the log line; whole seconds keep the "N seconds" wording.
  defp describe_timeout(ms) when is_integer(ms) and rem(ms, 1000) == 0 do
    "#{div(ms, 1000)} seconds"
  end

  defp describe_timeout(ms), do: "#{ms} milliseconds"
end

Main

defmodule Main do
  @moduledoc """
  Publish/subscribe demo built on a fanout exchange.

  `main/0` starts several subscriber tasks, each consuming from its own queue
  (declared with an empty name and `exclusive: true`) bound to the shared
  exchange, and then publishes a single message to that exchange.
  """

  @exchange_name "fanout_logs_exchange"

  # Number of subscriber tasks started by `main/0`.
  @subscriber_count 3

  # Head start (in milliseconds) given to the subscribers before publishing.
  @subscriber_warmup_ms 5000

  # Opens a connection with the library defaults and one channel on it.
  # Returns the channel plus a zero-arity function that closes the connection.
  defp open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)

    {:ok, {channel, fn -> AMQP.Connection.close(conn) end}}
  end

  # Declares the shared exchange as a fanout exchange.
  defp declare_fanout_exchange(channel) do
    AMQP.Exchange.declare(channel, @exchange_name, :fanout)
  end

  # Declares an exclusive queue with an empty name; the actual queue name is
  # read from the declare result by the caller.
  defp declare_temporary_queue(channel) do
    AMQP.Queue.declare(channel, "", exclusive: true)
  end

  # Binds `queue_name` to the shared exchange.
  defp subscribe_log(channel, queue_name) do
    AMQP.Queue.bind(channel, queue_name, @exchange_name)
  end

  # Publishes `message` to the shared exchange on a fresh connection and closes
  # that connection afterwards. Returns the result of closing the connection.
  defp emit_log(message) do
    {:ok, {channel, close}} = open_connection()

    # Match on the success values so a broker-side failure crashes here
    # instead of being silently ignored.
    :ok = declare_fanout_exchange(channel)
    :ok = AMQP.Basic.publish(channel, @exchange_name, "", message)
    IO.puts(" [x] #{inspect(self())} sent '#{message}' to '#{@exchange_name}'")

    close.()
  end

  # Sets up one subscriber (connection, exchange, queue, binding, consumer) in
  # the calling process and then blocks in the receive loop, handing it the
  # connection-closing function as the idle callback.
  defp receive_log do
    {:ok, {channel, close}} = open_connection()

    :ok = declare_fanout_exchange(channel)
    {:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)

    # A failed bind or consume would otherwise leave this subscriber waiting
    # for messages that can never arrive, so fail loudly.
    :ok = subscribe_log(channel, tmp_queue_name)
    {:ok, _consumer_tag} = AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)
    IO.puts(" [*] #{inspect(self())} waiting for messages")

    MessageReceiver.wait_for_messages(channel, close)
  end

  @doc """
  Runs the demo: starts the subscriber tasks, waits for them to get ready and
  publishes `"Hello everybody!"` to the exchange.

  The subscriber tasks are linked to the caller. Returns the result of closing
  the publisher's connection.
  """
  @spec main() :: :ok | {:error, term()}
  def main do
    for _ <- 1..@subscriber_count do
      Task.start_link(fn -> receive_log() end)
    end

    # Crude synchronisation: a message published before the queues are bound
    # would not reach them, so give the subscribers time to finish their setup.
    Process.sleep(@subscriber_warmup_ms)

    emit_log("Hello everybody!")
  end
end

# Run the demo: start the subscribers, then publish one message to the exchange.
Main.main()