RabbitMQ Hello World
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- Send and receive messages from a named queue
-
Write two small programs
- a producer (sender) that sends a single message
- a consumer (receiver) that receives messages and prints them out
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
RabbitMQ
- a message broker
- accepts and forwards messages
- like a post box, a post office and a letter carrier
Terminology
word | meaning |
---|---|
producing | sending |
queue | the post box in RabbitMQ |
consuming | receiving |
connection | a TCP connection between an application and the RabbitMQ broker |
channel | a virtual connection inside a connection |
Note
- The producer, consumer and broker do not have to reside on the same host
- An application can be both a producer and consumer
Preparation
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Establish a connection with RabbitMQ server
-
By default,
AMQP.Connection.open
connects tolocalhost
connect = fn ->
{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
close_conn = fn -> AMQP.Connection.close(conn) end
{channel, close_conn}
end
{channel, close_conn} = connect.()
Create a queue
queue_name = "hello"
AMQP.Queue.declare(channel, queue_name)
Create an exchange (skipped in this exercise)
- In RabbitMQ a message always needs to go through an exchange
- An exchange allows us to specify which queue the message should go to
Sending
-
Our message will just contain a string
Hello World!
-
We want to send it to our
hello
queue
Publish a message
publish_hello_message = fn n ->
exchange = ""
routing_key = "hello"
message = "Hello, World!"
for _ <- 1..n do
:ok = AMQP.Basic.publish(channel, exchange, routing_key, message)
end
end
publish_hello_message.(3)
Receiving
- We’ll keep the consumer running to listen for messages and print them out
Make sure the queue exists
-
Creating a queue using
AMQP.Queue.declare
is idempotent ‒ we can run the command as many times as we like, and only one will be created - When we’re not yet sure which program to run first, it’s a good practice to repeat declaring the queue in both programs.
AMQP.Queue.declare(channel, "hello")
Register a queue consumer process
-
We need to tell RabbitMQ that this particular process should receive messages from our
hello
queue. -
Whenever the client library receives a delivery from RabbitMQ, a
{:basic_deliver, payload, metadata}
Elixir message is sent to the specified Elixir process.
queue_name = "hello"
# consumer process, defaults to self()
consumer_pid = nil
AMQP.Basic.consume(channel, queue_name, consumer_pid)
wait_for_messages = fn wait_for_messages ->
receive do
{:basic_deliver, payload, meta} ->
IO.puts(" [x] Received #{payload}")
AMQP.Basic.ack(channel, meta.delivery_tag)
wait_for_messages.(wait_for_messages)
after
5000 ->
IO.puts("No message in 5 seconds")
close_conn.()
end
end
wait_for_messages.(wait_for_messages)