Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

RabbitMQ Remote procedure call (RPC)

notebooks/amqp/rabbitmq_06_rpc.livemd

RabbitMQ Remote procedure call (RPC)

import IEx.Helpers

Mix.install([{:amqp, "~> 3.0"}])

About

  • Distribute time-consuming tasks among multiple workers
  • Build an RPC system: a client and a scalable RPC server

Resources

Remote procedure call (RPC)

  • Run a function on a remote computer and wait for the result
  • Should ideally be idempotent
  • In general, doing RPC over RabbitMQ is easy
  • A common pattern in computing, but often criticized for causing confusion and unnecessary complexity (e.g., when it is unclear whether a call is local or a slow remote one)

Message properties

  • The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message
Message properties:

| Property | Description |
| --- | --- |
| `persistent` | Marks a message as persistent |
| `content_type` | The mime-type of the encoding |
| `reply_to` | Commonly used to name a callback queue |
| `correlation_id` | Useful to correlate RPC responses with requests |

Info

Time-consuming work

defmodule Fib do
  @moduledoc """
  Plain recursive Fibonacci, used in this notebook as the stand-in for
  time-consuming work performed by the RPC server.
  """

  @doc """
  Returns the `n`-th Fibonacci number, where `fib(0) == 0` and `fib(1) == 1`.

  No clause matches a negative `n`, so such a call raises `FunctionClauseError`.
  """
  # Base cases: the first two numbers of the sequence are their own index.
  def fib(n) when n in [0, 1], do: n

  # Recursive case: sum of the two preceding numbers.
  def fib(n) when n > 1 do
    fib(n - 1) + fib(n - 2)
  end
end

RPC server

defmodule FibServer do
  @moduledoc """
  RPC server: consumes Fibonacci requests from the queue named by
  `queue_name/0` and publishes each result to the queue given in the
  request's `reply_to` property, echoing the request's `correlation_id`.
  """

  @doc "Name of the queue on which the server listens for RPC requests."
  def queue_name, do: "rpc_queue"

  @doc """
  Opens a connection, subscribes to the request queue and serves requests
  until no message arrives for 10 seconds.

  The connection is closed when the loop ends, whether it timed out or
  crashed. Returns whatever `wait_for_messages/2` returns (`nil` with the
  default callback).
  """
  def call do
    {:ok, {channel, close}} = open_connection()

    try do
      AMQP.Queue.declare(channel, queue_name())
      # Ask the broker to hand this consumer one unacknowledged message at a time.
      AMQP.Basic.qos(channel, prefetch_count: 1)
      AMQP.Basic.consume(channel, queue_name())

      IO.puts(" [x] #{inspect(self())} awaiting RPC requests")
      wait_for_messages(channel)
    after
      # Without this the connection would outlive the server process.
      close.()
    end
  end

  @doc """
  Opens a connection and a channel with the default connection settings.

  Returns `{:ok, {channel, close}}`, where `close` is a zero-arity function
  that closes the underlying connection.
  """
  def open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end

    {:ok, {channel, close}}
  end

  @doc """
  Handles deliveries in a loop until 10 seconds pass without a message.

  On timeout, prints a notice and returns the result of `after_callback`
  (a zero-arity function, defaulting to one that returns `nil`).
  """
  def wait_for_messages(channel, after_callback \\ fn -> nil end) do
    receive do
      {:basic_deliver, payload, meta} ->
        handle_request(channel, payload, meta)

        # Keep passing the callback along; otherwise it would be replaced by
        # the default after the first handled message.
        wait_for_messages(channel, after_callback)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end

  # Computes and publishes the reply for one request, then acks it.
  # A payload that is not a non-negative integer is rejected without requeue,
  # so a malformed message cannot crash the server and be redelivered forever.
  defp handle_request(channel, payload, meta) do
    case Integer.parse(payload) do
      {n, _rest} when n >= 0 ->
        IO.puts(" [.] #{inspect(self())} received request fib(#{n})")
        response = Fib.fib(n)

        AMQP.Basic.publish(
          channel,
          "",
          meta.reply_to,
          "#{response}",
          correlation_id: meta.correlation_id
        )

        AMQP.Basic.ack(channel, meta.delivery_tag)

      _invalid ->
        IO.puts(" [!] #{inspect(self())} rejecting invalid request #{inspect(payload)}")
        AMQP.Basic.reject(channel, meta.delivery_tag, requeue: false)
    end
  end
