Awaiting Tasks
Mix.install([])
Introduction
Until now we have seen examples where we spawn a process to do something concurrently. Sometimes, however, we also need the value returned by that process. In such situations we must wait for the process to complete and then fetch its result.
The Task module provides the async and await functions to handle this common use case. With Task.async/1, a new process is created, linked, and monitored by the caller. Once the task action finishes, a message containing the result is sent to the caller. The Task.await/2 function is then used to read this message and obtain the result.
Here are some key points to note about async and await:
- Async tasks establish a link between the caller and the spawned process. If either the caller or the task crashes, the other process will crash as well. This intentional linkage ensures that the computation is not carried out if the process meant to receive the result no longer exists.
- When using async tasks, it is important to await a reply as they are always sent. If you don’t expect a reply but still want to launch a linked process, consider using Task.start_link/1 instead.
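As a minimal sketch of the second point, a job that is run only for its side effect can be started with Task.start_link/1. It returns {:ok, pid} rather than a Task struct, so there is no reply to await. The function body below is a placeholder chosen for illustration.

```elixir
# Start a linked task purely for its side effect; no reply is sent back.
{:ok, pid} =
  Task.start_link(fn ->
    IO.puts("Running a side effect in #{inspect(self())}")
  end)

# The caller only gets a pid back, not a %Task{} that could be awaited.
IO.inspect(pid, label: "Started linked task")
```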
Let's look at some code to understand this better…
my_task =
  Task.async(fn ->
    # Sleep for 2 seconds
    :timer.sleep(2000)
    IO.puts("Done sleeping")
    DateTime.utc_now()
  end)
  |> IO.inspect()
# Notice how the above task process is linked to the caller process
Process.info(self(), :links)
|> IO.inspect(label: "Parent process #{inspect(self())} links")
Task.await(my_task) |> IO.inspect(label: "Task returned")
Let's do this again, but this time check the process mailbox to see the message returned by the spawned task process.
my_task = Task.async(fn -> "return value" end)
IO.inspect(my_task)
# Wait for process to complete
:timer.sleep(100)
:erlang.process_info(self(), :messages) |> IO.inspect(label: "Messages in mailbox")
Task.await(my_task)
Notice how the spawned task process sends a message back to the caller in the format {ref, reply}, where ref is the monitor reference held in the task struct. The Task.await/1 call essentially waits for this message in a receive block like so…
receive do
  # The reply message from the task
  {^ref, reply} ->
    # The task has replied, so it completed successfully and we no longer need to monitor it for crashes
    demonitor(ref)
    reply

  # The :DOWN message from the task monitor. Receiving it before the reply message means the task crashed
  {:DOWN, ^ref, _, proc, reason} ->
    # Exit the caller process that is awaiting, since the task process crashed
    exit({reason(reason, proc), {__MODULE__, :await, [task, timeout]}})

  # more code
end
The other message returned is {:DOWN, ref, :process, pid, reason} - since all tasks are also monitored, you will also receive the :DOWN message delivered by Process.monitor/1. If you receive the :DOWN message without getting the reply message, it means the task crashed.
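To make the two message formats concrete, the sketch below handles the reply by hand instead of calling Task.await/2. It is only an illustration and assumes nothing else consumes the mailbox first. The 1-second `after` clause and the :no_reply and :task_crashed atoms are arbitrary choices for this example, not part of the Task API.

```elixir
task = Task.async(fn -> :hello end)

# The monitor reference stored in the task struct tags the reply message.
%Task{ref: ref} = task

reply =
  receive do
    # Normal case: the task sent {ref, result} to the caller.
    {^ref, value} ->
      # Drop the monitor and flush any pending :DOWN message for it.
      Process.demonitor(ref, [:flush])
      value

    # The task died before replying.
    {:DOWN, ^ref, :process, _pid, reason} ->
      {:task_crashed, reason}
  after
    # Illustrative deadline so the sketch never blocks forever.
    1_000 -> :no_reply
  end

IO.inspect(reply, label: "Manually received reply")
```

In everyday code Task.await/2 is the simpler choice. Handling the messages directly is mostly relevant inside processes that already own a receive loop.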
At any point we can ignore a linked task by calling Task.ignore/1. The task will continue running, but it is unlinked from the caller, and we can no longer yield, await, or shut it down. It also means that if the task fails, the owner process is unaffected. Let's look at an example…
time_bomb_task =
  Task.async(fn ->
    :timer.sleep(2000)
    raise "BOOOOM!"
  end)
  |> IO.inspect()
IO.inspect(Process.info(self(), :links), label: "Parent process #{inspect(self())} links")
# Unlink the spawned task
Task.ignore(time_bomb_task)
IO.inspect(Process.info(self(), :links), label: "Parent process #{inspect(self())} links")
:timer.sleep(2100)
IO.puts("Parent process survived!")
Let's see another example where we launch three tasks using Task.async/3, which takes a module, a function name, and a list of arguments (MFA). Each task generates a random number.
We then await their results and return the sum of the random numbers.
my_task1 = Task.async(Enum, :random, [0..10])
my_task2 = Task.async(Enum, :random, [10..20])
my_task3 = Task.async(Enum, :random, [20..30])
Task.await(my_task1) + Task.await(my_task2) + Task.await(my_task3)
When we need to await multiple tasks, the Task module provides a better approach: Task.await_many/1, which awaits replies from multiple tasks and returns them as a list.
For example, we could rewrite the above example like so:
my_task1 = Task.async(Enum, :random, [0..10])
my_task2 = Task.async(Enum, :random, [10..20])
my_task3 = Task.async(Enum, :random, [20..30])
results = Task.await_many([my_task1, my_task2, my_task3])
IO.inspect(results, label: "Results from await_many")
Enum.sum(results)
Some important points to note about Task.await_many/1 are…
- If any of the task processes dies, the caller process will exit with the same reason as that task.
- It returns a list of the results, in the same order as the tasks supplied in the tasks input argument.
- A timeout, in milliseconds or :infinity, can be given with a default value of 5000. If the timeout is exceeded, then the caller process will exit. Any task processes that are linked to the caller process (which is the case when a task is started with async) will also exit. Any task processes that are trapping exits or not linked to the caller process will continue to run.
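A minimal sketch of the ordering and timeout points above, passing an explicit timeout as the second argument. The squaring workload and the 10-second value are assumptions made for illustration.

```elixir
# Hypothetical workload: each task squares its input.
tasks = for n <- 1..3, do: Task.async(fn -> n * n end)

# Wait up to 10 seconds in total for all replies (the default is 5000 ms).
squares = Task.await_many(tasks, 10_000)

# Results come back in the same order as the tasks list.
IO.inspect(squares, label: "Squares, in task order")
```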
Task await timeouts
By default, Task.await/1 uses a timeout of 5 seconds, after which the caller process will exit. If the task process is linked to the caller process, which is the case when a task is started with async, then the task process will also exit. If the task process is trapping exits or is not linked to the caller process, it will continue to run.
Let's look at an example…
dont_await_me = Task.async(fn -> :timer.sleep(:infinity) end)
Task.await(dont_await_me)
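The timeout can be set per call by passing a second argument to Task.await/2, either a number of milliseconds or :infinity. The sketch below uses an illustrative 300 ms workload and a 1-second timeout, both chosen for this example.

```elixir
slow_task =
  Task.async(fn ->
    Process.sleep(300)
    :slow_result
  end)

# Wait at most 1 second for the reply instead of the default 5 seconds.
value = Task.await(slow_task, 1_000)
IO.inspect(value, label: "Task returned")
```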
The Task.await/1 function can only be called once for any given task.
If we want to check whether a task has completed without risking the caller process exiting, we must use Task.yield/2.
Yielding tasks
Sometimes we only wish to check whether a task has completed within a given timeout and, if not, let the caller process continue. Unlike Task.await/1, where the caller process exits on timeout, with Task.yield/2 the caller process keeps running if the task has not completed within the timeout. Therefore Task.yield/2 can be called multiple times on the same task.
Just like await, the yield function blocks the caller process until the task completes or the timeout is reached.
These are the different scenarios when calling Task.yield/1:
- When the task process finishes within the yield timeout: returns {:ok, result}, where result is the value returned by the task.
- When the task process does not reply within the yield timeout: returns nil. This can happen if the timeout expires OR if the message from the task has already been consumed by the caller.
- When the task process has already exited: returns {:exit, reason}. Because a crashing linked task normally takes the caller down with it, this is only seen if the task exited with reason :normal, is not linked to the caller, or the caller is trapping exits.
Now let's look at some code…
heavy_task =
  Task.async(fn ->
    :timer.sleep(5000)
    :finished_heavy_task
  end)
Task.yield(heavy_task, 1000) |> IO.inspect(label: "after 1 second")
Task.yield(heavy_task, 1000) |> IO.inspect(label: "after 2 seconds")
Task.yield(heavy_task, 1000) |> IO.inspect(label: "after 3 seconds")
Task.yield(heavy_task, 1000) |> IO.inspect(label: "after 4 seconds")
:timer.sleep(1500)
:erlang.process_info(self(), :messages) |> IO.inspect(label: "Messages in mailbox")
Task.yield(heavy_task, 1000) |> IO.inspect(label: "After task finished")
:erlang.process_info(self(), :messages) |> IO.inspect(label: "Messages in mailbox")
Task.yield(heavy_task, 1000) |> IO.inspect(label: "After message from task was consumed")
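A common way to use Task.yield/2 is to give a task a deadline and stop it with Task.shutdown/2 if it misses that deadline. The sketch below follows that pattern. The 200 ms deadline and the {:error, ...} tuples are illustrative choices, not something the Task module prescribes.

```elixir
# A task that never finishes on its own.
stuck_task = Task.async(fn -> Process.sleep(:infinity) end)

outcome =
  # yield returns nil on timeout, so `||` falls through to shutdown,
  # which stops the task and returns nil unless a reply arrived meanwhile.
  case Task.yield(stuck_task, 200) || Task.shutdown(stuck_task) do
    {:ok, value} -> {:ok, value}
    {:exit, reason} -> {:error, reason}
    nil -> {:error, :timeout}
  end

IO.inspect(outcome, label: "Outcome")
```

Here the caller keeps running and the stuck task is no longer alive once Task.shutdown/2 returns.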
Similar to Task.await_many/2, we also have Task.yield_many/2.
This function receives a list of tasks and waits for their replies in the given time interval. It returns a list of two-element tuples, with the task as the first element and the yielded result as the second. The tasks in the returned list will be in the same order as the tasks supplied in the tasks input argument.
Similarly to yield/2, each task’s result will be {:ok, term} if the task has successfully reported its result back in the given time interval, {:exit, reason} if the task has died, or nil if the task keeps running past the timeout.
tasks =
  for i <- 1..10 do
    Task.async(fn ->
      Process.sleep(i * 1000)
      i
    end)
  end

tasks_with_results = Task.yield_many(tasks)

results =
  Enum.map(tasks_with_results, fn {task, res} ->
    # Shut down the tasks that did not reply or exit
    res || Task.shutdown(task, :brutal_kill)
  end)

# Here we are matching only on {:ok, value} and
# ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
for {:ok, value} <- results do
  IO.inspect(value)
end
In the example above, we create tasks that sleep from 1 up to 10 seconds and return the number of seconds they slept for. If you execute the code all at once, you should see 1 up to 4 printed, as those were the tasks that have replied in the default timeout (5 seconds) of Task.yield_many/1. All other tasks will have been shut down using the Task.shutdown/2 call.
As a convenience, you can achieve a similar behaviour to above by specifying the :on_timeout option to be :kill_task (or :ignore).
For example, to kill all tasks which do not yield within 7 seconds, we can write the following (this option is available from Elixir 1.15.0+):
Task.yield_many(tasks_list, timeout: 7000, on_timeout: :kill_task)
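A small runnable sketch of the same option, assuming Elixir 1.15.0 or later. The sleep durations and the 1-second timeout are made up for this example so that it finishes quickly.

```elixir
# Hypothetical workload: each task sleeps for the given number of milliseconds.
tasks_list =
  for ms <- [100, 200, 5_000] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

# Tasks that have not replied after 1 second are killed instead of left running.
results = Task.yield_many(tasks_list, timeout: 1_000, on_timeout: :kill_task)

# Keep only the values from tasks that replied in time.
finished = for {_task, {:ok, value}} <- results, do: value
IO.inspect(finished, label: "Finished in time")
```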