GenServer
Introduction
These are my notes from studying GenServer with the Elixir Patterns book & Livebooks.
example: simple stateful server
defmodule MyQueue do
  @moduledoc """
  A FIFO queue kept inside a GenServer process.

  The server state is a map with two keys: `:queue` (an Erlang `:queue`) and
  `:len` (the number of elements currently stored).
  """

  # The `:transient` restart policy ensures that a supervisor automatically
  # restarts the queue whenever it stops with a reason other than `:normal`,
  # `:shutdown` or `{:shutdown, value}`.
  use GenServer, restart: :transient

  ## client api

  @doc """
  Starts a queue process pre-filled with `initial_elements`.

  The process is registered under `name`. With the default name (this
  module) the queue is a singleton that can only be started once per BEAM
  instance.
  """
  def start_link(initial_elements, name \\ __MODULE__) do
    server_opts = [name: name]
    GenServer.start_link(__MODULE__, initial_elements, server_opts)
  end

  @doc "Returns the number of elements in the queue (synchronous call)."
  def len(pid \\ __MODULE__), do: GenServer.call(pid, :len)

  @doc "Appends `element` to the back of the queue (asynchronous cast)."
  def push(pid \\ __MODULE__, element), do: GenServer.cast(pid, {:push, element})

  @doc """
  Removes and returns the element at the front of the queue (synchronous
  call). Returns `:empty` when there is nothing to pop.
  """
  def pop(pid \\ __MODULE__), do: GenServer.call(pid, :pop)

  ## server callbacks

  @impl GenServer
  def init(initial_elements) do
    # The length is tracked by hand so that answering `len/1` never has to
    # traverse the queue via :queue.len/1.
    {:ok, %{queue: :queue.from_list(initial_elements), len: length(initial_elements)}}
  end

  @impl GenServer
  def handle_call(:len, _from, %{len: len} = state), do: {:reply, len, state}

  def handle_call(:pop, _from, state) do
    pop_reply(:queue.out(state.queue), state)
  end

  @impl GenServer
  def handle_cast({:push, element}, state) do
    {:noreply, %{state | queue: :queue.in(element, state.queue), len: state.len + 1}}
  end

  # Turns the result of :queue.out/1 into the GenServer reply tuple and the
  # updated state.
  defp pop_reply({:empty, remaining}, state) do
    {:reply, :empty, %{state | queue: remaining, len: 0}}
  end

  defp pop_reply({{:value, item}, remaining}, state) do
    {:reply, item, %{state | queue: remaining, len: state.len - 1}}
  end
end
# Start the queue under its default name (the module name) with no elements.
MyQueue.start_link([])

# Enqueue the integers 1 through 9; each push is an asynchronous cast.
Enum.map(1..9, &MyQueue.push/1)

# Inspect the raw server state (the queue and its tracked length).
:sys.get_state(MyQueue)

MyQueue.len()
MyQueue.pop()

# Stop the process so the default name is free again.
GenServer.stop(MyQueue)
testing GenServer as functions
- test callback functions without starting or managing any running processes
- produces no side effects
- can run in parallel with any other test in the test suite
ExUnit.start(auto_run: false)

defmodule MyQueueTest.Stateless do
  # These tests call the GenServer callbacks as plain functions: no process
  # is started and no name is registered. That makes them safe to run
  # concurrently with other test modules, so we opt in with `async: true`.
  use ExUnit.Case, async: true

  test "returns :empty when popping from an empty queue" do
    {:ok, initial_state} = MyQueue.init([])

    # `self()` only stands in for the `from` argument, which the `:pop`
    # callback ignores.
    {:reply, client_return, _new_state} = MyQueue.handle_call(:pop, self(), initial_state)

    assert client_return == :empty
  end
end

ExUnit.run()
testing stateful GenServer
- it would be nice to perform an integration test on a GenServer to ensure that it behaves as expected in a real-world environment
-
the ExUnit `start_supervised/2` function
- starts the queue GenServer for the test
- guarantees that the process it starts will also be terminated after the test case completes, regardless of whether it encounters an error or completes normally
-
the ExUnit context map
- given to every test that is run with ExUnit
-
contains information about the currently executing test
-
`:module` - the name of the test module as an atom
-
`:test` - the name of the test as an atom
- etc
-
-
the ExUnit process
- each test runs in its own process
- the setup block runs under the context of that test PID
ExUnit.start(auto_run: false)

