RabbitMQ Topics
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- learn about a topic exchange that enables us to do routing based on multiple criteria
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
Topic exchange
- The logic behind the topic exchange is similar to a topic one, but it can support multiple criteria
routing key
- must be a list of words, delimited by dots
- up to the limit of 255 bytes
-
e.g.,
quick.orange.rabbit
special characters for binding
char | |
---|---|
* |
substitute for exactly one word |
# |
substitute for zero or more words |
-
When a queue is bound with
#
, the exchange will behave likefanout
-
When no special character is used in binding, the exchange will behave like
topic
Info
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Worker
defmodule MessageReceiver do
def wait_for_messages(channel, after_callback \\ fn -> nil end) do
receive do
{:basic_deliver, payload, meta} ->
IO.puts(" [x] #{inspect(self())} received [#{meta.routing_key}] #{payload}")
wait_for_messages(channel)
{:basic_consume_ok, %{consumer_tag: _}} ->
wait_for_messages(channel)
# It is nice to catch unexpected messages for debugging purposes
other ->
IO.puts("unexpected message: #{inspect(other)}")
wait_for_messages(channel)
after
10_000 ->
IO.puts("#{inspect(self())} no message in 10 seconds")
after_callback.()
end
end
end
Main
defmodule Main do
@exchange_name "topic_logs_exchange"
defp open_connection do
{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
close = fn -> AMQP.Connection.close(conn) end
{:ok, {channel, close}}
end
defp declare_topic_exchange(channel) do
AMQP.Exchange.declare(channel, @exchange_name, :topic)
end
defp declare_temporary_queue(channel) do
AMQP.Queue.declare(channel, "", exclusive: true)
end
defp subscribe_log(channel, queue_name, topic) do
AMQP.Queue.bind(channel, queue_name, @exchange_name, routing_key: topic)
end
defp publish_log(channel, topic, message) do
routing_key = topic
AMQP.Basic.publish(channel, @exchange_name, routing_key, message)
IO.puts(" [x] #{inspect(self())} sent '[#{topic}] #{message}' to '#{@exchange_name}'")
end
defp receive_log(topic) do
{:ok, {channel, close}} = open_connection()
declare_topic_exchange(channel)
{:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)
binding_key = topic
subscribe_log(channel, tmp_queue_name, binding_key)
AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)
IO.puts(" [*] #{inspect(self())} waiting for #{topic} messages")
MessageReceiver.wait_for_messages(channel, close)
end
def main do
# subscribing
Task.start_link(fn -> receive_log("user2.info") end)
Task.start_link(fn -> receive_log("*.error") end)
Process.sleep(5000)
{:ok, {channel, close}} = open_connection()
declare_topic_exchange(channel)
# publishing with routing key: .
publish_log(channel, "user1.info", "good news for user 1")
publish_log(channel, "user1.error", "bad news for user 1")
publish_log(channel, "user2.info", "good news for user 2")
publish_log(channel, "user2.error", "bad news for user 2")
close.()
end
end
Main.main()