Powered by AppSignal & Oban Pro

GenServer

notebooks/genserver.livemd

GenServer

Introduction

These are my notes from studying GenServer with the Elixir Patterns book & Livebooks.

example: simple stateful server

defmodule MyQueue do
  @moduledoc """
  A FIFO queue kept inside a GenServer.

  The server state is a map holding the Erlang `:queue` under `:queue` and a
  manually maintained element count under `:len`.
  """

  # With the `:transient` restart policy a supervisor restarts this process
  # only when it stops with a reason other than `:normal`, `:shutdown` or
  # `{:shutdown, value}`.
  use GenServer, restart: :transient

  ## client api

  @doc """
  Starts a queue process seeded with `initial_elements`.

  `name` is the name the process is registered under. It defaults to the
  module name, which gives a singleton that can be started only once per
  BEAM instance.
  """
  def start_link(initial_elements, name \\ __MODULE__) do
    server_opts = [name: name]
    GenServer.start_link(__MODULE__, initial_elements, server_opts)
  end

  @doc "Returns the number of queued elements (synchronous call)."
  def len(pid \\ __MODULE__), do: GenServer.call(pid, :len)

  @doc "Adds `element` to the back of the queue (asynchronous cast)."
  def push(pid \\ __MODULE__, element), do: GenServer.cast(pid, {:push, element})

  @doc """
  Removes the element at the front of the queue and returns it, or returns
  `:empty` when there is nothing queued (synchronous call).
  """
  def pop(pid \\ __MODULE__), do: GenServer.call(pid, :pop)

  ## server callbacks

  @impl GenServer
  def init(initial_elements) do
    queue = :queue.from_list(initial_elements)

    # The count is tracked by hand so that answering `:len` never has to
    # traverse the queue via :queue.len/1.
    count = length(initial_elements)

    {:ok, %{queue: queue, len: count}}
  end

  @impl GenServer
  def handle_call(:len, _from, state), do: {:reply, state.len, state}

  def handle_call(:pop, _from, %{queue: queue, len: count} = state) do
    {reply, remaining, new_count} =
      case :queue.out(queue) do
        {{:value, front}, rest} -> {front, rest, count - 1}
        {:empty, rest} -> {:empty, rest, 0}
      end

    {:reply, reply, %{state | queue: remaining, len: new_count}}
  end

  @impl GenServer
  def handle_cast({:push, element}, %{queue: queue, len: count} = state) do
    {:noreply, %{state | queue: :queue.in(element, queue), len: count + 1}}
  end
end
# Start the queue under its default registered name (MyQueue) with no elements.
MyQueue.start_link([])
# Push the integers 1 through 9; each push is an asynchronous cast.
for n <- 1..9 do
  MyQueue.push(n)
end
# Peek at the raw server state (the map built in MyQueue.init/1).
:sys.get_state(MyQueue)
# Ask the server for its tracked length.
MyQueue.len()
# Remove and return the element at the front of the queue.
MyQueue.pop()
# Stop the server process.
GenServer.stop(MyQueue)

testing GenServer as functions

  • test callback functions without starting or managing any running processes
  • produces no side effects
  • can run in parallel with any other test in the test suite
# `:autorun` is the option name ExUnit documents; with it set to false the
# tests only run when ExUnit.run/0 is called explicitly below.
ExUnit.start(autorun: false)

defmodule MyQueueTest.Stateless do
  # The test calls the GenServer callbacks as plain functions: no process is
  # started and no shared state is touched, so the module can safely opt in
  # to running concurrently with other test modules.
  use ExUnit.Case, async: true

  test "returns :empty when popping from an empty queue" do
    {:ok, initial_state} = MyQueue.init([])

    # GenServer passes `from` as a `{pid, tag}` tuple, so build one of that shape.
    from = {self(), make_ref()}

    {:reply, client_return, _new_state} = MyQueue.handle_call(:pop, from, initial_state)

    assert client_return == :empty
  end
end

ExUnit.run()

testing stateful GenServer

  • it would be nice to perform an integration test on a GenServer to ensure that it behaves as expected in a real-world environment
  • the ExUnit start_supervised/2 function
    • starts the queue GenServer for the test
    • guarantees that the process it starts will also be terminated after the test case completes, regardless of whether it encounters an error or completes normally
  • the ExUnit context map
    • given to every test that is run with ExUnit
    • contains information about the currently executing test
      • :module - the name of the test module as an atom
      • :test - the name of the test as an atom
      • etc
  • the ExUnit process
    • each test runs in its own process
    • the setup block runs in that same test process
