RabbitMQ Hello World

rabbitmq_01_hello_world.livemd

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • Send and receive messages from a named queue
  • Write two small programs
    1. a producer (sender) that sends a single message
    2. a consumer (receiver) that receives messages and prints them out

Resources

RabbitMQ

  • a message broker
  • accepts and forwards messages
  • like a post box, a post office and a letter carrier

Terminology

Word        Meaning
producing   sending
queue       the post box in RabbitMQ
consuming   receiving
connection  a TCP connection between an application and the RabbitMQ broker
channel     a virtual connection inside a connection

Note

  • The producer, consumer and broker do not have to reside on the same host
  • An application can be both a producer and consumer

Preparation

Establish a connection with RabbitMQ server

  • By default, AMQP.Connection.open connects to localhost

connect = fn ->
  {:ok, conn} = AMQP.Connection.open()
  {:ok, channel} = AMQP.Channel.open(conn)
  close_conn = fn -> AMQP.Connection.close(conn) end

  {channel, close_conn}
end

{channel, close_conn} = connect.()
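
The connect function above relies on the defaults and crashes with a match error if the broker is unreachable. As a hedged sketch, the same steps can be written with explicit options and error handling. The option values are assumptions (RabbitMQ's stock settings for a local broker), and safe_connect is a hypothetical name used only for illustration.

```elixir
# No-op if the notebook's setup cell has already installed the same dependency.
Mix.install([{:amqp, "~> 3.0"}])

# Assumed values: RabbitMQ's stock defaults for a local broker.
options = [
  host: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
  virtual_host: "/"
]

# Hypothetical helper: returns {:ok, channel, close_fun} or {:error, reason}
# instead of crashing on a failed pattern match.
safe_connect = fn options ->
  with {:ok, conn} <- AMQP.Connection.open(options),
       {:ok, channel} <- AMQP.Channel.open(conn) do
    {:ok, channel, fn -> AMQP.Connection.close(conn) end}
  end
end

case safe_connect.(options) do
  {:ok, _channel, close} ->
    IO.puts("Connected to RabbitMQ")
    close.()

  {:error, reason} ->
    IO.puts("Could not connect: #{inspect(reason)}")
end
```

Returning a tagged tuple lets the caller decide whether to retry, log, or stop, which tends to matter more once the broker runs on a different host.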

Create a queue

queue_name = "hello"

AMQP.Queue.declare(channel, queue_name)

Create an exchange (skipped in this exercise)

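  • In RabbitMQ, a message is never sent directly to a queue; it always goes through an exchange
  • An exchange decides which queue or queues a message is routed to
  • Here we skip declaring one and publish to the default exchange, identified by the empty string, which routes each message to the queue whose name equals the routing key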
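
To make the routing rule concrete: when no exchange is declared, messages can go through RabbitMQ's default exchange (named by the empty string), which delivers each message to the queue whose name equals the routing key. The following is a toy model of that rule only, with no broker involved. The function name and the map-of-lists representation are hypothetical and do not reflect how RabbitMQ is implemented.

```elixir
# Toy model only: queues are a map from queue name to a list of messages.
# This is NOT how RabbitMQ works internally; it only mirrors the routing rule.
route_via_default_exchange = fn queues, routing_key, message ->
  case queues do
    %{^routing_key => messages} ->
      # The routing key names an existing queue, so the message is appended.
      %{queues | routing_key => messages ++ [message]}

    _ ->
      # No queue has that name, so the message is dropped.
      queues
  end
end

queues = %{"hello" => []}
queues = route_via_default_exchange.(queues, "hello", "Hello, World!")
queues = route_via_default_exchange.(queues, "missing", "lost")

IO.inspect(queues)
```

The second call shows why the routing key and the queue name must match exactly: a message addressed to a queue that does not exist is not stored anywhere.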

Sending

  • Our message will just contain the string Hello, World!
  • We want to send it to our hello queue

Publish a message

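publish_hello_message = fn n ->
  # The empty string selects the default exchange
  exchange = ""
  # With the default exchange, the routing key must equal the queue name
  routing_key = "hello"
  message = "Hello, World!"

  # The explicit step //1 keeps the range empty when n is 0
  for _ <- 1..n//1 do
    :ok = AMQP.Basic.publish(channel, exchange, routing_key, message)
  end
end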

publish_hello_message.(3)

Receiving

  • We’ll keep the consumer running to listen for messages and print them out

Make sure the queue exists

  • Declaring a queue with AMQP.Queue.declare is idempotent: we can run the command as many times as we like with the same arguments, and only one queue will be created
  • When we’re not yet sure which program will run first, it’s good practice to declare the queue in both programs.

AMQP.Queue.declare(channel, "hello")

Register a queue consumer process

  • We need to tell RabbitMQ that this particular process should receive messages from our hello queue.
  • Whenever the client library receives a delivery from RabbitMQ, a {:basic_deliver, payload, metadata} Elixir message is sent to the specified Elixir process.

queue_name = "hello"
# Consumer process; nil makes AMQP.Basic.consume fall back to self()
consumer_pid = nil

AMQP.Basic.consume(channel, queue_name, consumer_pid)

wait_for_messages = fn wait_for_messages ->
  receive do
    {:basic_deliver, payload, meta} ->
      IO.puts(" [x] Received #{payload}")
      # Acknowledge the delivery so RabbitMQ can remove it from the queue
      AMQP.Basic.ack(channel, meta.delivery_tag)
      # An anonymous function cannot call itself by name, so it is passed in as an argument
      wait_for_messages.(wait_for_messages)
  after
    # Stop waiting and close the connection after 5 seconds without a delivery
    5000 ->
      IO.puts("No message in 5 seconds")
      close_conn.()
  end
end

wait_for_messages.(wait_for_messages)
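
The receive loop can also be tried without a broker, because deliveries arrive as ordinary Elixir messages. The sketch below sends hand-made messages to the current process in the shapes the amqp library uses: a {:basic_consume_ok, ...} confirmation followed by {:basic_deliver, payload, metadata} deliveries. The drain function, the consumer tag, and the delivery tags are hypothetical stand-ins, and no acknowledgement is sent since there is no channel.

```elixir
# Hypothetical helper: drains AMQP-style messages from the current process
# mailbox and returns the payloads in arrival order. No broker is involved.
drain = fn drain, acc ->
  receive do
    {:basic_consume_ok, %{consumer_tag: _tag}} ->
      # Confirmation that the consumer was registered; nothing to collect.
      drain.(drain, acc)

    {:basic_deliver, payload, %{delivery_tag: _tag}} ->
      # A real consumer would acknowledge the delivery here.
      drain.(drain, [payload | acc])
  after
    # Give up after 100 ms without a matching message.
    100 -> Enum.reverse(acc)
  end
end

# Simulate what the client library would send to the consumer process.
send(self(), {:basic_consume_ok, %{consumer_tag: "ctag-demo"}})
send(self(), {:basic_deliver, "Hello, World!", %{delivery_tag: 1}})
send(self(), {:basic_deliver, "Hello, World!", %{delivery_tag: 2}})

received = drain.(drain, [])
IO.inspect(received, label: "received")
```

Matching on {:basic_consume_ok, ...} as well keeps the confirmation message from lingering in the mailbox, which the shorter loop above leaves unhandled.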