
Elixir notes on processes: Task, GenServer, Supervisor


Mix.install([
  {:kino, "~> 0.8.0"},
  {:httpoison, "~> 2.0"},
  {:jason, "~> 1.4"}
])

Supervisor.start_link(
  [
    {DynamicSupervisor, name: DynSup, strategy: :one_for_one},
    {Registry, keys: :unique, name: MyRegistry},
    {Task.Supervisor, name: MyTaskSupervisor}
  ],
  strategy: :one_for_one,
  name: MySupervisor
)

Introduction

Elixir provides two main abstractions to run code concurrently: Task and GenServer. When you need concurrency for short-lived, stateless work in a single function, with no interaction while it runs, use the Task module. When you need a long-running, stateful process that you control and interact with, use a GenServer.

Task

Suppose you have to send millions of emails. Sending one email is short-lived work: the process exits as soon as it completes. You can run each send in a Task with Task.async to leverage concurrency.

We spawn two tasks, one per email:

defmodule Email1 do
  require Logger
  @emails ["a@com", "b@com"]

  def send_email(email) do
    Process.sleep(1_000)
    Logger.info("#{email} received")
  end

  def run do
    Enum.each(@emails, fn email -> Task.async(fn -> send_email(email) end) end)
  end
end

Time.utc_now() |> IO.inspect(label: :before)
Email1.run()
Time.utc_now() |> IO.inspect(label: :after)

> Note that Task.async links the task to the caller: if a task crashes, the error propagates to the caller (see “Supervisor” further down).

Get the response

We can use Enum.each if we just want to run the tasks; it returns :ok. If we want the responses of the tasks, we use Enum.map instead, because it returns the list of the corresponding Task structs. Task.await or Task.yield take such a struct to get the result.
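A minimal sketch of the Enum.map version, assuming a hypothetical Email1Map module whose fake_send/1 just sleeps and returns a tuple instead of sending a real email. Task.await_many/2 awaits a list of tasks and returns their results in the same order as the tasks:

```elixir
defmodule Email1Map do
  # hypothetical stand-in for a real email sender
  def fake_send(email) do
    Process.sleep(100)
    {:sent, email}
  end

  def run(emails) do
    emails
    # Enum.map keeps the Task structs instead of discarding them
    |> Enum.map(fn email -> Task.async(fn -> fake_send(email) end) end)
    # wait for all replies (default timeout: 5_000 ms); results keep the input order
    |> Task.await_many()
  end
end

results = Email1Map.run(["a@com", "b@com"])
```

Since the two tasks run concurrently, the whole call takes about 100 ms rather than 200 ms.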

An “awaited” task returns the result (and exits the caller if the task fails or times out), whilst a “yielded” task returns {:ok, result} or {:exit, reason}. Tasks are expected to complete within a given time: the timeout, in milliseconds, is the second argument of both functions and defaults to 5000.

t_await = Task.async(fn -> 1 end)
t_yield = Task.async(fn -> 2 end)
%{task: t_await, _await: Task.await(t_await), _yield: Task.yield(t_yield)}

We set the optional timeout (in ms) as the second argument of Task.yield. When the task hasn't completed within this period, yield returns nil and the task keeps running.

Task.async(fn -> Process.sleep(1000) end)
|> Task.yield(500)
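If a late task should not keep running, the Task documentation suggests chaining yield with Task.shutdown/2, which stops the task and returns its result only if it arrived in the meantime. A minimal sketch, where the 1_000 ms sleep stands in for slow work:

```elixir
task = Task.async(fn -> Process.sleep(1_000) end)

# give the task 500 ms; if yield returns nil, shutdown stops the task (and returns nil)
result = Task.yield(task, 500) || Task.shutdown(task)

# the task process has been terminated by Task.shutdown/2
still_alive = Process.alive?(task.pid)
```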

Unlike await, yield can be called repeatedly until the task replies:

defmodule Continue do
  require Logger

  def _yield(task, timeout: time) do
    case Task.yield(task, time) do
      nil ->
        Logger.info("called again")
        _yield(task, timeout: time)

      result ->
        result
    end
  end
end

Task.async(fn -> Process.sleep(3000) end)
|> Continue._yield(timeout: 500)

Task.async_stream

Instead of doing Enum.map |> Task.async, we can do it in one go with Task.async_stream. It is designed to create task processes from an enumerable of items. The key point is that we can limit the number of processes running at the same time with :max_concurrency. This provides back-pressure.

Task.async_stream returns a stream: a lazy enumerable, so no task is started until the stream is consumed. We use Stream.run/1 to run it when we don't care about the results; it returns :ok. If we want the results, we use Enum.to_list/1; each result is wrapped as {:ok, value}.
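A minimal sketch, independent of the notebook's supervision tree, where squaring numbers stands in for real work: max_concurrency: 2 caps the number of tasks running at once, and each result comes back wrapped in {:ok, value}, in input order by default:

```elixir
results =
  1..6
  |> Task.async_stream(
    fn n ->
      Process.sleep(100)
      n * n
    end,
    max_concurrency: 2
  )
  # nothing runs until the stream is consumed
  |> Enum.to_list()
```

With 6 items, 2 at a time and 100 ms each, this takes roughly 300 ms.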

chars = Enum.map(?a..?l, fn x -> <<x>> <> "@com" end)

With max_concurrency: 4, at most 4 tasks run at the same time, so the 12 emails are processed in 3 waves of about 500 ms each.

defmodule MyTasks do
  require Logger
  @emails chars

  def send_email(email) do
    Process.sleep(500)
    Logger.info("#{inspect(self())} processed #{email}")
  end

  def back_pressured do
    Task.Supervisor.async_stream_nolink(
      MyTaskSupervisor,
      @emails,
      &send_email(&1),
      max_concurrency: 4,
      ordered: false
    )
  end
end

Time.utc_now() |> IO.inspect(label: :before)
MyTasks.back_pressured() |> Stream.run()
Time.utc_now() |> IO.inspect(label: :after)

# or, to collect the results: MyTasks.back_pressured() |> Enum.to_list()

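Useful options are:

  • on_timeout: :kill_task: a task exceeding :timeout is killed and emitted as {:exit, :timeout}, instead of exiting the caller (the default, :exit),
  • ordered: false: results are emitted as soon as they are ready, instead of in input order.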

By default, :max_concurrency is System.schedulers_online(), the number of online schedulers, usually the number of available cores.
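To see on_timeout: :kill_task in action, a minimal sketch with an assumed timeout of 200 ms: the task sleeping 1_000 ms is killed and reported as {:exit, :timeout}, while the caller keeps running:

```elixir
results =
  [10, 1_000]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 200,
    # without this option, the timeout would exit the caller
    on_timeout: :kill_task
  )
  |> Enum.to_list()
```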

Example of task: cron job

We want to mimic a cron job with a Task. Each run launches the job in a task and schedules a delayed {:loop, args} message to the parent process; the parent's receive loop runs the job again when that message arrives, which turns the whole setup into a long-running process.

We use Process.send_after instead of Process.sleep. This requires a permanent message receiver, the Listen.loop below. Another implementation is done further down with a GenServer, which handles messages natively, among other benefits.

defmodule Period do
  use Task

  def start(args) do
    args = Keyword.merge(args, parent: self())
    Task.start(__MODULE__, :do_run, [args])
  end

  def do_run([every: time, run: f, parent: parent] = args) do
    Task.async(fn -> apply(f, []) end)
    # send to the main process
    Process.send_after(parent, {:loop, args}, time)
  end
end

defmodule Clean do
  require Logger
  def up, do: Logger.info("cleaned!")
end

# continuous listener on the main process
defmodule Listen do
  def loop do
    receive do
      {:loop, args} ->
        # on message, run the task
        Period.do_run(args)
        loop()
    after
      10_000 -> :timeout
    end
  end
end

##### The final API:
pid =
  spawn(fn ->
    Period.start(
      every: 3_000,
      run: fn -> Clean.up() end
    )

    Listen.loop()
  end)

Process.sleep(10_000)
Process.exit(pid, :shutdown)

> It is preferable to use the supervised form Task.Supervisor.async_nolink. See further down.

GenServer

A GenServer is a (spawned) single-threaded process. It is a mechanism to run a long-lived stateful process that can communicate with other processes. It works with client-side functions, server-side callbacks and message passing. Messages are processed sequentially, “one at a time”.

Client functions are public and can be called from any process; they run in the caller's process and send messages to the server. Callbacks are implemented in the GenServer module and invoked by the GenServer machinery inside the server process; you don't call them directly.
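A minimal sketch of the client/server split, using a hypothetical Counter module: start/1, increment/1 and value/1 run in the caller and only send messages, while init/1, handle_cast/2 and handle_call/3 run inside the GenServer process:

```elixir
defmodule Counter do
  use GenServer

  # --- client API: runs in the caller's process ---
  def start(initial), do: GenServer.start(__MODULE__, initial)
  def increment(pid), do: GenServer.cast(pid, :increment)
  def value(pid), do: GenServer.call(pid, :value)

  # --- callbacks: run inside the GenServer process ---
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:value, _from, count), do: {:reply, count, count}
end

{:ok, pid} = Counter.start(0)
Counter.increment(pid)
Counter.increment(pid)
# messages from one sender are processed in order, so the call sees both casts
value = Counter.value(pid)
```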

Reading about using genservers: spawn or not

> A process in Erlang/Elixir is a memory isolated and single threaded code runner; it has a mailbox and an address and communicates with asynchronous message passing.

The main client functions and the server callbacks they trigger are (cf the GenServer cheat sheet):

| client → callback | what it does | replies? | callback returns |
| --- | --- | --- | --- |
| GenServer.start(module, init_arg) → init(init_arg) | spawns a GenServer and initializes its state; start returns {:ok, pid} | yes | {:ok, state} or {:stop, reason} |
| GenServer.call(server, request) → handle_call(request, from, state) | synchronous: performs a task and replies with a response | yes | {:reply, response, state} |
| GenServer.cast(server, request) → handle_cast(request, state) | asynchronous: performs a task without a response | no | {:noreply, state} |
| GenServer.call(server, request) → handle_call(request, from, state), then GenServer.reply(from, response) | delayed response: the callback does not reply immediately, the reply is sent later | yes | {:noreply, state}; GenServer.reply/2 returns :ok |
| send(pid, msg) → handle_info(msg, state) | handles a message from any process, without a response | no | {:noreply, state} |

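You can use client functions inside callbacks, with these caveats:

> ❗ It is not recommended to await a long-running task inside a GenServer, since the GenServer cannot process other messages meanwhile. Instead, start the task from a callback: it is monitored by the GenServer and sends 2 messages, {ref, result} and {:DOWN, ...}, which you capture in handle_info. You can also return a delayed response to a GenServer.call with GenServer.reply (see the example in “Running Async Tasks”).

> ❗ A GenServer must not GenServer.call itself from one of its callbacks (handle_call, handle_cast or handle_info): the call would wait for a reply that only the server itself, busy running the current callback, could send. Elixir detects this case and exits with a :calling_self error. Two GenServers calling each other synchronously can deadlock in the same way, until the call times out. To send yourself a message, use send(self(), msg) and handle it in handle_info.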
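A minimal sketch of the send(self(), ...) alternative, with a hypothetical Chain module: handle_cast schedules follow-up work by messaging itself, and handle_info picks it up once the cast callback has returned. The notify pid is only there so the example can be checked:

```elixir
defmodule Chain do
  use GenServer

  def start(notify), do: GenServer.start(__MODULE__, notify)

  @impl true
  def init(notify), do: {:ok, notify}

  @impl true
  def handle_cast(:step1, notify) do
    # GenServer.call(self(), :step2) here would exit with :calling_self;
    # a plain message is handled after this callback returns
    send(self(), :step2)
    {:noreply, notify}
  end

  @impl true
  def handle_info(:step2, notify) do
    send(notify, :step2_done)
    {:noreply, notify}
  end
end

{:ok, pid} = Chain.start(self())
GenServer.cast(pid, :step1)

outcome =
  receive do
    :step2_done -> :ok
  after
    1_000 -> :timeout
  end
```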

Get info on a process

The module Process offers routines such as Process.alive?(pid) and Process.info(pid), which returns a keyword list with information on the linked processes (|> Keyword.get(:links)) or on the message queue (|> Keyword.get(:message_queue_len)).

The Erlang module :sys offers debugging primitives for OTP processes: :sys.trace(pid, true) to print the events (messages received, replies sent) of a process, and :sys.get_state(pid) to retrieve its state.
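A minimal sketch of these inspection tools on throwaway processes (the sleeping process, the two messages and the Agent are assumptions for the demo). Process.info/2 also accepts a single item and returns it as a {key, value} tuple:

```elixir
# a process that waits forever and never reads its mailbox
pid = spawn(fn -> Process.sleep(:infinity) end)
send(pid, :hello)
send(pid, :world)

{:message_queue_len, queue_len} = Process.info(pid, :message_queue_len)
# spawn/1 creates no link
{:links, links} = Process.info(pid, :links)

# :sys only works with OTP processes (GenServer, Agent, ...), so we use an Agent here
{:ok, agent} = Agent.start(fn -> %{count: 1} end)
state = :sys.get_state(agent)

# clean up the demo process
Process.exit(pid, :kill)
```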

Note on the __MODULE__ placeholder

__MODULE__ is a placeholder: a special form, expanded at compile time, that returns the current module name as an atom. It is convenient to use it instead of writing the literal module name.

defmodule Test do
  IO.puts("This module is named #{__MODULE__} and is an atom: #{is_atom(__MODULE__)}")
end

is_atom(Test)

How to start a Genserver

A GenServer is declared with use GenServer and is started with GenServer.start_link or GenServer.start (often wrapped in a function of your module), to which you pass the module name, the initial argument and options: GenServer.start_link(module, init_arg, options). This function returns {:ok, pid}, or {:error, reason}, e.g. {:error, {:already_started, pid}} for a named GenServer that is already running. It automatically invokes the callback init/1, which sets up the state. We are required to define the init callback. You add @impl true to tell the compiler you are defining a callback, so it can warn you if the function does not match one.

An example of a minimal (and useless) genserver - where we put $1$ in its state - would be:

defmodule MinimalGS do
  use GenServer

  @impl true
  def init(1), do: {:ok, 1}
end

# instantiate
{:ok, pid} = GenServer.start(MinimalGS, 1)

We can check its state:

:sys.get_state(pid)

You write custom client functions that trigger the callbacks: the request sent by GenServer.call or GenServer.cast is pattern matched against the clauses of the server callback. In the GenServer module below, we have implemented only the callbacks, and we call GenServer.call directly:

defmodule FirstGs do
  use GenServer

  @impl true
  def init(init_args), do: {:ok, init_args}

  @impl true
  def handle_call(:double, _from, state), do: {:reply, state * 2, state}
  def handle_call({:triple, n}, _from, state), do: {:reply, n * 3, state}
end

You can instantiate genservers “on-the-fly”, identified by their PID:

for i <- 1..3 do
  {:ok, pid} = GenServer.start(FirstGs, i)

  %{
    pid: pid,
    double: GenServer.call(pid, :double),
    triple: GenServer.call(pid, {:triple, i}),
    state: :sys.get_state(pid)
  }
end

How to stop a genserver

This can be done from a client with GenServer.stop(pid, reason) (the reason defaults to :normal), or equivalently by returning {:stop, reason, state} from a callback ({:stop, reason, reply, state} from handle_call). The optional callback terminate/2 is then invoked and can be used to clean up before termination.

Any reason other than :normal, :shutdown or {:shutdown, term} is considered a crash by OTP. When the GenServer is stopped by an exit signal from its parent, such as its supervisor, terminate/2 only runs if the process traps exits: set Process.flag(:trap_exit, true) in the init callback to capture the :EXIT and let terminate run (e.g. to save the state into a database before exiting).

You can also raise an exception in a callback. In summary:

# from a client (the reason defaults to :normal)
GenServer.stop(pid, :normal)

# or return a `:stop` based tuple in a callback
{:stop, :reason, state}

# or raise in the callback
raise "Stop me"

# or `Process.exit` in a callback
Process.exit(self(), :reason)
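A runnable sketch showing that terminate/2 runs in two of these cases, with a hypothetical CleanupGs that reports the termination reason to a notify pid (standing in for a real cleanup such as saving state):

```elixir
defmodule CleanupGs do
  use GenServer

  def start(notify), do: GenServer.start(__MODULE__, notify)

  @impl true
  def init(notify) do
    # only needed so terminate/2 also runs on an exit signal from a parent/supervisor
    Process.flag(:trap_exit, true)
    {:ok, notify}
  end

  # reply :stopping to the caller, then stop with reason :normal
  @impl true
  def handle_call(:stop_me, _from, notify), do: {:stop, :normal, :stopping, notify}

  @impl true
  def terminate(reason, notify), do: send(notify, {:terminated, reason})
end

# 1. stopped from a client
{:ok, pid1} = CleanupGs.start(self())
GenServer.stop(pid1, :shutdown)

# 2. stopped by returning a :stop tuple from a callback
{:ok, pid2} = CleanupGs.start(self())
:stopping = GenServer.call(pid2, :stop_me)

received =
  for _ <- 1..2 do
    receive do
      {:terminated, reason} -> reason
    after
      1_000 -> :none
    end
  end
```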

We check that the MinimalGS process started above is still alive, inspect its state and stop it:

if Process.alive?(pid) do
  :sys.get_state(pid) |> IO.inspect(label: " the state of the genserver MinimalGS is: ")
  GenServer.stop(pid)
  Process.alive?(pid)
end
Example of genserver usage

Long-running code is discouraged in the init callback, because GenServer.start_link blocks the caller until init/1 returns. Instead, return {:ok, state, {:continue, term}} from init and implement the handle_continue/2 callback: it runs right after init/1 and is guaranteed to finish before the GenServer handles any other message. The caller is not blocked, and this avoids potential race conditions.
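A minimal sketch with a hypothetical SlowLoader, where a 300 ms sleep stands in for a slow load (database, HTTP...): start returns as soon as init/1 returns, and the first call still sees the loaded state because handle_continue/2 runs before any mailbox message:

```elixir
defmodule SlowLoader do
  use GenServer

  def start, do: GenServer.start(__MODULE__, nil)

  @impl true
  def init(nil) do
    # return immediately; the slow part is deferred to handle_continue/2
    {:ok, :loading, {:continue, :load}}
  end

  @impl true
  def handle_continue(:load, :loading) do
    # stands in for a slow load
    Process.sleep(300)
    {:noreply, :loaded}
  end

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state, state}
end

{:ok, pid} = SlowLoader.start()
# this call waits in the mailbox while handle_continue/2 runs, then sees :loaded
status = GenServer.call(pid, :status)
```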

The following usage of a GenServer is only an illustration: the simple Task used above is enough here, since we don't need to interact with this job. We implement the cron job again, this time with a GenServer that uses handle_continue. Once launched, it runs a recurrent job forever; we stop the process after about 11 seconds.

defmodule Periodic do
  use GenServer
  require Logger
  @start Time.utc_now()

  defp time, do: Time.diff(Time.utc_now(), @start, :millisecond)

  # uses GenServer.start (no link) so that stopping it does not affect the Livebook
  def start_link(opts), do: GenServer.start(__MODULE__, opts)

  def init(args) do
    Logger.info("#{inspect(time())}ms: First Init")
    {:ok, args, {:continue, :run}}
  end

  def handle_continue(:run, [every: time, run: _f] = state) do
    Logger.info("#{inspect(time())}ms: Continue")
    Process.send_after(self(), :repeat, time)
    {:noreply, state}
  end

  def handle_info(:repeat, [every: time, run: myfun] = state) do
    Logger.info("#{inspect(time())}ms: repeat")
    Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> apply(myfun, []) end)
    Process.send_after(self(), :repeat, time)
    {:noreply, state}
  end

  def handle_info({_, msg}, state) do
    Logger.info("message from task: #{msg}")
    {:noreply, state}
  end

  def handle_info(_msg, state) do
    # IO.inspect(msg)
    {:noreply, state}
  end
end

defmodule Send do
  def cleaner do
    Process.sleep(500)
    :cleaned!
  end
end

{:ok, pid} =
  Periodic.start_link(
    every: 3_000,
    run: fn -> Send.cleaner() end
  )

Process.sleep(11_000)
GenServer.stop(pid, :shutdown)

Linking Genservers

You can link processes with Process.link/1: it creates a bidirectional link between the calling process and the process whose PID is given as argument. GenServer.start_link does this for you: it links the new GenServer with the caller. If you instantiate with GenServer.start, no link is created.

> ❗ If a process is started with start_link, it is linked to the current process, the Livebook. This means the Livebook evaluation crashes when the GenServer exits with any reason other than :normal (including :shutdown). One can use Process.flag(:trap_exit, true) to receive instead the message {:EXIT, from, reason}. In Elixir, the Supervisor behaviour is meant to manage these links and restart policies.
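A minimal sketch of trapping exits, with hypothetical TrapCrasher and TrapDemo modules: the experiment runs in a throwaway process, so the caller is never linked to the crashing GenServer. Instead of dying, the trapping process receives {:EXIT, pid, reason}:

```elixir
defmodule TrapCrasher do
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast(:crash, _state), do: raise("boom")
end

defmodule TrapDemo do
  def run do
    parent = self()

    spawn(fn ->
      # exit signals from linked processes now arrive as {:EXIT, pid, reason} messages
      Process.flag(:trap_exit, true)
      {:ok, pid} = GenServer.start_link(TrapCrasher, :ok)
      GenServer.cast(pid, :crash)

      receive do
        {:EXIT, ^pid, reason} -> send(parent, {:trapped, reason})
      after
        1_000 -> send(parent, :no_exit)
      end
    end)

    receive do
      {:trapped, _reason} = msg -> msg
      :no_exit -> :no_exit
    after
      2_000 -> :timeout
    end
  end
end

result = TrapDemo.run()
```

The crash reason of a raising callback is {exception, stacktrace}, so result looks like {:trapped, {%RuntimeError{message: "boom"}, [...]}}.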

Example

Let's give an example with two linked GenServers. The GenServer Parent is instantiated with GenServer.start, and it instantiates another GenServer, called Child, with start_link. When we stop either of them with the reason :shutdown, the other one terminates too: the reason is not :normal and neither process traps exits.

defmodule Child do
  use GenServer
  # -----
  @impl true
  def init(_), do: {:ok, {}}

  @impl true
  def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
defmodule Parent do
  use GenServer
  def link(pid, module), do: GenServer.call(pid, {:link, module})

  # -------
  @impl true
  def init(_), do: {:ok, {}}

  @impl true
  def handle_call({:link, module}, _, _), do: {:reply, GenServer.start_link(module, :ok), {}}

  @impl true
  def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end

Process.info returns a keyword list including, in particular, the linked processes. Process.alive? returns a boolean.

We link Parent and Child:

# instantiate a "Parent" genserver
{:ok, pid_parent} = GenServer.start(Parent, :ok) |> IO.inspect(label: :parent)
# instantiate a linked genserver "Child"
{:ok, pid_child} = Parent.link(pid_parent, Child) |> IO.inspect(label: :child)

{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)

Process.info(pid_parent) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_parent)
Process.info(pid_child) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_child)

# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)

We stop the parent: both are stopped.

GenServer.stop(pid_parent, :shutdown)
# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}

We restart and stop the child: both are stopped.

{:ok, pid_parent} = GenServer.start(Parent, :ok)
{:ok, pid_child} = Parent.link(pid_parent, Child)

{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)

# stop Child
GenServer.stop(pid_child, :shutdown)
# check 2
{Process.alive?(pid_parent), Process.alive?(pid_child)}

> ❗ If we instantiated Parent with GenServer.start_link, it would be linked with the caller, the main process of this Livebook. Stopping Parent with :shutdown would then also terminate the Livebook evaluation process, which we don't want.

Restart strategies of processes (GenServer, Task)

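The restart strategy is declared when you define the module, with use GenServer, restart: strategy. It is stored in the child spec generated by use GenServer, so it only applies when the process is started by a supervisor:

  • restart: :temporary: the process is never restarted, even if it crashes,
  • restart: :transient: the process is restarted only if it exits abnormally, i.e. with a reason other than :normal, :shutdown or {:shutdown, term},
  • restart: :permanent: the process is always restarted. This is the default for a GenServer; use Task defaults to :temporary.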
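A minimal sketch showing where the option ends up, with hypothetical TransientWorker and OneShotTask modules: the child_spec/1 generated by use GenServer or use Task carries the :restart value that a supervisor will apply:

```elixir
defmodule TransientWorker do
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

defmodule OneShotTask do
  # `use Task` generates child_spec/1 with restart: :temporary by default
  use Task

  def start_link(arg), do: Task.start_link(__MODULE__, :run, [arg])
  def run(arg), do: arg
end

# building the specs does not start any process
gs_spec = TransientWorker.child_spec(:ok)
task_spec = OneShotTask.child_spec(:ok)
```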

Supervisor

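A supervisor is a process whose job is to start child processes and keep them running: you declare which children you want to maintain running and how to restart them. This relies on linking processes together: the supervisor is linked to each child and traps exits, so it is notified when a child terminates.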

Test with a Genserver

We start and supervise a GenServer with Supervisor.start_link, passing a list of child specs and a strategy. It is a good idea to follow the convention of naming start_link the client function that invokes GenServer.start_link, because the default child spec generated by use GenServer calls MyModule.start_link/1. ❗ Note that you have to use GenServer.start_link so that the supervisor is linked to the process it starts.

> ❗❗ The supervisor expects the function that starts a process to return {:ok, pid} (or :ignore, or {:error, reason}) for a process linked to the supervisor. The simplest way is to make GenServer.start_link the last expression of that function, so that its result is returned.

Let’s supervise this minimal genserver:

defmodule SupMinimalGs do
  use GenServer
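  # <- returns {:ok, pid} for the supervisor
  # the default argument lets both start_link() and start_link([]) (used by the module shortcut) work
  def start_link(_arg \\ nil), do: GenServer.start_link(__MODULE__, nil)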

  @impl true
  def init(_), do: {:ok, nil}
end

The supervisor expects child specs. A child spec can be a map, a {Module, arg} tuple, or just the module name, a shortcut for {Module, []}. The tuple and module forms call Module.child_spec(arg), which use GenServer defines to start Module.start_link(arg). Here we showcase an explicit child_spec map.

child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}

Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# or, equivalently (only one of the two can run under the name TestSupervisor)
Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)

In action, we instantiate a supervised module, check it is alive and stop it.

child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)

[{SupMinimalGs, pid, _, _}] =
  Supervisor.which_children(TestSupervisor)
  |> Kino.inspect()

IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)

GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)

Then we observe that the supervisor created a new one.

Process.sleep(1000)

# we check again which children are supervised
[{SupMinimalGs, pid, _, _}] =
  Supervisor.which_children(TestSupervisor)
  |> Kino.inspect()

IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect(label: :recreated_and_alive?)

Save the state

You notice that the state of the module is reset to its initial value. One way to keep it is to save the state in a store (ETS, for example) in the terminate callback (for “normal” shutdowns, or when :EXIT is trapped) and read it back in the init callback. Note that an ETS table is deleted when its owner process dies, so it must not be owned by the GenServer itself.
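A minimal sketch of this idea, assuming a hypothetical PersistentCounter and an ETS table named :counter_store. The table is created (and owned) by the calling process, so it survives the GenServer; terminate/2 writes the state and init/1 reads it back:

```elixir
defmodule PersistentCounter do
  use GenServer

  # hypothetical table name; the table is created outside this module
  @table :counter_store

  def start(opts \\ []), do: GenServer.start(__MODULE__, opts)

  @impl true
  def init(_opts) do
    # trap exits so terminate/2 also runs when a supervisor shuts us down
    Process.flag(:trap_exit, true)

    count =
      case :ets.lookup(@table, :count) do
        [{:count, value}] -> value
        [] -> 0
      end

    {:ok, count}
  end

  @impl true
  def handle_call(:incr, _from, count), do: {:reply, count + 1, count + 1}

  @impl true
  def terminate(_reason, count), do: :ets.insert(@table, {:count, count})
end

# the table must outlive the GenServer: here it is owned by the calling process
:ets.new(:counter_store, [:named_table, :public, :set])

{:ok, pid} = PersistentCounter.start()
GenServer.call(pid, :incr)
GenServer.call(pid, :incr)
# GenServer.stop/1 is synchronous: terminate/2 has run when it returns
GenServer.stop(pid)

{:ok, pid2} = PersistentCounter.start()
restored = :sys.get_state(pid2)
GenServer.stop(pid2)
```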

Supervision tree

In an application, the top-level supervisor is normally started with Supervisor.start_link in the start/2 callback of the module that uses the Application behaviour. You can also define a dedicated module with use Supervisor.

Strategies of Supervisors
  • strategy: :one_for_all. If a child process terminates, all other child processes are terminated and then all child processes (including the terminated one) are restarted.

  • strategy: :one_for_one. Only the failed child process is restarted.

  • strategy: :rest_for_one. The failed process and every process started after it are restarted.
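A minimal sketch of a module-based supervisor, with hypothetical StrategyWorker and StrategyDemoSup modules and registered names :strategy_a and :strategy_b: under :one_for_all, killing one worker makes the supervisor restart both, so both PIDs change:

```elixir
defmodule StrategyWorker do
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, nil, name: name)

  @impl true
  def init(state), do: {:ok, state}
end

defmodule StrategyDemoSup do
  use Supervisor

  def start_link(strategy), do: Supervisor.start_link(__MODULE__, strategy)

  @impl true
  def init(strategy) do
    children = [
      # two children of the same module need distinct ids
      Supervisor.child_spec({StrategyWorker, :strategy_a}, id: :strategy_a),
      Supervisor.child_spec({StrategyWorker, :strategy_b}, id: :strategy_b)
    ]

    Supervisor.init(children, strategy: strategy)
  end
end

{:ok, sup} = StrategyDemoSup.start_link(:one_for_all)
a1 = Process.whereis(:strategy_a)
b1 = Process.whereis(:strategy_b)

# kill only worker a; with :one_for_all, worker b is restarted as well
Process.exit(a1, :kill)
Process.sleep(100)

a2 = Process.whereis(:strategy_a)
b2 = Process.whereis(:strategy_b)
changed = {a1 != a2, b1 != b2}

Supervisor.stop(sup)
```

With :one_for_one instead, changed would be {true, false}.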

Look at the supervision tree started at the top of this notebook: it supervises a DynamicSupervisor (see below), a Registry and a Task.Supervisor (both used later).

Process.whereis(MySupervisor)

The Supervisor module comes with some useful routines:

pid = Process.whereis(MySupervisor)
Supervisor.count_children(pid)
Supervisor.which_children(MySupervisor)

Dynamic Supervisor

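A DynamicSupervisor starts with no children: children are started on demand (“on-the-fly”) with DynamicSupervisor.start_child/2, for example one GenServer per user or per session, each with its own attributes. It only supports the :one_for_one strategy.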

The usage is quite similar to supervisors.

defmodule DynSupGs do
  use GenServer, restart: :permanent
  def start_link(v), do: GenServer.start_link(__MODULE__, v)
  @impl true
  def init(v), do: {:ok, v + 1}
end
child_spec = %{id: DynSupGs, start: {DynSupGs, :start_link, [1]}}
# or simply child_spec = {DynSupGs, 1} since we use the standard "start_link/1"
!is_nil(Process.whereis(DynSup))
|> IO.inspect()

DynamicSupervisor.start_child(DynSup, child_spec)

# we check which children are supervised, get the pid
list = DynamicSupervisor.which_children(DynSup)

{_, pid, _, _} = List.last(list)
IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
Process.sleep(3000)

# we check again which children are supervised
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)
IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect()
:sys.get_state(pid)

You can review the supervised processes with the supervision tree by running:

Process.whereis(MySupervisor)

Task Supervisor

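Tasks can also be supervised. You declare a Task.Supervisor in your supervision tree with the child spec {Task.Supervisor, name: MyTaskSupervisor}. You can then start tasks unlinked from the caller with Task.Supervisor.async_nolink, or linked with Task.Supervisor.async. Below, the task for "b@com" raises, but since it is not linked, the caller keeps running: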

defmodule Email2 do
  require Logger
  @emails ["a@com", "b@com", "c@com"]

  def send_email("b@com"), do: raise("oh no!")
  def send_email(email), do: Logger.info("#{email}")

  def run do
    Enum.each(
      @emails,
      fn email ->
        Task.Supervisor.async_nolink(
          MyTaskSupervisor,
          fn -> send_email(email) end
        )
      end
    )
  end
end

Email2.run()
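To also collect the outcome of each unlinked task, a minimal sketch that starts its own anonymous Task.Supervisor (instead of the notebook's MyTaskSupervisor) and yields on each task: the failing one is reported as {:exit, reason} instead of crashing the caller:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

tasks =
  for email <- ["a@com", "b@com", "c@com"] do
    Task.Supervisor.async_nolink(sup, fn ->
      if email == "b@com", do: raise("oh no!"), else: {:sent, email}
    end)
  end

# the caller monitors each task, so a crash shows up as {:exit, reason}
results = Enum.map(tasks, &Task.yield(&1, 1_000))
```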

Genserver Registration

Every process can be given a name, also known as registration, with an atom: Process.register(pid, :name). A GenServer can be referenced by its PID or by the name given with the :name option.

The identifier passed to :name must be:

  • a unique atom such as name: __MODULE__.
  • or use the Registry mechanism: name: {:via, Registry, {MyRegistry, "name"}}

> Note: this uniquely identifies a GenServer on a node. You can register it globally with name: {:global, name}: the GenServer is then uniquely identified across connected Erlang nodes. Libraries such as Horde provide distributed registries.

# using the PID
{:ok, pid} = GenServer.start_link(__MODULE__, init_args)
GenServer.call(pid, :action)

# using the atom module name __MODULE__
GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
GenServer.call(__MODULE__, :action)

# using any atom
GenServer.start_link(__MODULE__, init_args, name: :an_atom)
GenServer.call(:an_atom, :action)

# via a Registry.  Make sure one has been instantiated
Registry.start_link(keys: :unique, name: MyRegistry)
via_tuple = {:via, Registry, {MyRegistry, "name"}}
GenServer.start_link(__MODULE__, init_args, name: via_tuple)
GenServer.call(via_tuple, :action)

❗ Only one process can be registered under a given name, so a GenServer started with the :name option or a :via tuple can only be instantiated once per name. GenServer.start returns {:ok, pid} the first time, then {:error, {:already_started, pid}}:

pid = 
  case GenServer.start(..) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

See the following examples.

PID as identifier

The generated PID is obtained by pattern matching on the response. You pass it to the client functions GenServer.call(pid, message) and GenServer.cast(pid, message).

Each time you run the module below, a new PID is generated.

> Notice below that when Kernel.send is run in the context of the GenServer (for example in handle_call), we can use self() to reach its handle_info, whereas in the context of the caller (for example in CallByPid.start_link, which runs in the calling process), we have to use the GenServer's PID.

defmodule CallByPid do
  require Logger
  use GenServer

  def start_link(state: state) do
    {:ok, pid} = GenServer.start_link(__MODULE__, state)
    # message handled by "handle_info({:msg, pid})"
    send(pid, {:msg, pid})

    # '.call' returns the reply, i.e. the 2nd element of {:reply, reply, state}: here the spawned pid
    GenServer.call(pid, :ready)
  end

  # callbacks
  @impl true
  def init(state) do
    Logger.info("spawned #{inspect(self())} is instantiated with state: #{state}")
    {:ok, state}
  end

  @impl true
  def handle_call(:ready, {pid, _} = _from, state) do
    send(
      self(),
      {:msg, "process #{inspect(pid)} asks #{inspect(self())} to run :ready and returns its PID"}
    )

    {:reply, self(), state}
  end

  # message printer
  @impl true
  def handle_info({:msg, message}, state) do
    Logger.info("spawned received: #{inspect(message)} from main")
    {:noreply, state}
  end
end

require Logger
Logger.info("main process pid: " <> "#{inspect(self())}")

Kino.Process.render_seq_trace(fn ->
  # sending to main process the pid of the spawned process with state 1
  send(self(), {:spw1, CallByPid.start_link(state: "1")}) |> Kino.inspect()
  # this returns the pid of the spawned process with state 2
  CallByPid.start_link(state: "2")
end)

We can check that our spawned process - the Genserver - is still alive and check its state directly:

receive do
  {:spw1, pid} ->
    Logger.info(inspect(pid))
    Process.alive?(pid) |> Kino.inspect(label: "spawned process #{inspect(pid)} is still alive?")
    :sys.get_state(pid)
end

The atom __MODULE__ as identifier

Instead of the pid, we can use name: __MODULE__ which is an atom. You can also use any atom.

Only one process can be registered under a given name, so start_link succeeds only once when we name the GenServer this way: a second call returns {:error, {:already_started, pid}}, which we handle below, and the initial state cannot be changed.

defmodule CallByAtom do
  require Logger
  use GenServer

  def start_link(state: state) do
    case GenServer.start_link(__MODULE__, state, name: __MODULE__) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        Logger.info(
          "Genserver #{inspect(pid)} with name #{__MODULE__} and state #{state} was already started"
        )

        {:ok, pid}
    end
  end

  # the process identifier is "__MODULE__"
  def get_state, do: GenServer.call(__MODULE__, :get_state)

  def init(state) do
    Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
    {:ok, state}
  end

  def handle_call(:get_state, _from, state), do: {:reply, state, state}
end

CallByAtom.start_link(state: "1")
CallByAtom.get_state() |> IO.inspect()
CallByAtom.start_link(state: "2")
CallByAtom.get_state()

The Registry

You can't use a string to register a GenServer. You could convert the string into an atom, but this is bad practice in Elixir: atoms are never garbage collected and the atom table is limited (1,048,576 atoms by default).

You can use the Registry mechanism instead. It is a key -> PID dictionary based on ETS, thus local to each node.

You start a Registry and give it a name (MyRegistry below):

Registry.start_link(keys: :unique, name: MyRegistry)

You can also start it in the supervision tree of the app:

children = [
  ...
  {Registry, keys: :unique, name: MyOTPApp.Registry},
]

Then you can identify a GenServer with any term, such as a string or a number, by using the tuple below. Passed as the :name option of start_link, it registers the new PID under the key "var" in the registry MyRegistry; passed to GenServer.call or GenServer.cast, it makes the GenServer machinery look up the PID in that registry:

{:via, Registry, {MyRegistry, "var"}}
defmodule CallByRegistry do
  require Logger
  use GenServer

  defp via_tuple(name), do: {:via, Registry, {ARegistry, name}}

  def start_link(name: name, state: state) do
    # instantiate a Registry
    Registry.start_link(keys: :unique, name: ARegistry)

    # reference a Genserver with the registry mechanism
    GenServer.start_link(__MODULE__, state, name: via_tuple(name))

    # use the registry mechanism
    GenServer.call(via_tuple(name), :ready)
  end

  def get_state(name), do: via_tuple(name) |> :sys.get_state()

  @impl true
  def init(state) do
    Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
    {:ok, state}
  end

  @impl true
  def handle_call(:ready, _from, state) do
    Logger.info("#{inspect(state)} ready")
    {:reply, :step_0, state}
  end
end

If we reevaluate the cell below several times, we see that a GenServer with a given name can be instantiated only once, so each name is bound to a single process and state. To get another state, we create another GenServer under another name.

CallByRegistry.start_link(name: "fish_1", state: "fish_1")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_1", state: "fish_2")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_2", state: "fish_2")
CallByRegistry.get_state("fish_2")

The Registry module comes with useful routines:

Registry.count(ARegistry)
CallByRegistry.get_state("fish_1")
CallByRegistry.get_state("fish_2")
Registry.lookup(ARegistry, "fish_2")
[{pid, _}] = Registry.lookup(ARegistry, "fish_2")
Registry.keys(ARegistry, pid)


Running Async Tasks in a Genserver

You could run Task.async |> Task.await in a GenServer process. However, it is not recommended to await a long task in a GenServer: the GenServer is blocked and cannot handle other messages until the task finishes.

If you just want to run the task without waiting for it, you can launch an async task from a callback. This task is monitored by the GenServer. Once it terminates, two messages are sent to the GenServer: {ref, result}, then {:DOWN, ref, :process, pid, :normal}. You match on them in GenServer.handle_info (see the example below).

Even though you don't await, you can still send a delayed response to a GenServer.call with GenServer.reply (see the “successful” example below).

The typical use cases can be:

  • you may run a long task from a LiveView and don't want the LiveView to crash if the task fails. You will use Task.Supervisor (see example below). To supervise tasks, you register a Task.Supervisor in the supervision tree. With an unlinked task, a failure reaches the GenServer as a {:DOWN, ref, :process, pid, reason} message in handle_info instead of crashing it.
  • you may also want the task to stop running when you leave the LiveView. You will use Task.Supervisor.async for this, because the processes are linked.
  • if you just want to launch a task and don't care about receiving a response, you may use Task.Supervisor.async_nolink, so the processes are not linked.

We will use this HTTP call:

defmodule AsyncTask do
  def request do
    Process.sleep(1_000)

    HTTPoison.get!("http://httpbin.org/json")
    |> Map.get(:body)
    |> Jason.decode!()
    |> get_in(["slideshow", "slides"])
  end

  def run, do: Task.async(&request/0)
end

AsyncTask.run()

The GenServer below calls the AsyncTask module. It receives 2 messages: the result {ref, return value of the async task}, and a completion message for the finished task, {:DOWN, ref, :process, pid, :normal}. The “catch all” handle_info below receives both.

defmodule FirstAsyncTaskGsRunner do
  use GenServer
  require Logger

  def run(pid), do: GenServer.call(pid, :async)

  def init(_), do: {:ok, nil}

  def handle_call(:async, _, _s) do
    %{ref: ref, pid: pid} = AsyncTask.run()
    reply = "task with pid #{inspect(pid)} sent"
    {:reply, reply, ref}
  end

  # catch all
  def handle_info(msg, ref) do
    Logger.debug(inspect(msg))
    {:noreply, ref}
  end

  def terminate(reason, _s), do: Logger.info("Terminate Child with reason: #{inspect(reason)}")
end

GenServer.start(FirstAsyncTaskGsRunner, {})
|> elem(1)
|> FirstAsyncTaskGsRunner.run()

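We can call Process.demonitor(ref, [:flush]) inside the handle_info callback that receives the first message {ref, result}: when the task is successful, you will then not receive the {:DOWN, ref, :process, pid, :normal} message. If the task fails, no {ref, result} message is sent; you only receive {:DOWN, ref, :process, pid, reason}, handled in a separate handle_info clause. Note that with Task.async the task is also linked, so its failure would crash the GenServer; the supervised Task.Supervisor.async_nolink avoids this.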
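A network-free sketch of both paths, with a hypothetical FlushDemo GenServer and its own anonymous Task.Supervisor: a successful task is recorded as {:ok, result} and its :DOWN message is flushed, while a failing task only produces a :DOWN message carrying the crash reason:

```elixir
defmodule FlushDemo do
  use GenServer

  def start(sup), do: GenServer.start(__MODULE__, sup)
  # starts a job and returns immediately; the outcome is stored in the state
  def run(pid, fun), do: GenServer.cast(pid, {:run, fun})
  def results(pid), do: GenServer.call(pid, :results)

  @impl true
  def init(sup), do: {:ok, %{sup: sup, results: []}}

  @impl true
  def handle_cast({:run, fun}, state) do
    Task.Supervisor.async_nolink(state.sup, fun)
    {:noreply, state}
  end

  @impl true
  def handle_call(:results, _from, state), do: {:reply, state.results, state}

  # success: the reply arrives first; demonitor + :flush drops the :DOWN message
  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | results: [{:ok, result} | state.results]}}
  end

  # failure: no reply is sent, only :DOWN with the crash reason
  def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
    {:noreply, %{state | results: [{:down, reason} | state.results]}}
  end
end

{:ok, sup} = Task.Supervisor.start_link()
{:ok, pid} = FlushDemo.start(sup)
FlushDemo.run(pid, fn -> 1 + 1 end)
FlushDemo.run(pid, fn -> raise "boom" end)
Process.sleep(200)
results = FlushDemo.results(pid)
```

The order of the two entries depends on which task finishes first; only two entries are recorded, because the :DOWN message of the successful task was flushed.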

Successful AsyncTask and usage of GenServer.reply to return

It is recommended to run this AsyncTask as a supervised task (see the next example). Recall that the task can be linked to the main process or not. If you want the task to stop when the main process stops (e.g. when you leave a page), use Task.Supervisor.async; otherwise use Task.Supervisor.async_nolink.

Example with GenServer.reply

You can get a response back with GenServer.reply/2. In the callback handle_call(:key, from, state), you return {:noreply, state} instead of {:reply, ...}, and you save from (a {pid, tag} tuple whose tag is opaque) in the state so you can use it later. When the (monitored) async task finishes, the GenServer receives the message {ref, result}. Inside that handle_info callback, GenServer.reply(from, result) sends the delayed reply to the original caller of GenServer.call.
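A minimal sketch of this deferred-reply pattern. It assumes a hypothetical DelayedReplier module, a Task.Supervisor passed in at start (instead of MyTaskSupervisor) and a sum in place of the HTTP call. Pending callers are kept in a map keyed by task reference, so several calls can be in flight at once, and a crashed task is answered from the :DOWN clause.

```elixir
defmodule DelayedReplier do
  use GenServer

  # Hypothetical example module: the task sums 1..n instead of calling an HTTP API
  def start_link(task_sup), do: GenServer.start_link(__MODULE__, task_sup)
  def compute(pid, n), do: GenServer.call(pid, {:compute, n})

  @impl true
  def init(task_sup), do: {:ok, %{sup: task_sup, pending: %{}}}

  @impl true
  def handle_call({:compute, n}, from, state) do
    task = Task.Supervisor.async_nolink(state.sup, fn -> Enum.sum(1..n) end)
    # Do not reply yet: remember the caller, keyed by the task reference
    {:noreply, %{state | pending: Map.put(state.pending, task.ref, from)}}
  end

  @impl true
  def handle_info({ref, result}, %{pending: pending} = state) when is_map_key(pending, ref) do
    # The task succeeded: drop its monitor (and the :DOWN message), then reply
    Process.demonitor(ref, [:flush])
    {from, pending} = Map.pop(pending, ref)
    GenServer.reply(from, {:ok, result})
    {:noreply, %{state | pending: pending}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{pending: pending} = state) do
    # The task crashed before sending a result: reply with the exit reason
    {from, pending} = Map.pop(pending, ref)
    if from, do: GenServer.reply(from, {:error, reason})
    {:noreply, %{state | pending: pending}}
  end
end

{:ok, task_sup} = Task.Supervisor.start_link()
{:ok, pid} = DelayedReplier.start_link(task_sup)
DelayedReplier.compute(pid, 10)
# => {:ok, 55}
```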

defmodule AsyncTaskReplier do
  use GenServer
  require Logger

  def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
  def init(_), do: {:ok, nil}

  def nolink, do: GenServer.call(__MODULE__, :nolink)

  def handle_call(:nolink, from, _) do
    %{ref: ref} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
    # Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> raise "bad" end)
    {:noreply, {ref, from}}
  end

  def handle_info({r, response}, {ref, from}) when ref == r do
    # <-- stops the second :DOWN message if success
    Process.demonitor(ref, [:flush])
    {pid, _} = from
    GenServer.reply(from, {pid, response})
    {:noreply, response}
  end

  def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
    Logger.debug("received status: #{inspect(status)}")
    {:noreply, s}
  end

  # terminate/2 runs on GenServer.stop/2, but not when the process is killed by an exit signal it does not trap
  def terminate(reason, _s) do
    Logger.info("Terminate Main with reason: #{inspect(reason)}")
  end
end

AsyncTaskReplier.start()
{pid, response} = AsyncTaskReplier.nolink()
Process.alive?(pid) |> IO.inspect()
response

This time, we use the usual monitoring mechanism of async tasks: the GenServer replies immediately with the task pid, then listens for the 2 messages sent by the task. Both forms, linked and unlinked, are implemented.

defmodule NoFailAsyncTaskRunner do
  use GenServer
  require Logger

  def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)

  def link, do: GenServer.call(__MODULE__, :async)
  def nolink, do: GenServer.call(__MODULE__, :nolink)
  def add, do: GenServer.call(__MODULE__, :one)
  def init(_), do: {:ok, nil}

  def handle_call(:one, _, s), do: {:reply, 1, s}

  def handle_call(:async, _from, _) do
    %{ref: ref, pid: pid} = Task.Supervisor.async(MyTaskSupervisor, &AsyncTask.request/0)
    {:reply, pid, ref}
  end

  def handle_call(:nolink, _from, _) do
    %{ref: ref, pid: pid} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
    {:reply, pid, ref}
  end

  def handle_info({r, response}, ref) when ref == r do
    Process.demonitor(ref, [:flush])
    Logger.debug("response: #{inspect(response)}")
    {:noreply, nil}
  end

  def handle_info({:DOWN, _ref, :process, pid, status}, _rf) do
    Logger.debug("received status: #{inspect(status)} for the task #{inspect(pid)}")
    {:noreply, nil}
  end

  # terminate/2 runs on GenServer.stop/2, but not when the process is killed by an exit signal it does not trap
  def terminate(reason, _s) do
    Logger.info("Terminate Main with reason: #{inspect(reason)}")
  end
end

We run the task:

NoFailAsyncTaskRunner.start()
pid_task = NoFailAsyncTaskRunner.link()

We kill the main process. We check that the task is killed.

Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)

# give the exit signal time to reach the linked task
Process.sleep(100)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")

This time we run the task unlinked, then kill the main process. We check that the task is still alive: it keeps running even though its caller is gone.

NoFailAsyncTaskRunner.start()

pid_task =
  NoFailAsyncTaskRunner.nolink()
  |> IO.inspect()

Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")

Supervising an async task

We set up another GenServer that runs the previous AsyncTask again, but this time we make it fail.

  • when we pass bool = true, the task is supervised and not linked: Task.Supervisor.async_nolink.
  • when bool = false, the task is not supervised and is linked to the GenServer: Task.async.

defmodule AsyncTaskRunner do
  use GenServer
  require Logger

  def start_link() do
    GenServer.start_link(__MODULE__, {}, name: __MODULE__)
  end

  def run(bool \\ false), do: GenServer.call(__MODULE__, {:async, bool})

  def init(_), do: {:ok, {}}

  def handle_call({:async, bool}, _, _) do
    %{ref: ref} =
      case bool do
        true ->
          # supervised and unlinked: only a :DOWN message reaches this GenServer
          Task.Supervisor.async_nolink(MyTaskSupervisor, fn ->
            Process.exit(self(), :kill)
            AsyncTask.request()
          end)

        false ->
          # linked: the :killed exit signal also takes down this GenServer
          Task.async(fn ->
            Process.exit(self(), :kill)
            AsyncTask.request()
          end)
      end

    {:reply, Process.alive?(self()), ref}
  end

  def handle_info({r, response}, ref) when ref == r do
    Logger.debug("response: #{inspect(response)}")
    {:noreply, ref}
  end

  def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
    Logger.debug("received status: #{inspect(status)}")
    {:noreply, s}
  end

  # not called when a linked task kills this GenServer: exit signals only trigger terminate/2 when trapping exits
  def terminate(reason, _s) do
    Logger.info("Terminate Child with reason: #{inspect(reason)}")
  end
end

We supervise the task runner above because a failing linked task also kills its parent. It will run the (failing) AsyncTask:

DynamicSupervisor.start_child(DynSup, %{id: 1, start: {AsyncTaskRunner, :start_link, []}})

{_, pid1, _, _} =
  DynamicSupervisor.which_children(DynSup)
  |> List.last()

We run the failing AsyncTask as a supervised, unlinked task: Task.Supervisor.async_nolink. Task.Supervisor.async would not be enough here, because it still links the task to the caller. We check that the parent process AsyncTaskRunner hasn't been restarted (same PID).

AsyncTaskRunner.run(true)

Process.sleep(1000)

# check pid
{_, pid2, _, _} =
  DynamicSupervisor.which_children(DynSup)
  |> List.last()
  |> IO.inspect(label: :after_supervised_task)

IO.puts("Same process? #{pid1 == pid2}")

The failing AsyncTask is now linked and not supervised: Task.async. We check that the DynamicSupervisor has restarted AsyncTaskRunner (new PID).

AsyncTaskRunner.run()

Process.sleep(1000)

{_, pid3, _, _} =
  DynamicSupervisor.which_children(DynSup)
  |> List.last()
  |> IO.inspect()

IO.puts("Same process? #{pid2 == pid3}")
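Whether a crashing task takes its caller down depends only on the link, not on the supervisor. A minimal sketch, assuming a locally started Task.Supervisor and a hypothetical helper caller_survives: a throwaway caller starts a task that kills itself, once with Task.Supervisor.async (linked) and once with Task.Supervisor.async_nolink (not linked), and reports whether it is still alive afterwards.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical helper: spawns a throwaway caller that starts a task which kills
# itself, and returns true if the caller is still alive afterwards.
caller_survives = fn start_task ->
  parent = self()

  caller =
    spawn(fn ->
      start_task.(fn -> Process.exit(self(), :kill) end)
      # Leave time for an exit signal from a linked task to arrive
      Process.sleep(200)
      send(parent, {:survived, self()})
    end)

  receive do
    {:survived, ^caller} -> true
  after
    1_000 -> false
  end
end

linked_survived = caller_survives.(fn fun -> Task.Supervisor.async(sup, fun) end)
unlinked_survived = caller_survives.(fn fun -> Task.Supervisor.async_nolink(sup, fun) end)
{linked_survived, unlinked_survived}
# => {false, true}
```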

Concurrency

How long does it take to launch many GenServers?

Even though a GenServer is clearly the wrong tool here (we need neither state nor communication), let's compute the factorial of a number with it. Recall that the factorial of a positive integer $n$, noted $n!$, is the product of all the integers from 1 to $n$.

We compute this product in three ways:

  • with a single process using Enum.reduce, since $n! = n \cdot (n-1)!$.
  • lazily, by streaming slices of the list [1,2,..,n] in chunks of, say, 10 elements, and spawning (with GenServer.start) a GenServer for each chunk. Each GenServer is responsible for computing the subproduct of the integers in the chunk it receives. We finally run an Enum.reduce on the list of subproducts. Note that Stream.map handles the chunks one after the other and each GenServer.call blocks until its reply arrives, so the GenServers are spawned in sequence rather than running in parallel.
  • running the same "stream chunks" process as above, recursively on the subproducts until one remains, in the calling process and without GenServers.

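The chunked methods rest on splitting the product into consecutive chunks. Writing $c$ for the chunk size (10 in the code, where it is the module attribute @n):

$$
n! = \prod_{i=1}^{n} i = n \cdot (n-1)!
\qquad\text{and}\qquad
n! = \prod_{k=0}^{\lceil n/c \rceil - 1} \; \prod_{i=kc+1}^{\min((k+1)c,\, n)} i
$$

For example, with $n = 25$ and $c = 10$: $25! = (1 \cdot 2 \cdots 10)\,(11 \cdot 12 \cdots 20)\,(21 \cdot 22 \cdots 25)$.
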
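defmodule FactorialGs do
  @moduledoc """
  GenServer to compute the product of all integers in the received chunk
  """
  use GenServer

  # Despite its name, this starts an unlinked GenServer, asks it for the
  # product of its chunk and returns that product (not {:ok, pid}).
  def start_link(chunk),
    do: GenServer.start(__MODULE__, chunk) |> elem(1) |> GenServer.call(:factorial)

  @impl true
  def init(chunk), do: {:ok, chunk}

  # Reply with the product, then stop: each GenServer is used only once
  @impl true
  def handle_call(:factorial, _, s), do: {:stop, :normal, Eval.reduce(s), s}
end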

defmodule Eval do
  @moduledoc """
  Exposes three functions: `worker/1`, `stream/1` and `reduce/1`.
  The first spawns GenServers, the second is a pure stream implementation,
  and the last is a single reduction process.
  """
  @n 10

  @doc """
  Lazy streaming genservers without recursion
    iex> Eval.worker(4)
    24
  """
  def worker(n) do
    1..n
    |> Stream.chunk_every(@n)
    |> Stream.map(&FactorialGs.start_link(&1))
    |> Enum.to_list()
    |> Enum.reduce(1, &(&1 * &2))
  end

  @doc """
  Lazy stream recursion without genserver
    iex> Eval.stream(4)
    24
  """
  def stream(n), do: Enum.to_list(1..n) |> rec_stream()

  def rec_stream(list) when is_list(list) and length(list) > @n do
    list
    |> Stream.chunk_every(@n)
    |> Stream.map(&Eval.reduce/1)
    |> Enum.to_list()
    |> rec_stream()
  end

  def rec_stream(list) when is_list(list), do: Enum.reduce(list, 1, &(&1 * &2))

  @doc """
  Simple single process reduction
    iex> Eval.reduce(1..4)
    24
  """
  def reduce(list), do: Enum.reduce(list, 1, &(&1 * &2))
end

Eval.worker(3)

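When we measure the time (in ms) to compute the factorial of a big number, say 10_000, we see that the chunked method spawning GenServers is faster than a single process, even with the overhead of using GenServers. The fastest is the recursive stream, without the GenServer overhead. Since the chunks are processed one after the other, the gain most likely comes from the order of the multiplications rather than from concurrency: fewer multiplications involve the huge running product (one per chunk instead of one per integer), and the recursive stream multiplies numbers of similar size. With n = 10_000 and chunks of 10, the GenServer method starts 1_000 GenServers, and we see that the BEAM is able to spawn around 1000 GenServers per ms.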

n = 10_000

%{
  reduce: :timer.tc(fn -> Eval.reduce(1..n) end) |> elem(0) |> div(1000),
  genserver: :timer.tc(fn -> Eval.worker(n) end) |> elem(0) |> div(1000),
  stream: :timer.tc(fn -> Eval.stream(n) end) |> elem(0) |> div(1000)
}
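The GenServer method above multiplies its chunks one at a time. A minimal sketch of a genuinely concurrent variant, swapping in Task.async_stream (a different technique from the notebook's GenServers); the module name ConcurrentFactorial and the default chunk size of 10 are assumptions. By default, Task.async_stream runs at most System.schedulers_online() tasks at once.

```elixir
defmodule ConcurrentFactorial do
  # Hypothetical module (not part of the notebook): n! for n >= 1, with the
  # chunk subproducts computed in parallel tasks by Task.async_stream.
  def compute(n, chunk_size \\ 10) when is_integer(n) and n >= 1 do
    1..n
    |> Stream.chunk_every(chunk_size)
    # one task per chunk; by default at most System.schedulers_online() run at once
    |> Task.async_stream(fn chunk -> Enum.reduce(chunk, 1, &(&1 * &2)) end, ordered: false)
    # each element is {:ok, subproduct}; the multiplication order does not matter
    |> Enum.reduce(1, fn {:ok, product}, acc -> product * acc end)
  end
end

ConcurrentFactorial.compute(5)
# => 120
```

It can be timed like the other methods, e.g. with :timer.tc(fn -> ConcurrentFactorial.compute(10_000) end); with chunks this small, the cost of starting tasks may dominate.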