Task
Mix.install([
{:req, "~> 0.3"}
])
description
- Task module basics
- https://elixirpatterns.dev
- https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir
Task module
- OTP-compliant concurrency primitive
- performs work in separate processes
- can be used in two different ways:
  - awaited
  - non-awaited
> Tasks are processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes. The most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously.
Task.async with Task.await
# Stand-in for a slow computation: every call blocks for 1.5 seconds.
expensive_work = fn -> Process.sleep(1_500) end

# Time the whole batch; :timer.tc/1 reports the elapsed time in microseconds.
{time, _result} =
  :timer.tc(fn ->
    # Each task process is linked to and monitored by the caller:
    # a crash in the task terminates the caller, and the task is
    # terminated when the caller goes down.
    tasks = for _ <- 1..10, do: Task.async(expensive_work)

    Task.await_many(tasks)
  end)

System.convert_time_unit(time, :microsecond, :millisecond)
If timeouts are unacceptable for the operation at hand, it may be best to let the task and the parent process crash by using `Task.await/2`, and to rely on a supervisor to bring everything back up and attempt the task again.
# Happy path: the task replies well within the default await timeout.
task =
  Task.async(fn ->
    Process.sleep(1000)
    :task_completed
  end)

:task_completed = Task.await(task)

# A task should be awaited only once: its reply has already been consumed above,
# so the timeout is demonstrated on a fresh task instead of re-awaiting `task`.
slow_task =
  Task.async(fn ->
    Process.sleep(1000)
    :task_completed
  end)

# this will crash: the 500 ms timeout elapses before the task replies,
# so the calling process exits
Task.await(slow_task, 500)
Task.async with Task.yield
task =
  Task.async(fn ->
    Process.sleep(1000)
    :task_completed
  end)

# this will not crash: the task is still sleeping after 500 ms, so yield
# returns nil and leaves the task running
nil = Task.yield(task, 500)

# yield may be called again on the same task; this time the reply arrives
# within the timeout and is returned wrapped in an :ok tuple
{:ok, :task_completed} = Task.yield(task)
Task.async_stream
## HTTP stress tester using Task.async_stream/3
opts = %{
  # total number of GET requests to issue
  total_requests: 100,
  # upper bound on the number of tasks running at the same time
  max_concurrency: 10,
  url: "https://www.google.com"
}

# Performs one GET request and returns how long it took, in native time units.
# The argument (the request number) is not needed.
timed_request = fn _request_number ->
  started_at = System.monotonic_time()
  Req.get!(opts.url)
  System.monotonic_time() - started_at
end

# Sum the per-request durations, convert the total to milliseconds,
# then divide by the request count to get the average.
average_time =
  1..opts.total_requests
  |> Task.async_stream(timed_request, max_concurrency: opts.max_concurrency)
  |> Enum.reduce(0, fn {:ok, elapsed}, total -> total + elapsed end)
  |> System.convert_time_unit(:native, :millisecond)
  |> Kernel./(opts.total_requests)
  |> dbg()
Compare task functions
defmodule MyTask do
  @moduledoc """
  Side-by-side comparison of the `Task` functions, using a fake mailer as the workload.
  """

  defmodule Mailer do
    @moduledoc """
    Fake mailer: "sending" an email is simulated with a two-second sleep.
    """

    @doc """
    Sends one email.

    Returns `:error` immediately for `"error@example.com"`. Any other address sleeps
    for two seconds, prints a confirmation line and returns `{:ok, "email_sent"}`.
    """
    @spec deliver_now(String.t()) :: {:ok, String.t()} | :error
    def deliver_now("error@example.com" = _email) do
      # raise "Oops, couldn't send email to #{email}!"
      :error
    end

    def deliver_now(email) do
      Process.sleep(:timer.seconds(2))
      IO.puts("Email to #{email} sent")
      {:ok, "email_sent"}
    end
  end

  @doc """
  Delivers every address in `emails` using the given strategy.

  Strategies and their return values:

    * `:sequential` - one by one in the calling process; returns `:ok`
    * `:async_forget` - one linked task per email, results ignored; returns `:ok`
    * `:async_await` - one task per email, awaited in order; returns the list of
      `Mailer.deliver_now/1` results
    * `:async_stream_forget` - bounded concurrency, results discarded; returns `:ok`
    * `:async_stream_unordered` - bounded concurrency, results emitted as tasks
      finish; returns a list of `{:ok, result}` tuples
    * `:async_stream_kill_task_on_timeout` - bounded concurrency, tasks exceeding
      the timeout are killed instead of exiting the caller; returns a list
    * `:async_stream_supervised` - runs the tasks unlinked under
      `MyApp.TaskSupervisor`, which must already be started; returns a list
  """
  @spec send_emails([String.t()], atom()) :: :ok | list()
  def send_emails(emails, strategy)

  # Send one by one sequentially
  def send_emails(emails, :sequential) do
    Enum.each(emails, &Mailer.deliver_now/1)
  end

  # Send async then do not care about the result.
  # The tasks are linked to the caller (start_link), but nothing waits for them.
  def send_emails(emails, :async_forget) do
    Enum.each(emails, fn email ->
      Task.start_link(fn -> Mailer.deliver_now(email) end)
    end)
  end

  # Send async and get the result
  def send_emails(emails, :async_await) do
    emails
    |> Enum.map(fn email ->
      Task.async(fn -> Mailer.deliver_now(email) end)
    end)
    |> Enum.map(&Task.await/1)
  end

  # Send async with concurrency limit (back pressure) and do not care about the result
  def send_emails(emails, :async_stream_forget) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1)
    |> Stream.run()
  end

  # If we do not care about the order of the result, we can potentially speed up the operation.
  # The option is spelled `:ordered`; `order: false` is not a Task.async_stream/3 option.
  def send_emails(emails, :async_stream_unordered) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1, ordered: false)
    |> Enum.to_list()
  end

  # We can kill tasks that take longer than timeout (default 5 seconds) instead of exiting
  def send_emails(emails, :async_stream_kill_task_on_timeout) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1, on_timeout: :kill_task)
    |> Enum.to_list()
  end

  # The caller won't crash when a task is crashed.
  def send_emails(emails, :async_stream_supervised) do
    # Custom TaskSupervisor must be already started
    MyApp.TaskSupervisor
    |> Task.Supervisor.async_stream_nolink(emails, &Mailer.deliver_now/1)
    |> Enum.to_list()
  end
end
# Sample recipients; "error@example.com" is the address Mailer.deliver_now/1 answers with :error.
emails = ~w(111@example.com error@example.com 222@example.com)

# Leave exactly one strategy uncommented to compare behaviour and timing.
# MyTask.send_emails(emails, :sequential)
# MyTask.send_emails(emails, :async_forget)
# MyTask.send_emails(emails, :async_await)
MyTask.send_emails(emails, :async_stream_forget)
# MyTask.send_emails(emails, :async_stream_unordered)
# MyTask.send_emails(emails, :async_stream_kill_task_on_timeout)
# MyTask.send_emails(emails, :async_stream_supervised)