Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

GenServer Intro

chapter2_genserver-intro.livemd

GenServer Intro

Intro

Task is useful for running single async functions. But now we’ll look at long lived processes that also run in the background and offer greater flexibility.

Concurrent work often takes a long time to complete. This is a challenge for the Task module.

You also may want to run code in the background continuously.

These are the kinds of problems that GenServer solves.

GenServer is short for generic server and enables us to create concurrent processes we can interact with. It listens for requests and can respond with a result.

GenServer processes hold their own state in memory until their process exits.

Basic GenServer

defmodule SendServer do
  use GenServer
end

Done! That’s a basic GenServer and we didn’t have to write any code. It also doesn’t really do anything yet.

GenServer provides default implementations of functions required for the behavior. The functions are known as callbacks and init/1 is one of them.

GenServer callbacks in depth

You implement a callback by declaring it in the module and providing your own implementation.

You need to know what arguments the callback function takes and what return values are supported.

Callback functions we’ll cover

  • handle_call/3
  • handle_cast/2
  • handle_continue/2
  • handle_info/2
  • init/1
  • terminate/2

There’s also these that won’t be covered here

  • code_change/2
  • format_status/2

Initializing

init/1 runs as soon as the process starts

When you start a GenServer process you can provide an optional list of arguments, these are given to the init/1 function. The init/1 function sets up the GenServer state.

defmodule SendServer do
  use GenServer

  def init(args) do
    IO.puts("Arguments: #{inspect(args)}")
    max_retries = Keyword.get(args, :max_retries, 5)
    state = %{emails: [], max_retries: max_retries}
    {:ok, state}
  end
end
{:ok, pid} = GenServer.start(SendServer, max_retries: 1)
GenServer.stop(pid)

Common returns from init/1

  • {:ok, state}
  • {:ok, state, {:continue, term}}
  • :ignore
  • {:stop, reason}

:continue, term is used for post-init work

Don’t do complex work from init/1 such as populating state from a database. The init/1 is synchronous and expected to be quick.

{:continue, term} will trigger the handle_continue/2 callback

e.g.

{:ok, state, {:continue, :fetch_from_database}}

def handle_continue(:fetch_from_database, state) do
  # do the things to fill state from database
end

:ignore and {:stop, reason} both keep the GenServer from starting. :stop will trigger the GenServer supervisor to restart the process while :ignore won’t.

Breaking down work

handle_continue/2 is a recent addition to GenServer

Since it receives the GenServer state it can return a new state

{:noreply, new_state}
def handle_continue(:fetch_from_database, state) do
  # get users from db and then...
  {:noreply, Map.put(state, :users, users)}
end

handle_continue/2 can also return {:continue, term} to continue a sequence of steps

Sending process messages

You interact with GenServers while they are running by sending them messages.

If you want to get information back: use GenServer.call/3 (sync)

If you don’t need a result you can use GenServer.cast/2 (async)

Respectively they will invoke the handle_call/3 and handle_cast/2 callbacks

defmodule SendServer do
  use GenServer

  def init(args) do
    IO.puts("Arguments: #{inspect(args)}")
    max_retries = Keyword.get(args, :max_retries, 5)
    state = %{emails: [], max_retries: max_retries}
    {:ok, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end
end
{:ok, pid} = GenServer.start(SendServer, max_retries: 1)
GenServer.call(pid, :get_state)
GenServer.stop(pid)
defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end
end
defmodule SendServer do
  use GenServer

  def init(args) do
    IO.puts("Arguments: #{inspect(args)}")
    max_retries = Keyword.get(args, :max_retries, 5)
    state = %{emails: [], max_retries: max_retries}
    {:ok, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:send, email}, state) do
    Sender.send_email(email)
    emails = [%{email: email, status: "sent", retries: 0}] ++ state.emails
    {:noreply, %{state | emails: emails}}
  end
end
{:ok, pid} = GenServer.start(SendServer, max_retries: 1)
GenServer.cast(pid, {:send, "hello@email.com"})
GenServer.call(pid, :get_state)
GenServer.stop(pid)

Notifying process of events

A generic Process.send/2 message will trigger the handle_info/2 callback which works exactly like handle_cast/2