defmodule MyQueueTest.Stateful do
  use ExUnit.Case

  describe "MyQueue" do
    setup %{module: test_module, test: test_name} do
      # Derive a process name that is unique to this test so it cannot
      # collide with a queue started by any other test.
      unique_name = Module.concat([test_module, test_name, Queue])

      # A hand-written child spec lets us control exactly how start_link is
      # called. The :id is only used by the supervisor and has no effect on
      # looking the process up by name; starting the same process twice
      # under one supervisor would require distinct ids.
      spec = %{
        id: MyQueue,
        restart: :transient,
        start: {MyQueue, :start_link, [[], unique_name]}
      }

      {:ok, pid} = start_supervised(spec)

      # Everything returned here is merged into the context map that each
      # test receives.
      [queue_pid: pid, queue_name: unique_name]
    end

    test "returns the first element in the queue", %{queue_name: queue_name} = context do
      dbg(context)

      Enum.each([1, 2], &MyQueue.push(queue_name, &1))

      assert MyQueue.pop(queue_name) == 1

      # The queue was started under a custom name, so the default
      # module-named process must not have been registered.
      assert is_nil(Process.whereis(MyQueue))
    end
  end
end

ExUnit.run()
example: simple loop with handle_continue/2 and handle_info/2
- This is a singleton process as we do not require multiple processes for reporting on memory usage.
-
Aside from the `start_link/1` client-facing function that we wrote, there are no other client-facing functions that other processes can leverage to interact with this cron process. In situations like this where you need something to run in the background, all your work can take place in the `handle_info/2` and `handle_continue/2` callbacks.
defmodule MyCronJob do
  @moduledoc """
  A singleton background process that periodically logs the three processes
  using the most memory.

  There is no client API beyond `start_link/1`; all work happens in the
  `handle_continue/2` and `handle_info/2` callbacks.
  """

  use GenServer, restart: :transient
  require Logger

  ## client api

  @doc """
  Starts the cron process, registered under the module name.

  `run_interval_ms` is the delay, in milliseconds, between two runs. It must
  be a non-negative integer because it is handed to `Process.send_after/3`;
  any other value raises `FunctionClauseError` here instead of crashing the
  server later when it tries to schedule its first run.
  """
  def start_link(run_interval_ms) when is_integer(run_interval_ms) and run_interval_ms >= 0 do
    GenServer.start_link(__MODULE__, run_interval_ms, name: __MODULE__)
  end

  ## server callbacks

  @impl GenServer
  def init(run_interval_ms) do
    # The interval is the whole server state. Scheduling is deferred to
    # handle_continue/2 so that init/1 returns immediately.
    {:ok, run_interval_ms, {:continue, :schedule_next_run}}
  end

  @impl GenServer
  def handle_continue(:schedule_next_run, run_interval_ms) do
    Process.send_after(self(), :perform_cron_work, run_interval_ms)
    {:noreply, run_interval_ms}
  end

  @impl GenServer
  def handle_info(:perform_cron_work, run_interval_ms) do
    # List the top 3 memory hogs
    memory_hogs =
      Process.list()
      |> Enum.flat_map(fn pid ->
        # A process can exit between Process.list/0 and Process.info/2, in
        # which case Process.info/2 returns nil. Skip it rather than crash.
        case Process.info(pid, :memory) do
          {:memory, memory} -> [{pid, memory}]
          nil -> []
        end
      end)
      |> Enum.sort_by(fn {_pid, memory} -> memory end, :desc)
      |> Enum.take(3)

    Logger.info("Top 3 memory hogs: #{inspect(memory_hogs)}")

    # Loop: schedule the next run only after this one has finished.
    {:noreply, run_interval_ms, {:continue, :schedule_next_run}}
  end
end
# Report memory usage every five seconds.
five_seconds_ms = :timer.seconds(5)
MyCronJob.start_link(five_seconds_ms)

# Stop the background job once we are done observing it.
GenServer.stop(MyCronJob)