Task Module Intro
Tasks and Results
defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.each(fn email ->
      send_email(email)
    end)
  end
end
Sender.send_email("hello@email.com")
emails = [
  "hello@email.com",
  "hello2@email.com",
  "hello3@email.com",
  "hello4@email.com"
]
# 12 seconds
Sender.notify_all(emails)
Starting Processes
Task.start(fn -> IO.puts("Hello async world!") end)
defmodule Sender2 do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.each(fn email ->
      Task.start(fn ->
        send_email(email)
      end)
    end)
  end
end
# 3 seconds
Sender2.notify_all(emails)
Result of a task
task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)
IO.puts("email queued")
Task.yield(task, 500) |> IO.inspect()
Process.sleep(1000)
Task.yield(task, 500) |> IO.inspect()
Process.sleep(1000)
Task.yield(task, 1000) |> IO.inspect()
task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)
IO.puts("email queued")
Task.await(task, 5000) |> IO.inspect()
task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)
IO.puts("email queued")
Task.shutdown(task, 1000) |> IO.inspect()
Task.yield(task, 500)
Task.yield/1 will return {:ok, result} if the task is complete or nil if the task is incomplete. By default yield/1 will wait for 5 seconds before returning either result. You can give it a timeout in ms. You only get the result ONCE and nil afterwards.
`Task.await/1` will return the result if the task completes during the timeout (default 5000ms) or exit the calling process if the task does not complete during the timeout.
Task.shutdown/1 asks the given task to shut down and waits a default of 5000ms for it to exit. It returns {:ok, result} if the task replied in the meantime, otherwise nil.
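The behaviour described above can be tried with a much shorter task than the three-second email send. This is a minimal sketch: the 200ms and 500ms sleeps and the variable names are assumptions chosen for illustration. On timeout Task.await/2 exits the calling process, which the sketch catches as an :exit so the outcome can be inspected.

```elixir
# A short stand-in for Sender.send_email/1 so the example finishes quickly.
task =
  Task.async(fn ->
    Process.sleep(200)
    {:ok, :email_sent}
  end)

first = Task.yield(task, 50)      # nil, the task is still sleeping
second = Task.yield(task, 1_000)  # {:ok, {:ok, :email_sent}}
third = Task.yield(task, 50)      # nil, the result was already handed over

# Task.await/2 exits the caller on timeout; catching the exit makes it visible.
slow = Task.async(fn -> Process.sleep(500) end)

await_outcome =
  try do
    Task.await(slow, 50)
  catch
    :exit, {:timeout, _} -> :timed_out
  end

# Task.shutdown/2 stops a task that has not replied and returns nil.
never_done = Task.async(fn -> Process.sleep(5_000) end)
shutdown_outcome = Task.shutdown(never_done, 100)
```

Note that yield wraps the task's own return value, so a task returning {:ok, :email_sent} yields {:ok, {:ok, :email_sent}}.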
defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.map(fn email ->
      Task.async(fn ->
        send_email(email)
      end)
    end)
    |> Enum.map(&Task.await/1)
  end
end
Sender.notify_all(emails)
Managing a series of tasks
There is a middle way between doing actions from a list one at a time and spawning all the actions into background processes at once.
One approach is async_stream/3 from the Task module.
For every item it will start a process and run the function we provide to process the item. You can set a limit on the number of processes running at the same time.
e.g. concurrency limit set to four so that at most you have four processes running while processing a list of arbitrary length.
async_stream/3 returns a stream.
Streams in Elixir are data structures that hold one or more operations that only run when told to run. They can also be called lazy enumerables.
Task.async_stream(emails, &Sender.send_email/1)
The code above returned a stream (displayed as a function) but did not actually execute any sending code yet.
Compare Enum.map vs Stream.map
Enum.map([1, 2, 3], &(&1 * 2))
Stream.map([1, 2, 3], &(&1 * 2))
Many Enum functions have a lazy alternative in the Stream module.
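To make the eager versus lazy difference concrete, here is a small sketch. The variable names and the 1..1_000_000 range are illustrative assumptions. The Stream pipeline only does as much work as Enum.take/2 asks for, instead of building a million-element list first.

```elixir
# Enum.map runs immediately and returns a list.
eager = Enum.map([1, 2, 3], &(&1 * 2))

# Stream.map only describes the work; nothing has been doubled yet.
lazy = Stream.map([1, 2, 3], &(&1 * 2))

# The stream runs when something enumerates it.
forced = Enum.to_list(lazy)

# Lazy steps compose, and only the items needed are computed.
first_even_squares =
  1..1_000_000
  |> Stream.filter(&(rem(&1, 2) == 0))
  |> Stream.map(&(&1 * &1))
  |> Enum.take(3)
```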
One way to run a stream is the Stream.run/1 function. But it always returns :ok so it is only useful if you don’t care about the result.
Enum.to_list/1 will try to convert the stream into a list data structure by running all the operations in the stream and storing the result in a list.
Enum.reduce/3 is useful if you want to do more work with the result.
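The three ways of running a stream can be compared side by side. This sketch uses a hypothetical square function in place of the email sender so that the results are easy to read. Each element coming out of Task.async_stream/2 is wrapped in an {:ok, value} tuple.

```elixir
# Hypothetical stand-in for the work done per item.
square = fn n -> n * n end

# Stream.run/1 executes the stream and discards the results.
ran = [1, 2, 3] |> Task.async_stream(square) |> Stream.run()

# Enum.to_list/1 executes the stream and collects every result.
results = [1, 2, 3] |> Task.async_stream(square) |> Enum.to_list()

# Enum.reduce/3 executes the stream and folds the results into one value.
sum =
  [1, 2, 3]
  |> Task.async_stream(square)
  |> Enum.reduce(0, fn {:ok, value}, acc -> acc + value end)
```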
defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Task.async_stream(&send_email/1, max_concurrency: 2, ordered: false)
    |> Enum.to_list()
  end
end
Sender.notify_all(emails)
In addition to the max_concurrency option you can set :ordered.
By default async_stream/3 assumes we want the results in the same order as the given list. That means processing can be slowed down as it waits for a slow item in the list.
You can also specify how to handle task timeouts, e.g. on_timeout: :kill_task
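A rough sketch of both options, using sleeps in place of real email delivery. The durations, the 200ms timeout and the variable names are assumptions picked so the effect is visible. With ordered: false results arrive as tasks finish, and with on_timeout: :kill_task a task that exceeds the timeout is reported as {:exit, :timeout} instead of taking down the caller.

```elixir
# Each item is a number of milliseconds to sleep before returning it.
nap = fn ms ->
  Process.sleep(ms)
  ms
end

# Unordered: the 50ms task finishes first, so it comes out first.
unordered =
  [300, 50]
  |> Task.async_stream(nap, max_concurrency: 2, ordered: false)
  |> Enum.to_list()

# The 500ms task exceeds the 200ms timeout and is killed rather than awaited.
with_timeouts =
  [50, 500, 50]
  |> Task.async_stream(nap, timeout: 200, on_timeout: :kill_task)
  |> Enum.to_list()
```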
Linking Processes
Processes in Elixir can be linked together.
Task processes are automatically linked to the process that started them.
Process links can help shut down parts of a system, or the whole system, to protect it from bad state.
By default linked processes will terminate and clean up the memory they use so the linked processes don’t continue to try and work from bad state.
But a chain reaction is only necessary for severe problems.
Imagine a process responsible for talking to the database and every other piece of the system depends on that process reliably getting data to and from the database. When that process crashes you should absolutely cascade process termination throughout the system.
But a process responsible for sending email is not so critical. Every part of the system that isn’t concerned with sending email can continue to run normally if the email sending process crashes.
You can isolate crashes by configuring a process to trap exits.
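As a minimal sketch of trapping exits, the code below runs inside a throwaway process so that the flag does not affect the notebook's own process. The :delivery_failed reason and the {:survived, reason} message are made up for this example. With trap_exit set, the linked child's crash arrives as an {:EXIT, pid, reason} message instead of terminating the process.

```elixir
parent = self()

spawn(fn ->
  # Trapping exits turns a linked process's exit signal into a message.
  Process.flag(:trap_exit, true)
  child = spawn_link(fn -> exit(:delivery_failed) end)

  receive do
    {:EXIT, ^child, reason} -> send(parent, {:survived, reason})
  end
end)

# The trapping process outlived its child and reported why the child exited.
outcome =
  receive do
    {:survived, reason} -> reason
  after
    1_000 -> :no_message
  end
```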
Supervisors
Elixir supervisors manage groups of processes.
They can start/stop/restart processes.
They are configured to trap exits.
Supervised processes are called child processes.
Supervisors and child processes form a supervision tree.
Elixir provides building blocks for writing your own supervisors and also ships with some built-in supervisors.
One built-in supervisor is Task.Supervisor, made specifically for supervising Task processes.
A child specification is usually a tuple of information the supervisor uses to manage a process.
{
  Task.Supervisor,
  name: Sender.EmailTaskSupervisor
}
It’s common to append “Supervisor” when naming supervisor processes.
A child specification can also be a map
%{
  id: Sender.EmailTaskSupervisor,
  start: {
    Task.Supervisor,
    :start_link,
    [[name: Sender.EmailTaskSupervisor]]
  }
}
A map is more verbose but also more configurable.
There’s also a helper function called Supervisor.child_spec/2 which returns a map and lets you override only the keys you need.
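For illustration, the helper can be called with the tuple form and a keyword list of overrides. The :email_tasks id and the :transient restart value are arbitrary choices for this sketch, not settings from the notes above.

```elixir
# Start from the tuple child specification and override two keys.
spec =
  Supervisor.child_spec(
    {Task.Supervisor, name: Sender.EmailTaskSupervisor},
    id: :email_tasks,
    restart: :transient
  )

# The result is a plain map, so individual keys can be read back.
id = spec.id
restart = spec.restart
start = spec.start
```

Inspecting spec in a notebook cell shows the full map, including the start tuple that the supervisor will call.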
Raising an exception
defmodule Sender do
  def send_email(email = "hello3@email.com") do
    raise "Cannot deliver to #{email}"
  end

  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Task.async_stream(&send_email/1, max_concurrency: 2, ordered: false)
    |> Enum.to_list()
  end
end
# will kill the Livebook process
# Sender.notify_all(emails)
The above code kills the process running the Livebook because the task is linked to it, so the exception bubbles up with no supervisor to isolate it.
Supervision
opts = [
  strategy: :one_for_one,
  name: Sender.Supervisor
]

children = [
  %{
    id: Sender.EmailTaskSupervisor,
    start: {
      Task.Supervisor,
      :start_link,
      [[name: Sender.EmailTaskSupervisor]]
    }
  }
]

Supervisor.start_link(children, opts)
defmodule Sender do
  def send_email(email = "hello3@email.com") do
    raise "Cannot deliver to #{email}"
  end

  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    Sender.EmailTaskSupervisor
    |> Task.Supervisor.async_stream_nolink(emails, &send_email/1)
    |> Enum.to_list()
  end
end
Sender.notify_all(emails)
With supervision in place, the email delivery crash is supervised and does not crash the Livebook process.
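To see what the caller gets back when one delivery crashes, here is a self-contained sketch. It starts its own unnamed Task.Supervisor and uses made-up addresses and an inline function, all of which are assumptions for illustration only. The failing item shows up as an {:exit, reason} tuple while the other items complete normally.

```elixir
# An unnamed supervisor just for this example.
{:ok, supervisor} = Task.Supervisor.start_link()

# Hypothetical delivery function that raises for one address.
deliver = fn
  "bad@example.com" = email -> raise "Cannot deliver to #{email}"
  email -> {:ok, email}
end

results =
  supervisor
  |> Task.Supervisor.async_stream_nolink(
    ["a@example.com", "bad@example.com", "c@example.com"],
    deliver
  )
  |> Enum.to_list()

# Count how many deliveries crashed without the caller crashing.
failures = Enum.count(results, &match?({:exit, _}, &1))
```

The crash is still logged, but because the tasks are not linked to the caller the remaining results are returned as usual.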
Let It Crash
We used Task.Supervisor to isolate a process crash. We did not try to prevent the crash by simply adding error handling.
In practice you should provide error handling when you expect an error to be possible and leave the supervision handling as a last resort.
The phrase “let it crash” does not mean “no error handling”.
{:ok, result} and {:error, reason} are frequently used in pattern matching.
Elixir also has try/rescue.
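Both styles of error handling might look like the sketch below. The DeliveryExample module and its functions are hypothetical and exist only to show the shape of the code: tagged tuples matched with case for expected failures, and try/rescue around a call that raises.

```elixir
defmodule DeliveryExample do
  # Expected failures are returned as tagged tuples rather than raised.
  def deliver(""), do: {:error, :missing_address}
  def deliver(email), do: {:ok, email}

  # Pattern matching on the tuple decides what to do next.
  def report(email) do
    case deliver(email) do
      {:ok, to} -> "sent to #{to}"
      {:error, reason} -> "failed: #{reason}"
    end
  end

  # try/rescue converts an exception into a tagged tuple.
  def parse_retry_count(text) do
    try do
      {:ok, String.to_integer(text)}
    rescue
      ArgumentError -> {:error, :not_an_integer}
    end
  end
end

sent = DeliveryExample.report("hello@email.com")
failed = DeliveryExample.report("")
parsed = DeliveryExample.parse_retry_count("3")
unparsed = DeliveryExample.parse_retry_count("three")
```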
But ultimately we know errors can happen. Some languages deal with the reality of errors by trying to wrap every call in error handling.
Erlang/Elixir take a different approach.
They focus on how to build systems that can recover from inevitable crashes, not on trying to build systems that will never crash.
With Erlang/Elixir you can choose how your system handles crashes in its different parts. Should the entire system crash and restart? Should only a branch of the system crash and restart? Should only a single process crash and restart?
As an example consider Supervisors. They have three different restart values for their child processes:
- :temporary will never restart child processes
- :transient will restart child processes when they exit with an error
- :permanent will always restart child processes
In Elixir when we say a process restarts that means restarting from its clean initial state.
If a supervisor detects that a child process it is restarting crashes too frequently (by default more than 3 restarts within 5 seconds), the supervisor itself will crash.
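A restart from clean initial state can be observed with a small supervised Agent. This is a sketch under assumptions: the :counter id, the Agent child and the 100ms pause are illustrative choices, and max_restarts: 3 with max_seconds: 5 simply spells out the supervisor defaults.

```elixir
# A permanent child holding a counter that starts at 0.
children = [
  Supervisor.child_spec({Agent, fn -> 0 end}, id: :counter, restart: :permanent)
]

{:ok, supervisor} =
  Supervisor.start_link(children,
    strategy: :one_for_one,
    max_restarts: 3,
    max_seconds: 5
  )

[{:counter, first_pid, :worker, _}] = Supervisor.which_children(supervisor)

# Change the state, then kill the child to force a restart.
Agent.update(first_pid, &(&1 + 1))
Process.exit(first_pid, :kill)

# Give the supervisor a moment to start the replacement.
Process.sleep(100)

[{:counter, second_pid, :worker, _}] = Supervisor.which_children(supervisor)

restarted? = first_pid != second_pid
state_after_restart = Agent.get(second_pid, & &1)
```

The replacement process has a new pid and its counter is back at the initial value, because the earlier update lived only in the killed process.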
Wrap up Task
The Task module is fantastic but only useful for running one-off functions concurrently.
Task processes are generally short-lived and exit when they complete. They are not the sort of processes to build a forever-running system on.
We need a process that can run and maintain an internal state for the lifetime of a system.
We need a “generic server” process.