Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Tasks

02-tasks.livemd

Tasks

Processes On Steroids

Tasks are processes that handle a bunch of the nitty-gritty details for you. They’re the natural evolution from processes for many use cases.

Let’s start by exploring the async + await use case.

async and await

Task.async/1 spawns a new task, similarly to spawn/1. Instead of sending messages back and forth, you can use Task.await/1 to collect the value returned by the spawned task.

task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

IO.puts("Running task...")
Task.await(task)

Yielding

Task.yield/2 is similar to Task.await/1, but it returns nil if the task doesn’t return a value within the specified timeout (Task.await/1 exits instead).

task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

Task.yield(task, _timeout = 5000)

As you can see, if the task returns in time, Task.yield/2 returns {:ok, result}. Let’s see what happens if the task doesn’t return in time, instead:

task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

Task.yield(task, _timeout = 500)

Task.yield/2 returns nil, but after a while the task seems to still print something. That’s because Task.yield/2 “peeks” into whether the task finished, but doesn’t shut the task down in case it hasn’t finished. To stop the task, we can use Task.shutdown/1.

Task.yield/2 and Task.shutdown/1 are often combined to implement the use case when you need a computation to be bound by time. It goes something like this:

  1. Start the computation
  2. Do some other work on the side
  3. When you’re ready, check the result of the task with Task.yield/2.
  4. If the task does not complete within the timeout, shut down the task.

Task.shutdown/1 also takes care of race conditions, which can happen in case the task completes right as we are telling it to shut down.

task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

IO.puts("Running task...")
Task.yield(task, 500) || Task.shutdown(task)

Parallel Map — Take #2 with async_stream

Task provides the most underrated function (IMO) in all of Elixir’s standard library: Task.async_stream/3. It takes an enumerable and a function, and returns a stream that maps the function over the enumerable in parallel.

stream =
  Task.async_stream([200, 100, 400], fn timeout ->
    Process.sleep(timeout)
    IO.puts("Slept for #{timeout} ms")
    timeout * 2
  end)

Enum.to_list(stream)

Seems like nothing special, right? Well, it is!

async_stream‘s coolest feature is that it uses a bounded number of processes. You can control this number through the :max_concurrency option, and it defaults to the number of cores on your machine. This feature is huge: our previous naive parallel-map implementation would spawn one process per element in the enumerable, regardless of the number of elements. Billions of processes? Not good. async_stream will happily churn through infinite streams, using :max_concurrency processes at a time.

async_stream is also flexible. It accepts any enumerable as its input (including infinite streams) and returns itself an enumerable.

When to Use Tasks

  • If you want to perform a few requests to different services and then collect the results
  • If you need a simple parallel mapping approach
  • If you need to perform a computation in a limited timeframe and want to stop it if it times out
  • When you want to spawn a computation in the background (Task.start/1), for something like side effects

Practical Tips

Tip #1 — ordered: false with async_stream

If you’re using Task.async_stream/1 and don’t care about the ordering of results, use the ordered: false option.

This is great for when you’re using async_stream/1 to parallelize side-effects over a collection, for example. It’s also useful when you’re going to do something with the mapped collection that doesn’t require ordering, like aggregating into a map.

print_after_timeout = fn timeout ->
  Process.sleep(timeout)
  IO.puts("Slept for #{timeout} ms")
  timeout
end

[200, 100, 400]
|> Task.async_stream(print_after_timeout, ordered: false)
|> Enum.to_list()

As you can see, the results are returned in the order in which they finish computing, and not in the order of the original list.

Tip #2 — Follow the Documentation for Task.yield/2

The documentation for Task.yield/2 has a great code snippet to use when you need to perform a time-capped computation.

task =
  Task.async(fn ->
    Process.sleep(Enum.random(499..501))
    IO.puts("Done!")
  end)

case Task.yield(task, 500) || Task.shutdown(task) do
  {:ok, result} -> result
  nil -> :timeout
end

Tip #3 — async_stream Goes a Long Way

Before talking about GenStage, Broadway, and Flow, I want to stress the importance of async_stream. I’ve seen many cases of solutions that used GenStage or Flow that were essentially overengineered async_streams. async_stream has some limitations, but combining the bounded number of processes, the optional ordering, and the fact that it processes lazy streams makes it a great choice in many situations.