RabbitMQ Routing
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- Receiving messages selectively
- subscribing only to a subset of the messages
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
Bindings
- The relationship between exchange and a queue
- Enables a queue to receive messages from a given exchange
binding key
-
A binding can take an extra
routing_key
parameter (binding key) - The meaning of a binding key depends on the exchange type
Direct exchange
-
more flexible than a
fanout
exchange that is only capable of mindless broadcasting - a message goes to the queues whose binding key exactly matches the routing key of the message
Multiple bindings
- It is legal to bind multiple queues with the same binding key
Info
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Worker
defmodule MessageReceiver do
def wait_for_messages(channel, after_callback \\ fn -> nil end) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts(" [x] #{inspect(self())} received [#{meta.routing_key}] #{payload}")
wait_for_messages(channel)
{:basic_consume_ok, %{consumer_tag: _}} ->
wait_for_messages(channel)
# It is nice to catch unexpected messages for debugging purposes
other ->
IO.puts("unexpected message: #{inspect(other)}")
wait_for_messages(channel)
after
10_000 ->
IO.puts("#{inspect(self())} no message in 10 seconds")
after_callback.()
end
end
end
Main
defmodule Main do
@exchange_name "direct_logs_exchange"
defp open_connection do
{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
close = fn -> AMQP.Connection.close(conn) end
{:ok, {channel, close}}
end
defp declare_direct_exchange(channel) do
AMQP.Exchange.declare(channel, @exchange_name, :direct)
end
defp declare_temporary_queue(channel) do
AMQP.Queue.declare(channel, "", exclusive: true)
end
defp subscribe_log(channel, queue_name, severity) do
AMQP.Queue.bind(channel, queue_name, @exchange_name, routing_key: severity)
end
defp publish_log(channel, severity, message) do
routing_key = severity
AMQP.Basic.publish(channel, @exchange_name, routing_key, message)
IO.puts(" [x] #{inspect(self())} sent '[#{severity}] #{message}' to '#{@exchange_name}'")
end
defp receive_log(severity) do
{:ok, {channel, close}} = open_connection()
declare_direct_exchange(channel)
{:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)
binding_key = severity
subscribe_log(channel, tmp_queue_name, binding_key)
AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)
IO.puts(" [*] #{inspect(self())} waiting for #{severity} messages")
MessageReceiver.wait_for_messages(channel, close)
end
def main do
for severity <- ["info", "error"] do
Task.start_link(fn ->
receive_log(severity)
end)
end
Process.sleep(5000)
{:ok, {channel, close}} = open_connection()
declare_direct_exchange(channel)
publish_log(channel, "foo", "random news 1")
publish_log(channel, "info", "good news 1")
publish_log(channel, "error", "bad news 1")
publish_log(channel, "info", "good news 2")
publish_log(channel, "foo", "random news 2")
close.()
end
end
Main.main()