Elixir notes on processes: Task, GenServer, Supervisor
Mix.install([
{:kino, "~> 0.8.0"},
{:httpoison, "~> 2.0"},
{:jason, "~> 1.4"}
])
Supervisor.start_link(
[
{DynamicSupervisor, name: DynSup, strategy: :one_for_one},
{Registry, keys: :unique, name: MyRegistry},
{Task.Supervisor, name: MyTaskSupervisor}
],
strategy: :one_for_one,
name: MySupervisor
)
Introduction
Elixir/Erlang offers two main abstractions, Task and GenServer, to run code concurrently. When you need to leverage concurrency and run a short-lived, stateless, single function without interaction, you use the Task module. When you need a long-running stateful process that you can control and interact with, you run the code with a GenServer.
Task
Suppose you have to send millions of emails. Sending one email is a short-lived job that exits as soon as it completes. You can run this code with a Task to leverage concurrency and use Task.async.
We spawn 2 processes:
defmodule Email1 do
require Logger
@emails ["a@com", "b@com"]
def send_email(email) do
Process.sleep(1_000)
Logger.info("#{email} received")
end
def run do
Enum.each(@emails, fn email -> Task.async(fn -> send_email(email) end) end)
end
end
Time.utc_now() |> IO.inspect(label: :before)
Email1.run()
Time.utc_now() |> IO.inspect(label: :after)
> Note that in case of failure, the error will bubble up (see “Supervisor” further down).
Get the response
We can use Enum.each if we just want to run the tasks. It returns :ok. If we want the response of each task, we use Enum.map instead because it returns a list of the corresponding Task structs. This struct is used by the Task.await or Task.yield functions to get the result.
An “awaited” task returns the bare result, or exits the caller if the task fails or times out, whilst a “yielded” task returns {:ok, result}, {:exit, reason}, or nil when no reply arrived in time. Tasks are supposed to complete within a certain amount of time: both functions take a timeout in milliseconds as second argument, which defaults to 5000.
t_await = Task.async(fn -> 1 end)
t_yield = Task.async(fn -> 2 end)
%{task: t_await, _await: Task.await(t_await), _yield: Task.yield(t_yield)}
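The Enum.map variant described above can be sketched as follows. This is a minimal illustration, not part of the original notebook: the doubling function and the variable names tasks and doubled are placeholders, and Task.await_many/2 is assumed to be available (Elixir 1.11 or later).

```elixir
# Start one task per item; Enum.map keeps the Task structs for later.
tasks = Enum.map(1..3, fn i -> Task.async(fn -> i * 2 end) end)

# Task.await_many/2 waits for every task and returns the results in task order.
doubled = Task.await_many(tasks, 5_000)
IO.inspect(doubled, label: :doubled)
```

With Enum.each instead of Enum.map, the Task structs would be discarded and the results could not be collected.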
We set the optional “timeout” value in ms in the Task.yield function. When the task isn’t completed within this period, it returns nil
.
Task.async(fn -> Process.sleep(1000) end)
|> Task.yield(500)
We can call yield repeatedly on the same task, whereas await can only be called once per task.
defmodule Continue do
require Logger
def _yield(task, timeout: time) do
case Task.yield(task, time) do
nil ->
Logger.info("called again")
_yield(task, timeout: time)
result ->
result
end
end
end
Task.async(fn -> Process.sleep(3000) end)
|> Continue._yield(timeout: 500)
Async_stream
Instead of piping Enum.map into Task.async, we can do it in one go with Task.async_stream. It is designed to create task processes from a list of items. The key point is that we can limit the number of processes running at the same time with :max_concurrency. This provides back-pressure.
The function Task.async_stream returns a stream, that is a lazy enumerable: nothing runs until the stream is consumed. We use Stream.run/1 to run it when we don’t care about the result; it returns :ok. If we want the results, we use Enum.to_list/1.
chars = Enum.map(?a..?l, fn x -> <<x::utf8>> <> "@com" end)
The function runs at most 4 tasks concurrently, so the 12 emails are processed in 3 batches.
defmodule MyTasks do
require Logger
@emails chars
def send_email(email) do
Process.sleep(500)
Logger.info("#{inspect(self())} processed #{email}")
end
def back_pressured do
Task.Supervisor.async_stream_nolink(
MyTaskSupervisor,
@emails,
&send_email(&1),
max_concurrency: 4,
ordered: false
)
end
end
Time.utc_now() |> IO.inspect(label: :before)
MyTasks.back_pressured() |> Stream.run()
Time.utc_now() |> IO.inspect(label: :after)
# |> Enum.to_list()
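When the results matter, the stream can be consumed with Enum.to_list/1. The sketch below is an illustration with a placeholder squaring function and a hypothetical variable name squares; it uses the unsupervised Task.async_stream/3 so that it runs without any supervision tree.

```elixir
squares =
  1..6
  # at most 2 tasks run at the same time
  |> Task.async_stream(fn n -> n * n end, max_concurrency: 2, timeout: 5_000)
  # consuming the stream is what actually starts the tasks
  |> Enum.to_list()

IO.inspect(squares, label: :squares)
```

Each element is wrapped in an {:ok, result} tuple, and the order of the input is preserved because ordered defaults to true.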
Useful parameters are:
- on_timeout: :kill_task
- ordered: false
The function System.schedulers_online() returns the number of schedulers online (by default one per available core). This is the default value of the max_concurrency parameter.
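A small sketch of the on_timeout: :kill_task option, under the assumption of two placeholder jobs that sleep for 50 ms and 500 ms with a 200 ms timeout (the durations and variable names are only illustrative):

```elixir
durations_ms = [50, 500]

timeout_results =
  durations_ms
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 200,
    # kill the slow task instead of exiting the caller
    on_timeout: :kill_task
  )
  |> Enum.to_list()

IO.inspect(timeout_results, label: :timeout_results)
```

The task that exceeds the timeout is reported as {:exit, :timeout}, while the caller keeps running. Without this option, the default on_timeout: :exit would make the calling process exit.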
Example of task: cron job
We want to mimic a cron job with a Task. The task runs another task and is turned into a long-running process by calling itself again each time the main process receives a delayed message.
We use Process.send_after rather than sleep, so the process is not blocked between runs. This requires building a permanent message receiver. Another implementation is given further down with a genserver, which supports message handling among other benefits.
defmodule Period do
use Task
def start(args) do
args = Keyword.merge(args, parent: self())
Task.start(__MODULE__, :do_run, [args])
end
def do_run([every: time, run: f, parent: parent] = args) do
Task.async(fn -> apply(f, []) end)
# send to the main process
Process.send_after(parent, {:loop, args}, time)
end
end
defmodule Clean do
require Logger
def up, do: Logger.info("cleaned!")
end
# continuous listener on the main process
defmodule Listen do
def loop do
receive do
{:loop, args} ->
# on message, run the task
Period.do_run(args)
loop()
after
10_000 -> :timeout
end
end
end
##### The final API:
pid =
spawn(fn ->
Period.start(
every: 3_000,
run: fn -> Clean.up() end
)
Listen.loop()
end)
Process.sleep(10_000)
Process.exit(pid, :shutdown)
> It is preferable to use the supervised form Task.Supervisor.async_nolink. See further down.
GenServer
A Genserver is a (spawned) single-threaded process. It is a mechanism to run a long-lived stateful process that can communicate with other processes. It works with client-side functions, server-side callbacks and message passing. Messages are processed sequentially, “one at a time”.
The client functions are accessible from everywhere. The callbacks are functions that you don’t call directly: the GenServer behaviour invokes them, and you have to implement them within the module that uses the behaviour.
Reading about using genservers: spawn or not
> A process in Erlang/Elixir is a memory-isolated, single-threaded code runner; it has a mailbox and an address, and communicates through asynchronous message passing.
The main built-in client functions and server callbacks of a genserver are listed below (cf. the GenServer cheat sheet).
client / callback | what it does | replies? | what the callback returns |
---|---|---|---|
GenServer.start(module, init_arg) $\to$ init(init_arg) | spawns a genserver and initialises the state | yes ({:ok, pid} to the caller) | {:ok, state} or {:stop, reason} |
GenServer.call(to, msg) $\to$ handle_call(msg, from, state) | synchronous request: performs a task and replies | yes | {:reply, response, state} |
GenServer.cast(to, msg) $\to$ handle_cast(msg, state) | asynchronous request: performs a task without a response | no | {:noreply, state} |
GenServer.call $\to$ GenServer.reply | delayed response to a pending call | yes | :ok |
send(pid, msg) $\to$ handle_info(msg, state) | handles an async message from any process without a response | no | {:noreply, state} |
You can use client functions in the callbacks, with two caveats:
> ❗ It is not recommended to await a long-running task inside a genserver. Since such tasks are monitored by the genserver and produce 2 messages, these messages should be captured in a handle_info. You can also return a delayed response with GenServer.reply from a GenServer.call (see the example in “Running Async Tasks”).
> ❗ A genserver cannot use GenServer.call on itself within a handle_cast or handle_info callback (nor within any other callback): the callback cannot return while call is waiting for a response that the same process would have to produce. Use send and handle_info instead.
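To make the client/callback pairing concrete, here is a minimal sketch of a counter. The module name CounterSketch, its messages and the variable names are hypothetical and only serve the illustration.

```elixir
defmodule CounterSketch do
  use GenServer

  # client functions: they only send messages to the server process
  def start(initial), do: GenServer.start(__MODULE__, initial)
  def value(pid), do: GenServer.call(pid, :value)
  def increment(pid), do: GenServer.cast(pid, :increment)

  # server callbacks: invoked by the GenServer behaviour, one message at a time
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_call(:value, _from, state), do: {:reply, state, state}

  @impl true
  def handle_cast(:increment, state), do: {:noreply, state + 1}

  @impl true
  def handle_info({:add, n}, state), do: {:noreply, state + n}
end

{:ok, counter_pid} = CounterSketch.start(0)
# asynchronous, handled by handle_cast
CounterSketch.increment(counter_pid)
# plain message, handled by handle_info
send(counter_pid, {:add, 10})
# synchronous, handled by handle_call after the two messages above
counter_total = CounterSketch.value(counter_pid)
```

Messages sent by one process to another are handled in the order they were sent, so the synchronous call sees the effect of the cast and of the plain message.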
Get info on a process
The module Process offers routines such as Process.alive?(pid) or Process.info(pid). The latter returns a keyword list with information on the linked processes (|> Keyword.get(:links)), on the message queue (|> Keyword.get(:message_queue_len)), and more.
The Erlang module :sys offers primitives for debugging, such as :sys.trace(pid, true), and to retrieve the state, :sys.get_state(pid).
Note on the __MODULE__ placeholder
__MODULE__ is a placeholder: a compile-time macro that expands to the current module name as an atom. It is convenient to use it instead of writing the literal module name.
defmodule Test do
IO.puts("This module is named #{__MODULE__} and is an atom: #{is_atom(__MODULE__)}")
end
is_atom(Test)
How to start a Genserver
A genserver is declared with the behaviour use GenServer and is started with (a function that wraps) GenServer.start_link or GenServer.start, to which you pass the arguments (module_name, init_arg, options). This function returns {:ok, pid} or {:error, {:already_started, pid}}. It automatically invokes the callback init/1 to initialise the state. We are required to define the init callback. You add @impl true to tell the compiler that you are defining a callback, so it can warn you if the function does not match one.
An example of a minimal (and useless) genserver - where we put $1$ in its state - would be:
defmodule MinimalGS do
use GenServer
@impl true
def init(1), do: {:ok, 1}
end
# instantiate
{:ok, pid} = GenServer.start(MinimalGS, 1)
We can check its state:
:sys.get_state(pid)
You write custom client functions that trigger the callbacks. This relies on pattern matching between the message sent by the client function and the clauses of the server callback. In the genserver module below, we have implemented only the callbacks:
defmodule FirstGs do
use GenServer
@impl true
def init(init_args), do: {:ok, init_args}
@impl true
def handle_call(:double, _from, state), do: {:reply, state * 2, state}
def handle_call({:triple, n}, _from, state), do: {:reply, n * 3, state}
end
You can instantiate genservers “on-the-fly”, identified by their PID:
for i <- 1..3 do
{:ok, pid} = GenServer.start(FirstGs, i)
%{
pid: pid,
double: GenServer.call(pid, :double),
triple: GenServer.call(pid, {:triple, i}),
state: :sys.get_state(pid)
}
end
How to stop a genserver
This can be done from a client function by invoking GenServer.stop(pid, reason), or equivalently by letting a callback return {:stop, reason, state}. The optional callback terminate is triggered and can be used to handle cleanups before termination.
Any reason other than :normal, :shutdown or {:shutdown, term} is considered a crash by OTP. You may set Process.flag(:trap_exit, true) in the init callback to capture the :EXIT signal and let terminate run (eg save state into a database before exiting).
You can also raise "message" in a callback.
GenServer.stop(pid, :code)
# or return a `:stop` based tuple in a callback
{:stop, :reason, state}
# or raise in the callback
raise "Stop me"
# or `Process.exit` in a callback
Process.exit(self(), :reason)
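A minimal sketch of the terminate callback in action. The module name StopSketch is hypothetical, and sending a message to the starting process stands in for a real cleanup such as saving the state.

```elixir
defmodule StopSketch do
  use GenServer

  @impl true
  def init(notify_pid) do
    # trap exits so that terminate/2 also runs on exit signals from linked processes
    Process.flag(:trap_exit, true)
    {:ok, notify_pid}
  end

  @impl true
  def terminate(reason, notify_pid) do
    # placeholder for a real cleanup: report the exit reason to the starter
    send(notify_pid, {:terminated, reason})
  end
end

{:ok, stop_sketch_pid} = GenServer.start(StopSketch, self())
:ok = GenServer.stop(stop_sketch_pid, :shutdown)

stop_reason =
  receive do
    {:terminated, reason} -> reason
  after
    1_000 -> :no_message
  end
```

GenServer.stop is synchronous: when it returns, terminate has run and the process is no longer alive.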
More in-depth material:
- this blog about Stop a genserver
- you have a nice Livebook about this:
if Process.alive?(pid) do
:sys.get_state(pid) |> IO.inspect(label: " the state of the genserver LongInitGs is: ")
GenServer.stop(pid)
Process.alive?(pid)
end
Example of genserver usage
Long-running code is discouraged in the init callback. You may want to return the {:continue, term} instruction and implement the handle_continue callback: it is guaranteed to run before the Genserver handles any other message. The operation blocks the genserver, not the process that started it, and avoids potential race conditions.
The following usage of a genserver is only an illustration: the simple Task used above is enough since you don’t need to interact with this task. We implement the cron job again, this time with a genserver. It uses handle_continue. Once launched, it runs a recurrent job forever. We then stop the process shortly after.
defmodule Periodic do
use GenServer
require Logger
@start Time.utc_now()
defp time, do: Time.diff(Time.utc_now(), @start, :millisecond)
def start_link(opts), do: GenServer.start(__MODULE__, opts)
def init(args) do
Logger.info("#{inspect(time())}ms: First Init")
{:ok, args, {:continue, :run}}
end
def handle_continue(:run, [every: time, run: _f] = state) do
Logger.info("#{inspect(time())}ms: Continue")
Process.send_after(self(), :repeat, time)
{:noreply, state}
end
def handle_info(:repeat, [every: time, run: myfun] = state) do
Logger.info("#{inspect(time())}ms: repeat")
Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> apply(myfun, []) end)
Process.send_after(self(), :repeat, time)
{:noreply, state}
end
def handle_info({_, msg}, state) do
Logger.info("message from task: #{msg}")
{:noreply, state}
end
def handle_info(_msg, state) do
# IO.inspect(msg)
{:noreply, state}
end
end
defmodule Send do
def cleaner do
Process.sleep(500)
:cleaned!
end
end
{:ok, pid} =
Periodic.start_link(
every: 3_000,
run: fn -> Send.cleaner() end
)
Process.sleep(11_000)
GenServer.stop(pid, :shutdown)
Linking Genservers
You can link processes with Process.link/1. This has to be called from within a process: the calling process is implicit and the PID of the other process is given as an argument. This is what GenServer.start_link does: it starts a bi-directional link with the caller. If you instantiate with GenServer.start, no link is created.
> ❗ If a process is started with start_link, it is linked to the current process, here the Livebook. This means the Livebook fails when the genserver fails (ie exits with a reason other than :normal or :shutdown). One can use Process.flag(:trap_exit, true) to receive the message {:EXIT, from, reason} instead. In Elixir, the Supervisor behaviour is meant to manage these links and the restart policies.
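The effect of trapping exits can be sketched with two plain processes. To keep the flag away from the main process, the sketch runs inside a Task; the :boom reason and the variable names are placeholders.

```elixir
exit_message =
  Task.async(fn ->
    # from now on, exit signals from linked processes arrive as messages
    Process.flag(:trap_exit, true)
    child = spawn_link(fn -> exit(:boom) end)

    receive do
      {:EXIT, ^child, reason} -> {:trapped, reason}
    after
      1_000 -> :no_exit_message
    end
  end)
  |> Task.await()

IO.inspect(exit_message, label: :exit_message)
```

Without the flag, the abnormal exit of the linked child would have taken the task down with it.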
Example
Let’s give an example with two linked genservers. The genserver Parent is instantiated with GenServer.start and it instantiates another genserver, called Child, with start_link. When we stop one or the other, both are stopped.
defmodule Child do
use GenServer
# -----
@impl true
def init(_), do: {:ok, {}}
@impl true
def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
defmodule Parent do
use GenServer
def link(pid, module), do: GenServer.call(pid, {:link, module})
# -------
@impl true
def init(_), do: {:ok, {}}
@impl true
def handle_call({:link, module}, _, _), do: {:reply, GenServer.start_link(module, :ok), {}}
@impl true
def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
The routine Process.info returns a keyword list that contains, in particular, the linked processes. The routine Process.alive? returns a boolean.
We link Parent and Child:
# instantiate a "Parent" genserver
{:ok, pid_parent} = GenServer.start(Parent, :ok) |> IO.inspect(label: :parent)
# instantiate a linked genserver "Child"#
{:ok, pid_child} = Parent.link(pid_parent, Child) |> IO.inspect(label: :child)
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)
Process.info(pid_parent) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_parent)
Process.info(pid_child) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_child)
# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)
We stop the parent: both are stopped.
GenServer.stop(pid_parent, :shutdown)
# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}
We restart and stop the child: both are stopped.
{:ok, pid_parent} = GenServer.start(Parent, :ok)
{:ok, pid_child} = Parent.link(pid_parent, Child)
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)
# stop Child
GenServer.stop(pid_child, :shutdown)
# check 2
{Process.alive?(pid_parent), Process.alive?(pid_child)}
> ❗ If we instantiate Parent with GenServer.start_link, this links Parent with the caller, which is the main process, this Livebook. If we then stop Parent, this stops the Livebook, which we don’t want.
Restart strategies of processes (Genserver, Task)
When you define a genserver, you set its restart option by declaring it in: use GenServer, restart: :strategy where:
- restart: :temporary: the process is never restarted, even if it crashes,
- restart: :transient: the process is restarted only if it terminates abnormally (an exit reason other than :normal, :shutdown or {:shutdown, term}),
- restart: :permanent: the process is always restarted. This is the default behaviour.
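The restart option ends up in the child spec that use GenServer generates for the module. A quick way to check it, with a hypothetical module TransientSketch and a placeholder argument:

```elixir
defmodule TransientSketch do
  # the option is stored in the generated child_spec/1
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# child_spec/1 is what a supervisor calls when given {TransientSketch, :some_arg}
transient_spec = TransientSketch.child_spec(:some_arg)
IO.inspect(transient_spec, label: :child_spec)
```

The returned map carries the :id, the :start tuple {TransientSketch, :start_link, [:some_arg]} and restart: :transient.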
Supervisor
A supervisor is a “state machine”: you declare which processes you want to keep running and how they should be restarted. This relies on linking processes together.
Test with a Genserver
We start and supervise a genserver with Supervisor.start_link, to which we pass a list of child_spec and a strategy. It is a good idea to follow the convention and name MyModule.start_link the client function that invokes GenServer.start_link, because the default child spec calls it. ❗ Note that you have to use GenServer.start_link to link the supervisor and the module.
> ❗❗ The supervisor expects the function that starts a process to return {:ok, pid}. This means that the last expression of this function must be the call to GenServer.start_link (or must return the same tuple).
Let’s supervise this minimal genserver:
defmodule SupMinimalGs do
use GenServer
# <- returns {:ok,pid} for the supervisor
# the optional argument also defines start_link/1, which the default child spec calls
def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)
@impl true
def init(_), do: {:ok, nil}
end
The supervisor expects child_specs. A child spec can be a module name, a {module, arg} tuple or a map. If the supervised module defines start_link/1 and needs no particular argument, we have a shortcut: the module name alone. Here we showcase an explicit child_spec map.
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
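# or, as an alternative (the name TestSupervisor can only be registered once, so a second call returns an :already_started error)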
Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)
In action, we instantiate a supervised module, check that it is alive and stop it.
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)
[{SupMinimalGs, pid, _, _}] =
Supervisor.which_children(TestSupervisor)
|> Kino.inspect()
IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
Then we observe that the supervisor created a new one.
Process.sleep(1000)
# we check again which children are supervised
[{SupMinimalGs, pid, _, _}] =
Supervisor.which_children(TestSupervisor)
|> Kino.inspect()
IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect(label: :recreated_and_alive?)
Save the state
You notice that the state of the module is reset to its initial value after a restart. One way to keep it is to save the state in a database (ETS for example) in the terminate callback (for “normal” shutdowns or when :EXIT is trapped), and to read it back from the database in the init callback.
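A minimal sketch of this idea with ETS, under several assumptions: the table :state_backup_sketch is created and owned by the calling process (so it outlives the genserver), and the module PersistentSketch, its {:put, value} message and the variable names are hypothetical.

```elixir
# the table is owned by the current process, not by the genserver
:ets.new(:state_backup_sketch, [:named_table, :public, :set])

defmodule PersistentSketch do
  use GenServer

  def start_link(default), do: GenServer.start_link(__MODULE__, default)

  @impl true
  def init(default) do
    Process.flag(:trap_exit, true)

    # read the saved state back if there is one, else use the default
    state =
      case :ets.lookup(:state_backup_sketch, __MODULE__) do
        [{_key, saved}] -> saved
        [] -> default
      end

    {:ok, state}
  end

  @impl true
  def handle_cast({:put, value}, _state), do: {:noreply, value}

  @impl true
  def terminate(_reason, state) do
    # save the state before exiting
    :ets.insert(:state_backup_sketch, {__MODULE__, state})
  end
end

{:ok, persist_sup} = Supervisor.start_link([{PersistentSketch, 0}], strategy: :one_for_one)
[{_, first_pid, _, _}] = Supervisor.which_children(persist_sup)
GenServer.cast(first_pid, {:put, 42})
GenServer.stop(first_pid, :shutdown)
# give the supervisor time to restart the child
Process.sleep(100)
[{_, second_pid, _, _}] = Supervisor.which_children(persist_sup)
restored_state = :sys.get_state(second_pid)
```

The restarted process has a new PID but finds the saved value in init. The terminate callback is not called on a brutal kill, so this approach only covers orderly shutdowns and trapped exits.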
Supervision tree
You normally start it with Supervisor.start_link within the module that declares the behaviour use Application. You can also define a module with the behaviour use Supervisor.
Strategies of Supervisors
- strategy: :one_for_all. If a child process terminates, all other child processes are terminated and then all child processes (including the terminated one) are restarted.
- strategy: :one_for_one. Only the failed child process is restarted.
- strategy: :rest_for_one. The failed process and any process started after it are restarted.
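The difference between the strategies can be observed with three throwaway children. The sketch below uses Agent processes with the hypothetical ids :a, :b and :c under a :rest_for_one supervisor, kills the middle one and compares the PIDs before and after.

```elixir
rest_children = [
  Supervisor.child_spec({Agent, fn -> :a end}, id: :a),
  Supervisor.child_spec({Agent, fn -> :b end}, id: :b),
  Supervisor.child_spec({Agent, fn -> :c end}, id: :c)
]

{:ok, rest_sup} = Supervisor.start_link(rest_children, strategy: :rest_for_one)

# helper: map each child id to its current pid
pids_by_id = fn ->
  for {id, pid, _type, _modules} <- Supervisor.which_children(rest_sup), into: %{} do
    {id, pid}
  end
end

before_kill = pids_by_id.()
Process.exit(before_kill.b, :kill)
# give the supervisor time to restart the children
Process.sleep(100)
after_kill = pids_by_id.()

restarted_ids = for id <- [:a, :b, :c], before_kill[id] != after_kill[id], do: id
IO.inspect(restarted_ids, label: :restarted)
```

With :rest_for_one, :b and :c get new PIDs while :a is untouched. With :one_for_one only :b would be restarted, and with :one_for_all all three.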
Look at the supervision tree: we use a Dynamic Supervisor (see below), a Registry and a Task.Supervisor later on, so we start and supervise them here.
Process.whereis(MySupervisor)
The Supervisor
module comes with some useful routines:
pid = Process.whereis(MySupervisor)
Supervisor.count_children(pid)
Supervisor.which_children(MySupervisor)
Dynamic Supervisor
A dynamic supervisor is designed to supervise children that are started “on-the-fly”, such as genservers with specific attributes.
The usage is quite similar to supervisors.
defmodule DynSupGs do
use GenServer, restart: :permanent
def start_link(v), do: GenServer.start_link(__MODULE__, v)
@impl true
def init(v), do: {:ok, v + 1}
end
child_spec = %{id: DynSupGs, start: {DynSupGs, :start_link, [1]}}
# or simply child_spec = {DynSupGs, 1} since we used the standard "start_link"
!is_nil(Process.whereis(DynSup))
|> IO.inspect()
DynamicSupervisor.start_child(DynSup, child_spec)
# we check which children are supervised, get the pid
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)
IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
Process.sleep(3000)
# we check again which children are supervised
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)
IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect()
:sys.get_state(pid)
You can review the supervised processes with the supervision tree by running:
Process.whereis(MySupervisor)
Task Supervisor
Tasks can also be supervised. You declare a Task.Supervisor in your supervision tree with the child spec: {Task.Supervisor, name: MyTaskSupervisor}. You can use the unlinked version with async_nolink or the linked version with async:
defmodule Email2 do
require Logger
@emails ["a@com", "b@com", "c@com"]
def send_email("b@com"), do: raise("oh no!")
def send_email(email), do: Logger.info("#{email}")
def run do
Enum.each(
@emails,
fn email ->
Task.Supervisor.async_nolink(
MyTaskSupervisor,
fn -> send_email(email) end
)
end
)
end
end
Email2.run()
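To inspect the outcome of a failing unlinked task, one option is Task.yield. The sketch below starts its own anonymous Task.Supervisor so that it does not depend on any named supervisor; the variable names are placeholders.

```elixir
{:ok, sketch_task_sup} = Task.Supervisor.start_link()

# the task crashes, but it is not linked to the caller
failing_task = Task.Supervisor.async_nolink(sketch_task_sup, fn -> raise "oh no!" end)

# the crash is reported as {:exit, reason} instead of taking the caller down
failing_outcome = Task.yield(failing_task, 1_000)
IO.inspect(failing_outcome, label: :outcome)
```

The reason is a tuple made of the exception and its stacktrace. With the linked Task.Supervisor.async, the same failure would also send an exit signal to the caller.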
Genserver Registration
Every process can be given a name (this is called registration) with an atom: Process.register(pid, :name). A Genserver can be referenced by its PID or through the :name option (cf name).
The identifier passed to :name must be:
- a unique atom such as name: __MODULE__,
- or use the Registry mechanism: name: {:via, Registry, {MyRegistry, "name"}}
> Note: this uniquely identifies a genserver on a node. You can identify it globally with name: {:global, name}. The genserver is then uniquely identified across distributed Erlang nodes. Solutions to distribute registries exist, such as Horde.
# using the PID
{:ok, pid} = GenServer.start_link(__MODULE__, init_args)
GenServer.call(pid, :action)
# using the atom module name __MODULE__
GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
GenServer.call(__MODULE__, :action)
# using any atom
GenServer.start_link(__MODULE__, init_args, name: :an_atom)
GenServer.call(:an_atom, :action)
# via a Registry. Make sure one has been instantiated
Registry.start_link(keys: :unique, name: MyRegistry)
via_tuple = {:via, Registry, {MyRegistry, "name"}}
GenServer.start_link(__MODULE__, init_args, name: via_tuple)
GenServer.call(via_tuple, :action)
❗ A Genserver instantiated with the :name option or with the :via tuple can only be instantiated once. The instantiation routine GenServer.start returns {:ok, pid} or {:error, {:already_started, pid}}.
pid =
case GenServer.start(..) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end
See the following examples.
PID as identifier
The generated PID is grabbed by pattern matching on the response. You pass it to the client functions GenServer.call(PID, message) and GenServer.cast(PID, message).
Each time you run the module below, a new PID is generated.
> Notice below that when the Kernel.send function is run in the context of the genserver (for example in the handle_call), we can use self(), whereas when it is used in the context of the main process (for example in CallByPid.start_link), we have to use the PID to reach the handle_info handled by the Genserver.
defmodule CallByPid do
require Logger
use GenServer
def start_link(state: state) do
{:ok, pid} = GenServer.start_link(__MODULE__, state)
# message handled by "handle_info({:msg, pid})"
send(pid, {:msg, pid})
# `call` returns the reply built in handle_call, which is the spawned pid
GenServer.call(pid, :ready)
end
# callbacks
@impl true
def init(state) do
Logger.info("spawned #{inspect(self())} is instantiated with state: #{state}")
{:ok, state}
end
@impl true
def handle_call(:ready, {pid, _} = _from, state) do
send(
self(),
{:msg, "process #{inspect(pid)} asks #{inspect(self())} to run :ready and returns its PID"}
)
{:reply, self(), state}
end
# message printer
@impl true
def handle_info({:msg, message}, state) do
Logger.info("spawned received: #{inspect(message)} from main")
{:noreply, state}
end
end
require Logger
Logger.info("main process pid: " <> "#{inspect(self())}")
Kino.Process.render_seq_trace(fn ->
# sending to main process the pid of the spawned process with state 1
send(self(), {:spw1, CallByPid.start_link(state: "1")}) |> Kino.inspect()
# this returns the pid of the spawned process with state 2
CallByPid.start_link(state: "2")
end)
We can check that our spawned process - the Genserver - is still alive and check its state directly:
receive do
{:spw1, pid} ->
Logger.info(inspect(pid))
Process.alive?(pid) |> Kino.inspect(label: "spawned process #{inspect(pid)} is still alive?")
:sys.get_state(pid)
end
The atom __MODULE__ as identifier
Instead of the pid, we can use name: __MODULE__, which is an atom. You can also use any atom.
Note that we can run start_link only once when we name the Genserver this way.
We see that the “named” Genserver can only be instantiated once per name, thus we cannot change the initial state.
defmodule CallByAtom do
require Logger
use GenServer
def start_link(state: state) do
case GenServer.start_link(__MODULE__, state, name: __MODULE__) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
Logger.info(
"Genserver #{inspect(pid)} with name #{__MODULE__} and state #{state} was already started"
)
{:ok, pid}
end
end
# the process identifier is "__MODULE__"
def get_state, do: GenServer.call(__MODULE__, :get_state)
def init(state) do
Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
{:ok, state}
end
def handle_call(:get_state, _from, state), do: {:reply, state, state}
end
CallByAtom.start_link(state: "1")
CallByAtom.get_state() |> IO.inspect()
CallByAtom.start_link(state: "2")
CallByAtom.get_state()
The Registry
You can’t use a string to register a Genserver. You could convert the string into an atom, but this is bad practice in the Elixir world: atoms are not garbage collected and their number is limited.
You can use the Registry mechanism instead. It is a [name -> pid] dictionary, based on ETS, thus local to each node.
You instantiate a Registry and name it (MyRegistry here below).
Registry.start_link(keys: :unique, name: MyRegistry)
You can also instantiate it in the supervision tree of the app:
children = [
...
{Registry, keys: :unique, name: MyOTPApp.Registry},
]
Then you can identify the Genserver with any string or number when you use the tuple below. It associates a PID with the key “var” and stores it in the registry MyRegistry. When a GenServer function receives such a :via tuple, it looks up the PID in the given registry:
{:via, Registry, {MyRegistry, "var"}}
defmodule CallByRegistry do
require Logger
use GenServer
defp via_tuple(name), do: {:via, Registry, {ARegistry, name}}
def start_link(name: name, state: state) do
# instantiate a Registry
Registry.start_link(keys: :unique, name: ARegistry)
# reference a Genserver with the registry mechanism
GenServer.start_link(__MODULE__, state, name: via_tuple(name))
# use the registry mechanism
GenServer.call(via_tuple(name), :ready)
end
def get_state(name), do: via_tuple(name) |> :sys.get_state()
@impl true
def init(state) do
Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
{:ok, state}
end
@impl true
def handle_call(:ready, _from, state) do
Logger.info("#{inspect(state)} ready")
{:reply, :step_0, state}
end
end
If we reevaluate the cell below several times, we see that we can instantiate a Genserver with a given name only once, so the “name” and “state” pair is unique. If we want another state, we create another Genserver with this state.
CallByRegistry.start_link(name: "fish_1", state: "fish_1")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_1", state: "fish_2")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_2", state: "fish_2")
CallByRegistry.get_state("fish_2")
The Registry module comes with useful routines:
Registry.count(ARegistry)
CallByRegistry.get_state("fish_1")
CallByRegistry.get_state("fish_2")
Registry.lookup(ARegistry, "fish_2")
[{pid, _}] = Registry.lookup(ARegistry, "fish_2")
Registry.keys(ARegistry, pid)
Running Async Tasks in a Genserver
You could run a Task.async |> Task.await in a genserver process. However, it is not recommended to await a long task in a genserver.
If you just want to run the task but don’t care about the result, then you can launch an async task from a callback. This task is monitored by the Genserver. Once it terminates, two messages are received: {ref, result} and {:DOWN, ref, :process, pid, :normal}. You use the handle_info callback to match on them (see the example below).
Even if you do not use await, you can still get a delayed response from a Task.async when you use GenServer.reply (see the “successful” example below).
The typical use cases can be:
- you may run a long task in a Liveview and don’t want the Liveview to crash because of this process. You will use Task.Supervisor (see example below). To supervise tasks, you register a Task.Supervisor in the supervision tree. The failure, if any, is collected in the terminate callback of the Genserver.
- you may also want the process to stop running when you change Liveview. You will use Task.Supervisor.async for this because the processes will be linked.
- in the situation where you just want to launch a task and don’t care about receiving a response, then you may use Task.Supervisor.async_nolink so the processes are not linked.
We will use this HTTP call:
defmodule AsyncTask do
def request do
Process.sleep(1_000)
HTTPoison.get!("http://httpbin.org/json")
|> Map.get(:body)
|> Jason.decode!()
|> get_in(["slideshow", "slides"])
end
def run, do: Task.async(&request/0)
end
AsyncTask.run()
The genserver below calls the AsyncTask module. It will receive 2 messages: the result {ref, result_of_the_async_task} and a completion message of the finished task: {:DOWN, ref, :process, pid, reason}. The “catch all” handle_info below receives these two messages.
defmodule FirstAsyncTaskGsRunner do
use GenServer
require Logger
def run(pid), do: GenServer.call(pid, :async)
def init(_), do: {:ok, nil}
def handle_call(:async, _, _s) do
%{ref: ref, pid: pid} = AsyncTask.run()
reply = "task with pid #{inspect(pid)} sent"
{:reply, reply, ref}
end
# catch all
def handle_info(msg, ref) do
Logger.debug(inspect(msg))
{:noreply, ref}
end
def terminate(reason, _s), do: Logger.info("Terminate Child with reason: #{inspect(reason)}")
end
GenServer.start(FirstAsyncTaskGsRunner, {})
|> elem(1)
|> FirstAsyncTaskGsRunner.run()
We can use Process.demonitor(ref, [:flush]) inside the handle_info callback where you receive the first message {ref, message}. In case the task is successful, you will then not receive the {:DOWN, ref, :process, pid, :normal} status message. If the task fails, you will receive a message in the :DOWN handler.
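The same mechanism can be observed outside of a genserver, with a plain receive in the calling process. This is only a sketch: the task body and the variable names are placeholders.

```elixir
%Task{ref: demo_ref} = Task.async(fn -> :done end)

task_result =
  receive do
    {^demo_ref, value} ->
      # stop monitoring and drop a :DOWN message that may already be in the mailbox
      Process.demonitor(demo_ref, [:flush])
      value
  after
    1_000 -> :no_reply
  end

down_received =
  receive do
    {:DOWN, ^demo_ref, :process, _pid, _reason} -> true
  after
    100 -> false
  end

IO.inspect({task_result, down_received}, label: :result_and_down)
```

After the demonitor with :flush, no :DOWN message for this reference is left to handle.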
Successful AsyncTask and usage of GenServer.reply to return
It is recommended to run this AsyncTask as a supervised task (see the next example). Recall that you can link the task to the main process or not. If you want to stop the task when the main process stops (eg you leave a page), use Task.Supervisor.async, and Task.Supervisor.async_nolink otherwise.
Example with GenServer.reply
You can get a response back with GenServer.reply/2. In the callback handle_call(:key, from, state), you return {:noreply, state} instead of a :reply tuple. You save from = {pid, tag} in the state so you can use it later. When the (monitored) async task is finished, the message tuple {ref, result} is sent to the genserver. Inside the handle_info callback that receives it, you can use the function GenServer.reply(from, result) to return a delayed response to the pending .call.
defmodule AsyncTaskReplier do
use GenServer
require Logger
def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
def init(_), do: {:ok, nil}
def nolink, do: GenServer.call(__MODULE__, :nolink)
def handle_call(:nolink, from, _) do
%{ref: ref} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
# Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> raise "bad" end)
{:noreply, {ref, from}}
end
def handle_info({r, response}, {ref, from}) when ref == r do
# <-- stops the second :DOWN message if success
Process.demonitor(ref, [:flush])
{pid, _} = from
GenServer.reply(from, {pid, response})
{:noreply, response}
end
def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
Logger.debug("received status: #{inspect(status)}")
{:noreply, s}
end
# the Genserver will call "terminate" if killed.
def terminate(reason, _s) do
Logger.info("Terminate Main with reason: #{inspect(reason)}")
end
end
AsyncTaskReplier.start()
{pid, response} = AsyncTaskReplier.nolink()
Process.alive?(pid) |> IO.inspect()
response
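The replier above keeps a single {ref, from} pair in its state, so a second call arriving before the first task finishes would overwrite it. One possible way around this is to keep a map of pending callers keyed by the task reference. The following is a minimal sketch under that assumption: the module name PendingReplier, the private supervisor and the squaring job are hypothetical stand-ins and not part of the notebook.

```elixir
defmodule PendingReplier do
  use GenServer

  def start, do: GenServer.start(__MODULE__, nil)
  def square(server, n), do: GenServer.call(server, {:square, n})

  @impl true
  def init(_) do
    # Assumption: the GenServer owns a private Task.Supervisor for its tasks.
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup, pending: %{}}}
  end

  @impl true
  def handle_call({:square, n}, from, state) do
    %Task{ref: ref} = Task.Supervisor.async_nolink(state.sup, fn -> n * n end)
    # No reply yet: remember who asked, keyed by the task reference.
    {:noreply, put_in(state.pending[ref], from)}
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # The task succeeded: drop the monitor and answer the waiting caller.
    Process.demonitor(ref, [:flush])
    {from, pending} = Map.pop(state.pending, ref)
    if from, do: GenServer.reply(from, result)
    {:noreply, %{state | pending: pending}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # The task failed before replying: report the reason to the caller.
    {from, pending} = Map.pop(state.pending, ref)
    if from, do: GenServer.reply(from, {:error, reason})
    {:noreply, %{state | pending: pending}}
  end
end

{:ok, server} = PendingReplier.start()

# Three callers wait at the same time; each one gets its own delayed reply.
callers = for n <- 1..3, do: Task.async(fn -> PendingReplier.square(server, n) end)
results = Task.await_many(callers)
IO.inspect(results)
```

Each caller blocks in GenServer.call until its own task result is relayed, and `results` is [1, 4, 9].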
This time we use the “normal” monitoring mechanism of async tasks (when we don’t care about the response): we listen to the 2 messages sent. Both forms, linked and unlinked, are coded.
defmodule NoFailAsyncTaskRunner do
use GenServer
require Logger
def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
def link, do: GenServer.call(__MODULE__, :async)
def nolink, do: GenServer.call(__MODULE__, :nolink)
def add, do: GenServer.call(__MODULE__, :one)
def init(_), do: {:ok, nil}
def handle_call(:one, _, s), do: {:reply, 1, s}
def handle_call(:async, _from, _) do
%{ref: ref, pid: pid} = Task.Supervisor.async(MyTaskSupervisor, &AsyncTask.request/0)
{:reply, pid, ref}
end
def handle_call(:nolink, _from, _) do
%{ref: ref, pid: pid} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
{:reply, pid, ref}
end
def handle_info({r, response}, ref) when ref == r do
Process.demonitor(ref, [:flush])
Logger.debug("response: #{inspect(response)}")
{:noreply, nil}
end
def handle_info({:DOWN, _ref, :process, pid, status}, _rf) do
Logger.debug("received status: #{inspect(status)} for the task #{inspect(pid)}")
{:noreply, nil}
end
# the Genserver will call "terminate" if killed.
def terminate(reason, _s) do
Logger.info("Terminate Main with reason: #{inspect(reason)}")
end
end
We run the task:
NoFailAsyncTaskRunner.start()
pid_task = NoFailAsyncTaskRunner.link()
We kill the main process. We check that the task is killed.
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
We now run the task unlinked and then kill the main process. We check that the task is still alive.
NoFailAsyncTaskRunner.start()
pid_task =
NoFailAsyncTaskRunner.nolink()
|> IO.inspect()
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
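The difference between the two forms can also be checked without a GenServer. The following is a minimal sketch, assuming a throwaway Task.Supervisor bound to `sup` and a plain spawned process playing the role of the main process; the helper `task_survives` is hypothetical.

```elixir
# Assumption: a private, unnamed Task.Supervisor used only for this check.
{:ok, sup} = Task.Supervisor.start_link()

# Spawns an owner that starts a long-running task with `starter`, reports the
# task pid and then exits abnormally. Returns true if the task outlives it.
task_survives = fn starter ->
  parent = self()

  spawn(fn ->
    task = starter.(sup, fn -> Process.sleep(:infinity) end)
    send(parent, {:task_pid, task.pid})
    exit(:owner_crashed)
  end)

  task_pid =
    receive do
      {:task_pid, pid} -> pid
    after
      1_000 -> raise "no task was started"
    end

  ref = Process.monitor(task_pid)

  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> false
  after
    200 ->
      # The task is still running: clean it up so it does not sleep forever.
      Process.demonitor(ref, [:flush])
      Process.exit(task_pid, :kill)
      true
  end
end

linked_survives = task_survives.(&Task.Supervisor.async/2)
unlinked_survives = task_survives.(&Task.Supervisor.async_nolink/2)
IO.inspect(%{linked: linked_survives, unlinked: unlinked_survives})
```

The linked task goes down with its owner, while the unlinked one keeps running until it is cleaned up.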
Supervising an async task
Set up another genserver below to run the previous AsyncTask again, but this time we make it fail.
- when we pass bool = true, we supervise it with Task.Supervisor.async.
- when bool = false, we do not supervise it and run Task.async.
defmodule AsyncTaskRunner do
use GenServer
require Logger
def start_link() do
GenServer.start_link(__MODULE__, {}, name: __MODULE__)
end
def run(bool \\ false), do: GenServer.call(__MODULE__, {:async, bool})
def init(_), do: {:ok, {}}
def handle_call({:async, bool}, _, _) do
%{ref: ref} =
case bool do
true ->
Task.Supervisor.async(MyTaskSupervisor, fn ->
Process.exit(self(), :kill)
AsyncTask.request()
end)
false ->
Task.async(fn ->
Process.exit(self(), :kill)
AsyncTask.request()
end)
end
{:reply, Process.alive?(self()), ref}
end
def handle_info({r, response}, ref) when ref == r do
Logger.debug("response: #{inspect(response)}")
{:noreply, ref}
end
def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
Logger.debug("received status: #{inspect(status)}")
{:noreply, s}
end
# the Genserver will call "terminate" if killed.
def terminate(reason, _s) do
Logger.info("Terminate Child with reason: #{inspect(reason)}")
end
end
We supervise the task runner above because a failing linked process also kills its parent. The runner will start the (failing) AsyncTask:
DynamicSupervisor.start_child(DynSup, %{id: 1, start: {AsyncTaskRunner, :start_link, []}})
{_, pid1, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
We run the linked (failing) AsyncTask as a supervised task with Task.Supervisor.async, by passing bool = true. We then compare PIDs to check whether the parent process AsyncTaskRunner has been restarted.
AsyncTaskRunner.run(true)
# check pid
{_, pid2, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
|> IO.inspect(label: :after_supervised_task)
IO.puts("Same process? #{pid1 == pid2}")
This time the linked (failing) AsyncTask is not supervised and runs with Task.async (bool = false, the default). We check that a new AsyncTaskRunner has been started by the DynamicSupervisor.
AsyncTaskRunner.run()
Process.sleep(1000)
{_, pid3, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
|> IO.inspect()
IO.puts("Same process? #{pid2 == pid3}")
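If the goal is for the parent to survive a failing task, one option is Task.Supervisor.async_nolink, as used in the earlier examples: the caller only monitors the task, so it receives a :DOWN message instead of an exit signal. The following is a minimal sketch, assuming a throwaway Task.Supervisor bound to `sup` and the calling process acting as the parent.

```elixir
# Assumption: a private, unnamed Task.Supervisor used only for this check.
{:ok, sup} = Task.Supervisor.start_link()

# The task kills itself, like the failing AsyncTask above.
%Task{ref: ref} =
  Task.Supervisor.async_nolink(sup, fn -> Process.exit(self(), :kill) end)

# The caller is not linked, so it only gets the monitor message.
down_reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :timeout
  end

caller_alive = Process.alive?(self())
IO.inspect({down_reason, caller_alive})
```

The caller stays alive and sees the reason :killed, which it can handle in a :DOWN clause as the GenServers above do.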
Concurrency
How long does it take to launch many genservers concurrently?
Even if using a genserver is clearly the wrong tool here (no need for state or communication), let’s compute the factorial of a number with it. Recall that the factorial of a positive integer $n$, written $n!$, is the product of all the integers from 1 to $n$.
We compute this product in three ways:
- with a single process using Enum.reduce, since $n! = n \cdot (n-1)!$.
- lazily and concurrently, by streaming the list [1,2,..,n] in chunks of, say, 10 elements and spawning (with GenServer.start) a genserver for each chunk. Each genserver is responsible for calculating the subproduct of the integers in the chunk it receives. We finally run an Enum.reduce on this list of subproducts. This method will lazily run many genservers concurrently.
- running the same “stream chunks” process as above, lazily and concurrently, but without genservers.
defmodule FactorialGs do
@moduledoc """
Genserver to compute the product of all integers in the received chunk
"""
use GenServer
def start_link(chunk),
do: GenServer.start(__MODULE__, chunk) |> elem(1) |> GenServer.call(:factorial)
def init(chunk), do: {:ok, chunk}
def handle_call(:factorial, _, s), do: {:reply, Eval.reduce(s), s}
end
defmodule Eval do
@moduledoc """
Exposes three functions: `worker/1`, `stream/1` and `reduce/1`.
The first spawns genservers, the second is a pure stream implementation,
and the last is a single reduction process.
"""
@n 10
@doc """
Lazy streaming genservers without recursion
iex> Eval.worker(4)
24
"""
def worker(n) do
1..n
|> Stream.chunk_every(@n)
|> Stream.map(&FactorialGs.start_link(&1))
|> Enum.to_list()
|> Enum.reduce(1, &(&1 * &2))
end
@doc """
Lazy stream recursion without genserver
iex> Eval.stream(4)
24
"""
def stream(n), do: Enum.to_list(1..n) |> rec_stream()
def rec_stream(list) when is_list(list) and length(list) > @n do
list
|> Stream.chunk_every(@n)
|> Stream.map(&Eval.reduce/1)
|> Enum.to_list()
|> rec_stream()
end
def rec_stream(list) when is_list(list), do: Enum.reduce(list, 1, &(&1 * &2))
@doc """
Simple single process reduction
iex> Eval.reduce(1..4)
24
"""
def reduce(list), do: Enum.reduce(list, 1, &(&1 * &2))
end
Eval.worker(3)
When we measure the time (in ms) to compute the factorial of a big number, say 10_000, we see that the version spawning genservers is faster than the single process, even with the overhead of using genservers. The fastest is the lazy stream version, which avoids the genserver overhead. We see that the BEAM is able to spawn around 1000 genservers per ms.
n = 10_000
%{
reduce: :timer.tc(fn -> Eval.reduce(1..n) end) |> elem(0) |> div(1000),
genserver: :timer.tc(fn -> Eval.worker(n) end) |> elem(0) |> div(1000),
stream: :timer.tc(fn -> Eval.stream(n) end) |> elem(0) |> div(1000)
}
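Note that Stream.map pulls the chunks one at a time in the calling process, so in Eval.worker and Eval.stream each chunk is processed after the previous one has finished. To spread the chunks over several processes, one option is Task.async_stream. The following is a minimal sketch under that assumption: the module name ParallelFactorial is hypothetical, and the chunk size of 10 simply mirrors @n above.

```elixir
defmodule ParallelFactorial do
  @chunk_size 10

  # Computes n! by multiplying chunk subproducts computed in separate tasks.
  def run(n) when is_integer(n) and n >= 1 do
    1..n
    |> Stream.chunk_every(@chunk_size)
    # Runs up to System.schedulers_online() chunk products at the same time;
    # the order of the results does not matter for a product.
    |> Task.async_stream(&product/1, ordered: false)
    |> Enum.reduce(1, fn {:ok, subproduct}, acc -> subproduct * acc end)
  end

  defp product(chunk), do: Enum.reduce(chunk, 1, &(&1 * &2))
end

IO.inspect(ParallelFactorial.run(4))
```

It can be timed with :timer.tc in the same way as the three versions above; no timing is claimed here, since the result depends on the machine and on the chunk size.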