RabbitMQ Publish/Subscribe
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- Sending messages to many consumers at once
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
Publish/Subscribe
- deliver a message to multiple consumers
Exchanges
- The producer never sends any messages directly to a queue
- The producer can only send messages to an exchange
- Often the producer doesn’t even know if a message will be delivered to any queue at all
| Exchange type | |
|---|---|
| direct | simple scenario |
| topic | complex scenarios |
| fanout | broadcasting |
| headers | special filtering |
Default exchange
- nameless exchange
- identified by the empty string (`""`)
- messages are routed to the queue with the name specified by routing_key, if it exists
Temporary queues
- We get a fresh empty queue whenever we connect to Rabbit
- When we provide an empty queue name (`""`), the Rabbit server will choose a random queue name for us
- With the `exclusive: true` option, the queue gets deleted once the consumer connection is closed
Bindings
- The relationship between exchange and a queue
- Enables a queue to receive messages from a given exchange
- Can take an extra `routing_key` parameter (binding key), the meaning of which depends on the exchange type
Info
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Worker
defmodule MessageReceiver do
  @moduledoc """
  Blocking receive loop that prints the messages arriving in the calling
  process's mailbox.
  """

  # Idle period, in milliseconds, after which the loop gives up by default.
  @default_timeout 10_000

  @doc """
  Prints every message received by the calling process until no message
  arrives for `timeout` milliseconds, then invokes `after_callback` and
  returns its result.

  ## Parameters

    * `channel` - passed along unchanged on every iteration; the loop itself
      does not read it.
    * `after_callback` - zero-arity function invoked once when the loop times
      out. Defaults to a no-op returning `nil`.
    * `timeout` - idle period in milliseconds before giving up. Defaults to
      `10_000`.

  `{:basic_deliver, payload, meta}` tuples are printed as received payloads,
  `{:basic_consume_ok, %{consumer_tag: _}}` is silently acknowledged, and
  anything else is reported as an unexpected message.
  """
  @spec wait_for_messages(term(), (-> any()), timeout()) :: any()
  def wait_for_messages(channel, after_callback \\ fn -> nil end, timeout \\ @default_timeout) do
    receive do
      {:basic_deliver, payload, _meta} ->
        IO.puts(" [x] #{inspect(self())} received #{payload}")
        # Keep the caller's callback and timeout: recursing with defaults here
        # would replace them with the no-op and the callback would never run.
        wait_for_messages(channel, after_callback, timeout)

      {:basic_consume_ok, %{consumer_tag: _}} ->
        wait_for_messages(channel, after_callback, timeout)

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("unexpected message: #{inspect(other)}")
        wait_for_messages(channel, after_callback, timeout)
    after
      timeout ->
        IO.puts("#{inspect(self())} no message in #{describe_timeout(timeout)}")
        after_callback.()
    end
  end

  # Renders the timeout for the log line; whole seconds keep the
  # "N seconds" wording, anything else is shown in milliseconds.
  defp describe_timeout(ms) when is_integer(ms) and rem(ms, 1000) == 0 do
    "#{div(ms, 1000)} seconds"
  end

  defp describe_timeout(ms), do: "#{ms} ms"
end
Main
defmodule Main do
  @moduledoc """
  Fanout publish/subscribe demo: several subscribers each bind their own
  temporary queue to one fanout exchange, then a single log message is
  published to that exchange.
  """

  @exchange_name "fanout_logs_exchange"

  # Opens a connection with the client's default settings and one channel on
  # it. Returns the channel together with a zero-arity function that closes
  # the connection.
  defp open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end
    {:ok, {channel, close}}
  end

  # Declares the shared fanout exchange; crashes if the broker refuses it.
  defp declare_fanout_exchange(channel) do
    :ok = AMQP.Exchange.declare(channel, @exchange_name, :fanout)
  end

  # Declares a queue with an empty name and `exclusive: true`.
  # Returns the raw result of `AMQP.Queue.declare/3` for the caller to match.
  defp declare_temporary_queue(channel) do
    AMQP.Queue.declare(channel, "", exclusive: true)
  end

  # Binds `queue_name` to the fanout exchange; crashes if binding fails.
  defp subscribe_log(channel, queue_name) do
    :ok = AMQP.Queue.bind(channel, queue_name, @exchange_name)
  end

  # Publishes `message` to the fanout exchange over a short-lived connection.
  defp emit_log(message) do
    {:ok, {channel, close}} = open_connection()

    try do
      declare_fanout_exchange(channel)
      # The routing key is left empty for the fanout exchange.
      :ok = AMQP.Basic.publish(channel, @exchange_name, "", message)
      IO.puts(" [x] #{inspect(self())} sent '#{message}' to '#{@exchange_name}'")
    after
      # Release the connection even when declaring or publishing fails.
      close.()
    end
  end

  # Subscribes the calling process to the fanout exchange through a fresh
  # temporary queue and prints incoming messages until the receiver times out.
  defp receive_log do
    {:ok, {channel, close}} = open_connection()

    try do
      declare_fanout_exchange(channel)
      {:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)
      subscribe_log(channel, tmp_queue_name)
      {:ok, _consumer_tag} = AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)
      IO.puts(" [*] #{inspect(self())} waiting for messages")
      MessageReceiver.wait_for_messages(channel)
    after
      # Close here so the connection is released however the wait ends
      # (receiver timeout or a crash during setup).
      close.()
    end
  end

  @doc """
  Starts three linked subscriber tasks, waits five seconds, then publishes
  one log message to the fanout exchange.
  """
  def main do
    for _ <- 1..3 do
      {:ok, _pid} = Task.start_link(fn -> receive_log() end)
    end

    # Crude synchronisation: the sleep is the only thing ordering the
    # subscribers' setup before the publish below.
    Process.sleep(5000)
    emit_log("Hello everybody!")
  end
end
# Run the demo: start the subscriber tasks, then publish one log message.
Main.main()