# `:autorun` is the option name ExUnit documents; with it set to false the
# tests only run when ExUnit.run/0 is called explicitly below.
ExUnit.start(autorun: false)

defmodule MyQueueTest.Stateful do
  use ExUnit.Case

  describe "MyQueue" do
    # Starts a supervised, uniquely named queue for each test and exposes its
    # pid and name through the test context.
    setup %{module: module, test: test} = _context do
      initial_elements = []
      # create a unique name for the queue GenServer process so that it doesn’t
      # collide with another test starting its own queue GenServer process.
      queue_name = Module.concat([module, test, Queue])

      # define a custom child spec for the queue GenServer so we can control
      # how start_link is called
      child_spec = %{
        # The :id field is used solely by the supervisor and does not impact
        # the ability to look up the running GenServer by its name. If you want
        # to start the same process twice under the same supervisor, you will
        # need different id values.
        id: MyQueue,
        restart: :transient,
        start: {MyQueue, :start_link, [initial_elements, queue_name]}
      }

      # start_supervised!/1 returns the pid directly and fails the test with
      # a descriptive error if the process cannot be started.
      queue_pid = start_supervised!(child_spec)

      # we can have access to this data in the updated context map in the test
      %{
        queue_pid: queue_pid,
        queue_name: queue_name
      }
    end

    test "returns the first element in the queue", %{queue_name: queue_name} = context do
      # Print the context map to show the keys added by the setup block.
      dbg(context)

      MyQueue.push(queue_name, 1)
      MyQueue.push(queue_name, 2)

      value = MyQueue.pop(queue_name)

      assert value == 1
      # Just to ensure that the default queue GenServer name has not been
      # registered, we can write a simple assertion to validate that it has
      # indeed not been started.
      refute Process.whereis(MyQueue)
    end
  end
end

ExUnit.run()

example: simple loop with handle_continue/2 and handle_info/2

  • This is a singleton process as we do not require multiple processes for reporting on memory usage.
  • Aside from the start_link/1 client-facing function that we wrote, there are no other client-facing functions that other processes can leverage to interact with this cron process. In situations like this where you need something to run in the background, all your work can take place in the handle_info/2 and handle_continue/2 callbacks.
defmodule MyCronJob do
  @moduledoc """
  A singleton background process that periodically logs the three processes
  using the most memory.

  The loop is driven entirely by callbacks: `init/1` and `handle_info/2` hand
  over to `handle_continue/2`, which schedules the next `:perform_cron_work`
  message `run_interval_ms` milliseconds later. The server state is just that
  interval.
  """
  use GenServer, restart: :transient
  require Logger

  # How many of the largest processes are reported on each run.
  @top_count 3

  ## client api

  @doc """
  Starts the cron process, registered under the module name.

  `run_interval_ms` is the delay between runs in milliseconds and must be a
  non-negative integer, because that is what `Process.send_after/3` accepts.
  Any other value raises `FunctionClauseError` here instead of crashing the
  server after it has started.
  """
  def start_link(run_interval_ms) when is_integer(run_interval_ms) and run_interval_ms >= 0 do
    GenServer.start_link(__MODULE__, run_interval_ms, name: __MODULE__)
  end

  ## server callbacks

  @impl GenServer
  def init(run_interval_ms) do
    {:ok, run_interval_ms, {:continue, :schedule_next_run}}
  end

  @impl GenServer
  def handle_continue(:schedule_next_run, run_interval_ms) do
    Process.send_after(self(), :perform_cron_work, run_interval_ms)

    {:noreply, run_interval_ms}
  end

  @impl GenServer
  def handle_info(:perform_cron_work, run_interval_ms) do
    memory_hogs = top_memory_hogs(@top_count)

    Logger.info("Top 3 memory hogs: #{inspect(memory_hogs)}")

    {:noreply, run_interval_ms, {:continue, :schedule_next_run}}
  end

  # Returns up to `count` `{pid, memory}` tuples, largest memory first.
  defp top_memory_hogs(count) do
    samples =
      for pid <- Process.list(),
          # Process.info/2 returns nil when the process has exited since
          # Process.list/0 ran; the pattern in this generator skips those
          # instead of raising a MatchError.
          {:memory, memory} <- [Process.info(pid, :memory)] do
        {pid, memory}
      end

    samples
    |> Enum.sort_by(fn {_pid, memory} -> memory end, :desc)
    |> Enum.take(count)
  end
end
# Start the singleton cron process with a 5-second interval
# (:timer.seconds/1 converts seconds to milliseconds).
MyCronJob.start_link(:timer.seconds(5))
# Stop the process again, which ends the periodic reporting.
GenServer.stop(MyCronJob)