Usually handle_info/2 deals with system messages and cast/call is your server API.

defmodule Sender do
  def send_email("error@email.com"), do: :error

  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end
end
defmodule SendServer do
  use GenServer

  def init(args) do
    IO.puts("Arguments: #{inspect(args)}")
    max_retries = Keyword.get(args, :max_retries, 5)
    state = %{emails: [], max_retries: max_retries}
    Process.send_after(self(), :retry, 5000)
    {:ok, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:send, email}, state) do
    status =
      case Sender.send_email(email) do
        {:ok, :email_sent} -> "sent"
        :error -> "failed"
      end

    emails = [%{email: email, status: status, retries: 0}] ++ state.emails
    {:noreply, %{state | emails: emails}}
  end

  def handle_info(:retry, state) do
    {failed, done} =
      Enum.split_with(state.emails, fn item ->
        item.status == "failed" &amp;&amp; item.retries < state.max_retries
      end)

    retried =
      Enum.map(failed, fn item ->
        IO.puts("Retrying email #{item.email}...")

        new_status =
          case Sender.send_email(item.email) do
            {:ok, :email_sent} -> "sent"
            :error -> "failed"
          end

        %{email: item.email, status: new_status, retries: item.retries + 1}
      end)

    Process.send_after(self(), :retry, 5000)

    {:noreply, %{state | emails: retried ++ done}}
  end
end
{:ok, pid} = GenServer.start(SendServer, max_retries: 2)
GenServer.cast(pid, {:send, "hello@email.com"})
GenServer.cast(pid, {:send, "hello2@email.com"})
GenServer.cast(pid, {:send, "hello3@email.com"})
GenServer.cast(pid, {:send, "error@email.com"})
GenServer.call(pid, :get_state)
GenServer.stop(pid)

Process teardown

The final callback in the list is terminate/2 and usually invoked before the process exits but only when the process itself is responsible for the exit. Usually when returning {:stop, reason, state} from a callback (excluding init/1) or when an unhandled exception occurs in the process.

You cannot rely on terminate/2 always being called when a GenServer stops. If a process is forced to exit due to an external event such as the whole application shutting down then terminate/2 may not be called.

If you need to ensure terminate/2 is always called look into setting Process.flag(:trap_exit, true) or using Process.monitor/1 to perform the required work in a separate process.

defmodule SendServer do
  use GenServer

  def init(args) do
    IO.puts("Arguments: #{inspect(args)}")
    max_retries = Keyword.get(args, :max_retries, 5)
    state = %{emails: [], max_retries: max_retries}
    Process.send_after(self(), :retry, 5000)
    {:ok, state}
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:send, email}, state) do
    status =
      case Sender.send_email(email) do
        {:ok, :email_sent} -> "sent"
        :error -> "failed"
      end

    emails = [%{email: email, status: status, retries: 0}] ++ state.emails
    {:noreply, %{state | emails: emails}}
  end

  def handle_info(:retry, state) do
    {failed, done} =
      Enum.split_with(state.emails, fn item ->
        item.status == "failed" &amp;&amp; item.retries < state.max_retries
      end)

    retried =
      Enum.map(failed, fn item ->
        IO.puts("Retrying email #{item.email}...")

        new_status =
          case Sender.send_email(item.email) do
            {:ok, :email_sent} -> "sent"
            :error -> "failed"
          end

        %{email: item.email, status: new_status, retries: item.retries + 1}
      end)

    Process.send_after(self(), :retry, 5000)

    {:noreply, %{state | emails: retried ++ done}}
  end

  def terminate(reason, _state) do
    IO.puts("Shutting down with reason: #{reason}")
  end
end
{:ok, pid} = GenServer.start(SendServer, max_retries: 1)
GenServer.stop(pid)

This SendServer implementation is far from ideal.

If you try running GenServer.cast(pid, {:send, "some@email"}) several times in a row then each cast will return :ok right away but the server will be blocked from doing work as it waits for each email in turn.

If you call :get_state while delivering emails you will get a timeout as the server is blocked doing email delivery work.

You can use the Task module with GenServer and have the Task module send messages back to the GenServer on completion. See Task.Supervisor.async_nolink/3

Next we’ll fix these issues by building a job processing system capable of performing any job concurrently.