RabbitMQ Remote procedure call (RPC)
import IEx.Helpers
Mix.install([{:amqp, "~> 3.0"}])
About
- Distribute time-consuming tasks among multiple workers
- Build an RPC system: a client and a scalable RPC server
Resources
- RabbitMQ Tutorials
- amqp - Idiomatic Elixir client for RabbitMQ
- https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/elixir
Remote procedure call (RPC)
- Run a function on a remote computer and wait for the result
- Should ideally be idempotent
- In general, doing RPC over RabbitMQ is easy
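- A common pattern in computing, but often criticized for causing confusion and unnecessary complexity (e.g. when it is unclear whether a function call is local or a slow remote call)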
Message properties
- The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message
Property | Description |
---|---|
persistent | Marks a message as persistent |
content_type | The MIME type of the message body's encoding (e.g. application/json) |
reply_to | Commonly used to name a callback queue |
correlation_id | Useful to correlate RPC responses with requests |
Info
- host: localhost
- port: 5672
- username: guest
- password: guest
- management: http://localhost:15672
Time-consuming work
defmodule Fib do
  @moduledoc """
  Deliberately naive Fibonacci implementation, used as the slow job of the RPC demo.
  """

  @doc """
  Returns the `n`-th Fibonacci number, with `fib(0) == 0` and `fib(1) == 1`.

  The result is computed by plain double recursion, so the amount of work grows
  exponentially with `n`. No clause matches a negative integer, so such a call
  raises `FunctionClauseError`.
  """
  def fib(n) when n in [0, 1], do: n

  def fib(n) when n > 1 do
    previous = fib(n - 1)
    before_previous = fib(n - 2)
    previous + before_previous
  end
end
RPC server
defmodule FibServer do
  @moduledoc """
  RPC server: consumes Fibonacci requests from the queue named by `queue_name/0`
  and publishes each result to the queue given in the request's `reply_to`
  property, echoing the request's `correlation_id`.
  """

  @doc "Name of the queue on which the server receives RPC requests."
  def queue_name, do: "rpc_queue"

  @doc """
  Opens a connection, declares the request queue and serves requests until no
  message has arrived for 10 seconds.

  Returns the result of `wait_for_messages/2` (`nil` with the default callback).
  The connection is closed before returning, also when request handling raises.
  """
  def call do
    {:ok, {channel, close}} = open_connection()

    try do
      AMQP.Queue.declare(channel, queue_name())
      # Ask the broker for at most one unacknowledged message at a time, so that
      # requests are spread evenly when several servers consume the same queue.
      AMQP.Basic.qos(channel, prefetch_count: 1)
      AMQP.Basic.consume(channel, queue_name())
      IO.puts(" [x] #{inspect(self())} awaiting RPC requests")
      wait_for_messages(channel)
    after
      # Without this the connection would outlive the server loop.
      close.()
    end
  end

  @doc """
  Opens a connection with the default options and a channel on it.

  Returns `{:ok, {channel, close}}`, where `close` is a zero-arity function that
  closes the underlying connection.
  """
  def open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end
    {:ok, {channel, close}}
  end

  @doc """
  Handles delivered requests in a loop.

  For every `{:basic_deliver, payload, meta}` message the leading integer of
  `payload` is parsed, its Fibonacci number is published to `meta.reply_to`
  with the request's `correlation_id`, and the delivery is acknowledged.

  When no message arrives for 10 seconds, `after_callback` is invoked and its
  result is returned. The callback is kept across iterations.
  """
  def wait_for_messages(channel, after_callback \\ fn -> nil end) do
    receive do
      {:basic_deliver, payload, meta} ->
        {n, _} = Integer.parse(payload)
        IO.puts(" [.] #{inspect(self())} received request fib(#{n})")
        response = Fib.fib(n)

        AMQP.Basic.publish(
          channel,
          "",
          meta.reply_to,
          "#{response}",
          correlation_id: meta.correlation_id
        )

        # Acknowledge only after the reply has been published.
        AMQP.Basic.ack(channel, meta.delivery_tag)
        # Pass the callback along; otherwise it would silently fall back to the
        # default after the first handled request.
        wait_for_messages(channel, after_callback)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end
end
RPC client
defmodule FibClient do
  @moduledoc """
  RPC client: publishes a Fibonacci request to the server's queue and waits for
  the reply on a temporary, exclusive callback queue.
  """

  @doc """
  Requests the Fibonacci number for `number` and returns the integer reply.

  Returns the result of `wait_for_messages/3`, i.e. `nil` when no reply arrives
  in time. The connection opened for the request is closed before returning,
  also when an error is raised.
  """
  def call(number) do
    {:ok, {channel, close}} = open_connection()

    try do
      {:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)
      AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)

      exchange_name = ""
      routing_key = FibServer.queue_name()
      request = to_string(number)
      correlation_id = generate_correlation_id()

      AMQP.Basic.publish(
        channel,
        exchange_name,
        routing_key,
        request,
        reply_to: tmp_queue_name,
        correlation_id: correlation_id
      )

      wait_for_messages(channel, correlation_id)
    after
      # Every call opens its own connection; close it so calls do not leak one each.
      close.()
    end
  end

  @doc """
  Opens a connection with the default options and a channel on it.

  Returns `{:ok, {channel, close}}`, where `close` is a zero-arity function that
  closes the underlying connection.
  """
  def open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end
    {:ok, {channel, close}}
  end

  @doc """
  Declares an exclusive queue with an empty name on `channel` and returns the
  result of `AMQP.Queue.declare/3`.
  """
  def declare_temporary_queue(channel) do
    AMQP.Queue.declare(channel, "", exclusive: true)
  end

  @doc "Returns a Base64-encoded unique integer used to tag one request."
  def generate_correlation_id do
    :erlang.unique_integer()
    |> :erlang.integer_to_binary()
    |> Base.encode64()
  end

  @doc """
  Waits for the reply whose `correlation_id` equals the given one and returns
  the leading integer of its payload.

  Any other message, including a delivery carrying a different correlation id,
  is logged and skipped. Each skipped message restarts the 10 second wait. When
  the wait elapses, `after_callback` is invoked and its result is returned.
  """
  def wait_for_messages(channel, correlation_id, after_callback \\ fn -> nil end) do
    receive do
      # Only accept the reply that belongs to this request.
      {:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
        {n, _} = Integer.parse(payload)
        n

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("#{inspect(self())} unexpected message: #{inspect(other)}")
        wait_for_messages(channel, correlation_id, after_callback)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end
end
Main
defmodule Main do
  @moduledoc """
  Demo driver: starts one RPC server task and three RPC client tasks.
  """

  @doc """
  Starts the server in a linked task, then, for `n` in `1..3`, sleeps three
  seconds and starts a linked task that requests `fib(n)` and prints the answer.

  Returns `:ok` without waiting for the client tasks to finish.
  """
  def main do
    Task.start_link(&FibServer.call/0)

    Enum.each(1..3, fn n ->
      Process.sleep(3000)
      Task.start_link(fn -> request(n) end)
    end)

    :ok
  end

  # Performs one RPC round trip from the calling process and prints the result.
  defp request(n) do
    IO.puts(" [x] #{inspect(self())} requesting fib(#{n})")
    response = FibClient.call(n)
    IO.puts(" [.] #{inspect(self())} got answer: fib(#{n}) = #{response}")
  end
end
# Run the demo: start the server task, then issue the three client requests.
Main.main()