end

RPC client

defmodule FibClient do
  @moduledoc """
  RPC client: publishes a Fibonacci request to the server queue and waits on
  a temporary exclusive queue for the reply carrying the same correlation id.
  """

  @doc """
  Requests `fib(number)` from the RPC server and waits for the answer.

  Returns the integer result, or `nil` if no reply arrives within the
  timeout of `wait_for_messages/3`. The connection opened for the call is
  always closed before returning.
  """
  def call(number) do
    {:ok, {channel, close}} = open_connection()

    try do
      {:ok, %{queue: tmp_queue_name}} = declare_temporary_queue(channel)

      AMQP.Basic.consume(channel, tmp_queue_name, nil, no_ack: true)

      exchange_name = ""
      routing_key = FibServer.queue_name()
      request = to_string(number)
      correlation_id = generate_correlation_id()

      AMQP.Basic.publish(
        channel,
        exchange_name,
        routing_key,
        request,
        reply_to: tmp_queue_name,
        correlation_id: correlation_id
      )

      wait_for_messages(channel, correlation_id)
    after
      # Every call opens its own connection; close it so calls do not leak them.
      close.()
    end
  end

  @doc """
  Opens a connection and a channel with the default connection settings.

  Returns `{:ok, {channel, close}}`, where `close` is a zero-arity function
  that closes the underlying connection.
  """
  def open_connection do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, channel} = AMQP.Channel.open(conn)
    close = fn -> AMQP.Connection.close(conn) end

    {:ok, {channel, close}}
  end

  @doc """
  Declares an exclusive queue with an empty name on `channel` and returns
  the result of `AMQP.Queue.declare/3`.
  """
  def declare_temporary_queue(channel) do
    AMQP.Queue.declare(channel, "", exclusive: true)
  end

  @doc "Returns a Base64-encoded unique integer to tag one request."
  def generate_correlation_id do
    :erlang.unique_integer()
    |> :erlang.integer_to_binary()
    |> Base.encode64()
  end

  @doc """
  Waits for the delivery whose `correlation_id` equals the given one and
  returns its payload parsed as an integer.

  Any other message, including a delivery with a different correlation id,
  is printed and skipped. If nothing arrives for 10 seconds, prints a notice
  and returns the result of `after_callback`.
  """
  def wait_for_messages(channel, correlation_id, after_callback \\ fn -> nil end) do
    receive do
      # Only the reply tagged with our own correlation id answers this request.
      {:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
        {n, _} = Integer.parse(payload)
        n

      # It is nice to catch unexpected messages for debugging purposes
      other ->
        IO.puts("#{inspect(self())} unexpected message: #{inspect(other)}")
        wait_for_messages(channel, correlation_id, after_callback)
    after
      10_000 ->
        IO.puts("#{inspect(self())} no message in 10 seconds")
        after_callback.()
    end
  end
end

Main

defmodule Main do
  @moduledoc """
  Demo driver: starts one RPC server task, then issues three client
  requests, each from its own task.
  """

  @doc """
  Starts the server task, then for `n` in `1..3` sleeps three seconds and
  starts a client task requesting `fib(n)`.

  Returns `:ok` without waiting for the client tasks to finish.
  """
  def main do
    Task.start_link(&FibServer.call/0)

    Enum.each(1..3, fn number ->
      Process.sleep(3000)
      Task.start_link(fn -> request(number) end)
    end)

    :ok
  end

  # Performs one RPC round trip and prints the request and its answer.
  defp request(number) do
    IO.puts(" [x] #{inspect(self())} requesting fib(#{number})")
    answer = FibClient.call(number)
    IO.puts(" [.] #{inspect(self())} got answer: fib(#{number}) = #{answer}")
  end
end

# Run the demo: starts the RPC server task and the three client request tasks.
Main.main()