Easy Concurrency with the Task Module
The Task Module
Running code concurrently in Elixir requires your program to start a process and execute the code in that process.
Elixir ships with a module called Task, which simplifies starting concurrent processes. It provides an abstraction for running code concurrently, retrieving results, handling errors, and starting a series of processes.
Task.start/1 has one limitation by design: it does not return the result of the function that was executed. To retrieve the result of a function, use Task.async/1, which returns a %Task{} struct that you can assign to a variable and later pass to Task.await/1. By default, Task.await/1 gives the task 5000ms to finish; if the task takes longer, the calling process exits with a timeout error. This can be changed by passing the number of milliseconds as a second argument to Task.await/2, e.g. Task.await(task, 10_000).
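As a minimal sketch of the difference between the two calls, the snippet below starts one fire-and-forget task and one awaited task. The arithmetic and the 100ms sleep are placeholders for real work, not part of the original example.

```elixir
# Task.start/1 is fire-and-forget: it returns {:ok, pid}, not the function's result.
{:ok, pid} = Task.start(fn -> 21 * 2 end)

# Task.async/1 returns a %Task{} struct that can be exchanged for the result later.
task =
  Task.async(fn ->
    Process.sleep(100) # placeholder for slow work
    21 * 2
  end)

# Wait up to 10 seconds instead of the default 5000ms.
result = Task.await(task, 10_000)
IO.inspect(result) # 42
```

The process that calls Task.async/1 should also be the one that calls Task.await/2, because the reply is sent to the caller's mailbox.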
Task.yield/1 returns nil if the task hasn’t completed within the timeout. It has the same default timeout of 5000ms, but it does not raise an exception when the timeout is reached. Task.yield/1 can be called repeatedly on a %Task{} struct, and once there is a value to be yielded, it is returned in the form {:ok, result}. After this result has been yielded, calling Task.yield/1 again on that %Task{} returns nil again. Altogether, Task.yield/1 gives you access to a “polling” mechanism, which in other programming languages or paradigms might require a more complicated setup.
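The polling behavior can be sketched with Task.yield/2, which takes an explicit timeout. The 300ms task and the two timeouts below are arbitrary values chosen for illustration.

```elixir
task =
  Task.async(fn ->
    Process.sleep(300) # placeholder for slow work
    :done
  end)

# Too early: the task needs about 300ms, so a 50ms wait yields nil.
first = Task.yield(task, 50)

# Long enough: the reply arrives well within one second.
second = Task.yield(task, 1_000)

IO.inspect({first, second}) # {nil, {:ok, :done}}
```

Unlike Task.await/2, the first call leaves both the caller and the task running, so the caller is free to do other work between polls.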
Something to consider: what if a task is stuck and never finishes? Task.await/1 takes care of stopping the task, but Task.yield/1 will leave it running. Tasks can be stopped manually by calling Task.shutdown(task), so if you want shutdown criteria for a task beyond the number of milliseconds it has run, you can write your own logic to do so.
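One common way to combine the two calls is to yield with a timeout and fall back to Task.shutdown/1 when nothing comes back. The sketch below assumes a task that never finishes; the 200ms budget and the {:error, ...} return shapes are illustrative choices, not something the original prescribes.

```elixir
task =
  Task.async(fn ->
    Process.sleep(:infinity) # simulates a task that is stuck forever
  end)

result =
  # Task.yield/2 returns nil on timeout, so the right-hand side runs only then.
  # Task.shutdown/1 stops the task and returns nil unless a reply arrived meanwhile.
  case Task.yield(task, 200) || Task.shutdown(task) do
    {:ok, value} -> {:ok, value}
    {:exit, reason} -> {:error, reason}
    nil -> {:error, :timeout}
  end

IO.inspect(result) # {:error, :timeout}
IO.inspect(Process.alive?(task.pid)) # false
```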
Managing Series of Tasks
Suppose you have one million users and you want to send an email to all of them. You could use Enum.map/2 and Task.async/1, but this will start one million processes and put sudden pressure on the system. This could degrade the system’s performance and potentially make other services unresponsive if the system is not architected well.
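For reference, the unbounded approach described above looks roughly like this on a tiny input. The doubling function is a stand-in for sending an email; with a million items, the first Enum.map/2 would start a million processes at once.

```elixir
results =
  1..5
  # Starts one process per item immediately, with no upper limit.
  |> Enum.map(fn n -> Task.async(fn -> n * 2 end) end)
  # Collects the results in the original order.
  |> Enum.map(&Task.await/1)

IO.inspect(results) # [2, 4, 6, 8, 10]
```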
Ideally you want to be able to run Task
processes to leverage concurrency, but ensure you do not overload the system’s resources as the product scales and the user base increases.
The solution to this problem is Task.async_stream/3. It is designed to create task processes from a list of items. It works just like the combination of Enum.map/2 and Task.async/1, with one main difference: you can set a limit on the number of processes running at the same time. Capping concurrency with this configurable limit is a way of handling back-pressure.
Task.async_stream/3 returns a Stream, which is a data structure that holds one or more operations that don’t run immediately, but only when explicitly told to. These are sometimes called lazy enumerables.
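The laziness can be observed with a small experiment: build the stream, check that no task has run, then force it with Enum.to_list/1. The {:ran, n} messages are a hypothetical probe added only for this sketch.

```elixir
parent = self()

stream =
  Task.async_stream(1..3, fn n ->
    send(parent, {:ran, n}) # probe: tells the caller that a task actually ran
    n * n
  end)

# The stream has only been built, so no task has started and no message arrives.
ran_before? =
  receive do
    {:ran, _} -> true
  after
    100 -> false
  end

# Enumerating the stream is what starts the tasks.
results = Enum.to_list(stream)

IO.inspect(ran_before?) # false
IO.inspect(results) # [ok: 1, ok: 4, ok: 9]
```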
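The two main options for controlling the behavior of your task stream are max_concurrency: pos_integer() and ordered: boolean(). max_concurrency defaults to the number of online schedulers (System.schedulers_online/0), and ordered defaults to true. See Elixir Docs: Task.async_stream() for the full spec.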
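A rough sketch of both options together: six placeholder jobs of about 100ms each, at most two running at a time, with results emitted as they complete. The job body and the numbers are assumptions for illustration.

```elixir
{micros, results} =
  :timer.tc(fn ->
    1..6
    |> Task.async_stream(
      fn n ->
        Process.sleep(100) # placeholder for sending one email
        n * 10
      end,
      max_concurrency: 2, # never more than two tasks in flight
      ordered: false # emit results as tasks finish, not in input order
    )
    |> Enum.to_list()
  end)

# With ordered: false the order may vary between runs, so sort before comparing.
IO.inspect(Enum.sort(results)) # [ok: 10, ok: 20, ok: 30, ok: 40, ok: 50, ok: 60]
IO.puts("took about #{div(micros, 1000)} ms")
```

Because only two jobs run at once, the whole batch needs at least three rounds of roughly 100ms each, which is the cost paid for keeping the load bounded.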
Sending Emails with Tasks
defmodule Sender do
  # Sends every email in its own task and collects the results.
  def notify_all(emails) do
    emails
    |> Task.async_stream(&send_email/1)
    |> Enum.to_list()
  end

  # Simulates a failure for one specific address.
  def send_email("konnichiwa@world.com" = email) do
    raise "Oops, couldn't send email to #{email}"
  end

  # Simulates a slow but successful delivery.
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("Email to #{email} sent")
    {:ok, "email_sent"}
  end
end
{:module, Sender, <<70, 79, 82, 49, 0, 0, 10, ...>>, {:send_email, 1}}
# Supervisor.start_child(
#   Task.Supervisor,
#   %{
#     id: Sender.EmailTaskSupervisor,
#     start: {
#       Task.Supervisor,
#       :start_link,
#       [[name: Sender.EmailTaskSupervisor]]
#     }
#   }
# )
nil
emails = [
  "hello@world.com",
  "hola@world.com",
  "nihao@world.com",
  "konnichiwa@world.com"
]
Sender.notify_all(emails)
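Tasks started by Task.async_stream/3 are linked to the caller, so the raise for konnichiwa@world.com crashes the calling process along with the task. One possible way to isolate such failures, and presumably where the commented-out supervisor above is heading, is Task.Supervisor.async_stream_nolink/4. The sketch below is an assumption about that direction: SenderSketch and SenderSketch.TaskSupervisor are hypothetical names, and the sleep is shortened to keep the run quick.

```elixir
defmodule SenderSketch do
  # Hypothetical variant of Sender that runs tasks under a Task.Supervisor
  # without linking them to the caller.
  def notify_all(emails) do
    SenderSketch.TaskSupervisor
    |> Task.Supervisor.async_stream_nolink(emails, &send_email/1)
    |> Enum.to_list()
  end

  def send_email("konnichiwa@world.com" = email) do
    raise "Oops, couldn't send email to #{email}"
  end

  def send_email(_email) do
    Process.sleep(100) # shortened stand-in for the 3000ms delivery
    {:ok, "email_sent"}
  end
end

# Started directly here for the sketch; a real application would normally
# place the Task.Supervisor in its supervision tree.
{:ok, _sup} = Task.Supervisor.start_link(name: SenderSketch.TaskSupervisor)

results = SenderSketch.notify_all(["hello@world.com", "konnichiwa@world.com"])

# A crashed task shows up as an {:exit, reason} entry instead of taking down the caller.
IO.inspect(results)
```

The failed task is still logged as a crash, but the caller survives and can decide what to do with each {:exit, reason} entry.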