Task Module Intro

Tasks and Results

defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.each(fn email ->
      send_email(email)
    end)
  end
end
Sender.send_email("hello@email.com")
emails = [
  "hello@email.com",
  "hello2@email.com",
  "hello3@email.com",
  "hello4@email.com"
]
# 12 seconds
Sender.notify_all(emails)

Starting Processes

Task.start(fn -> IO.puts("Hello async world!") end)
defmodule Sender2 do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.each(fn email ->
      Task.start(fn ->
        send_email(email)
      end)
    end)
  end
end
# 3 seconds
Sender2.notify_all(emails)

Result of a task

task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)

IO.puts("email queued")

Task.yield(task, 500) |> IO.inspect()

Process.sleep(1000)

Task.yield(task, 500) |> IO.inspect()

Process.sleep(1000)

Task.yield(task, 1000) |> IO.inspect()
task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)

IO.puts("email queued")

Task.await(task, 5000) |> IO.inspect()
task =
  Task.async(fn ->
    Sender.send_email("hello@world.com")
  end)

IO.puts("email queued")

Task.shutdown(task, 1000) |> IO.inspect()

Task.yield(task, 500)

Task.yield/2 returns {:ok, result} if the task has completed, or nil if it is still running when the timeout expires. The timeout is given in ms and defaults to 5000. You only get {:ok, result} ONCE: after the reply has been received, further calls to yield return nil. `Task.await/2` returns the result if the task completes within the timeout (default 5000ms); if the task does not complete in time, the calling process exits instead of getting a return value.

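Task.shutdown/2 unlinks the task and asks it to shut down, waiting up to 5000ms by default for it to exit. It returns {:ok, result} if the task replied in the meantime, {:exit, reason} if the task died, and nil otherwise.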
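
The yield and shutdown behaviour described above can be sketched as follows. This is a minimal illustration, not part of the original notebook: the 200ms sleep and the :done result are stand-ins for real work, and the timeouts are arbitrary.

```elixir
# A stand-in for slow work; the 200ms delay and :done result are assumptions.
task =
  Task.async(fn ->
    Process.sleep(200)
    :done
  end)

# The task is still sleeping, so the first yield times out.
first = Task.yield(task, 50)

# Waiting longer picks up the reply.
second = Task.yield(task, 1_000)

# The reply has already been consumed, so another yield finds nothing.
third = Task.yield(task, 50)

IO.inspect({first, second, third})

# Give a task a deadline, then shut it down if it missed the deadline.
slow = Task.async(fn -> Process.sleep(5_000) end)
outcome = Task.yield(slow, 50) || Task.shutdown(slow)

IO.inspect(outcome)
```

Chaining yield with shutdown is a way to check on a task without leaving it running after you have stopped caring about the result.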

defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Enum.map(fn email ->
      Task.async(fn ->
        send_email(email)
      end)
    end)
    |> Enum.map(&Task.await/1)
  end
end
Sender.notify_all(emails)

Managing a series of tasks

There is a middle way between processing a list one item at a time and spawning every item into a background process at once.

One approach is async_stream/3 from the Task module.

For every item it will start a process and run the function we provide to process the item. You can set a limit on the number of processes running at the same time.

For example, with a concurrency limit of four, at most four processes run at any one time, no matter how long the list is.

async_stream/3 returns a stream

Streams in Elixir are data structures that hold one or more operations that only run when told to run. They can also be called lazy enumerables.

Task.async_stream(emails, &Sender.send_email/1)

The code above returned a stream (displayed as a function) but did not actually execute any sending code yet.

Compare Enum.map vs Stream.map

Enum.map([1, 2, 3], &(&1 * 2))
Stream.map([1, 2, 3], &(&1 * 2))

Many Enum functions have a lazy alternative in the Stream module

One way to run a stream is the Stream.run/1 function.

But it always returns :ok, so it is only useful if you don’t care about the result.

Enum.to_list/1 will try and convert the stream into a list data structure by running all the operations in the stream and storing the result in a list.

Enum.reduce/3 is useful if you want to do more work with the result.
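
The three ways of running a stream can be compared side by side. This is a small sketch in which doubling numbers stands in for real work; the `build` helper is hypothetical and only exists so each call gets a fresh stream.

```elixir
# Build a fresh stream each time; doubling small numbers stands in for real work.
build = fn -> Task.async_stream([1, 2, 3], fn n -> n * 2 end) end

# Stream.run/1 runs everything and discards the results.
run_result = Stream.run(build.())

# Enum.to_list/1 collects each result, wrapped in an {:ok, value} tuple.
listed = Enum.to_list(build.())

# Enum.reduce/3 folds over the results, here summing the values.
total = Enum.reduce(build.(), 0, fn {:ok, n}, acc -> acc + n end)

IO.inspect({run_result, listed, total})
```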

defmodule Sender do
  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Task.async_stream(&send_email/1, max_concurrency: 2, ordered: false)
    |> Enum.to_list()
  end
end
Sender.notify_all(emails)

In addition to the max_concurrency option you can set :ordered.

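By default async_stream/3 assumes we want the results in the same order as the given list. That can slow processing down, because results that are already finished have to wait behind a slow item earlier in the list. With ordered: false, results are emitted as soon as they complete.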

You can also specify how to handle task timeouts e.g. on_timeout: :kill_task
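
As a rough sketch of the timeout option, the example below gives each task 200ms and kills any task that overruns. The sleep durations and the 200ms limit are made up for illustration.

```elixir
# Each number is how long the task sleeps, in ms; the middle one is too slow.
results =
  [10, 1_000, 10]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 200,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# Tasks that finished are {:ok, value}; the killed task is {:exit, :timeout}.
IO.inspect(results)
```

Without on_timeout: :kill_task, a timed-out task makes the process that is consuming the stream exit, so this option is what lets the rest of the list carry on.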

Linking Processes

Processes in Elixir can be linked together.

Task processes are automatically linked to the process that started them.

Process links can help shut down parts of a system, or the whole system, to protect it from bad state.

By default, when a process crashes, the processes linked to it terminate too and the memory they use is cleaned up, so they don’t continue to work from bad state.
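
One way to watch a link take a process down without risking the current process is to run the experiment in a throwaway process and monitor it. This is a sketch under assumptions: the :boom exit reason is arbitrary, and the task failure will also be reported in the log.

```elixir
# Run the experiment in a throwaway process so the crash cannot reach this one.
{pid, ref} =
  spawn_monitor(fn ->
    # Task.async/1 links the task to the process that calls it.
    Task.async(fn -> exit(:boom) end)
    Process.sleep(:infinity)
  end)

# The monitor reports why the throwaway process went down.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, why} -> why
  after
    1_000 -> :still_alive
  end

IO.inspect(reason)
```

The throwaway process never calls exit itself; it goes down only because the task it is linked to did.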

But a chain reaction is only necessary for severe problems.

Imagine a process responsible for talking to the database and every other piece of the system depends on that process reliably getting data to and from the database. When that process crashes you should absolutely cascade process termination throughout the system.

But a process responsible for sending email is not so critical. Every part of the system that isn’t concerned with sending email can continue to run normally if the email sending process crashes.

You can isolate crashes by configuring a process to trap exits.
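
A minimal sketch of trapping exits, assuming a made-up :boom exit reason. The trapping is done inside a separate process so the flag is not changed on the process evaluating the notebook.

```elixir
parent = self()

spawn(fn ->
  # With trap_exit set, exit signals from linked processes arrive as messages.
  Process.flag(:trap_exit, true)
  child = spawn_link(fn -> exit(:boom) end)

  receive do
    {:EXIT, ^child, why} -> send(parent, {:child_exited, why})
  end
end)

# The trapping process survived the crash and reported the reason back.
trapped =
  receive do
    {:child_exited, why} -> why
  after
    1_000 -> :no_message
  end

IO.inspect(trapped)
```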

Supervisors

Elixir supervisors manage groups of processes.

They can start/stop/restart processes.

They are configured to trap exits.

Supervised processes are called child processes

Supervisors and child processes form a supervision tree

Elixir provides building blocks for writing your own supervisors and also ships with some built-in supervisors.

One built-in supervisor is Task.Supervisor, made specifically for supervising Task processes.

A child specification is usually a tuple of information the supervisor uses to manage a process.

{
  Task.Supervisor,
  name: Sender.EmailTaskSupervisor
}

It’s common to append “Supervisor” when naming supervisor processes.

A child specification can also be a map

%{
  id: Sender.EmailTaskSupervisor,
  start: {
    Task.Supervisor,
    :start_link,
    [[name: Sender.EmailTaskSupervisor]]
  }
}

A map is more verbose but also more configurable.

There’s also a helper function called Supervisor.child_spec/2, which returns a map and lets you override only the keys you need.
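
As a small sketch, the tuple form from earlier can be passed to Supervisor.child_spec/2 along with the keys to override. The restart: :temporary override is chosen purely for illustration.

```elixir
# Start from the tuple form and override a single key.
spec =
  Supervisor.child_spec(
    {Task.Supervisor, name: Sender.EmailTaskSupervisor},
    restart: :temporary
  )

# The result is a map with the :start tuple filled in and :restart overridden.
IO.inspect(spec)
```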

Raising an exception

defmodule Sender do
  def send_email(email = "hello3@email.com") do
    raise "Cannot deliver to #{email}"
  end

  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    emails
    |> Task.async_stream(&send_email/1, max_concurrency: 2, ordered: false)
    |> Enum.to_list()
  end
end
# will kill the Livebook process
# Sender.notify_all(emails)

The above code kills the process running the Livebook because the task is linked to the calling process, so the unhandled exception propagates to it with no supervision in place to isolate the crash.

Supervision

opts = [
  strategy: :one_for_one,
  name: Sender.Supervisor
]

children = [
  %{
    id: Sender.EmailTaskSupervisor,
    start: {
      Task.Supervisor,
      :start_link,
      [[name: Sender.EmailTaskSupervisor]]
    }
  }
]

Supervisor.start_link(children, opts)
defmodule Sender do
  def send_email(email = "hello3@email.com") do
    raise "Cannot deliver to #{email}"
  end

  def send_email(email) do
    Process.sleep(3000)
    IO.puts("delivered email to #{email}")
    {:ok, :email_sent}
  end

  def notify_all(emails) do
    Sender.EmailTaskSupervisor
    |> Task.Supervisor.async_stream_nolink(emails, &send_email/1)
    |> Enum.to_list()
  end
end
Sender.notify_all(emails)

With supervision in place, the email delivery crash is supervised and does not crash the Livebook process.
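
To see what the caller gets back when a task fails, here is a self-contained sketch. It starts its own unnamed Task.Supervisor rather than reusing Sender.EmailTaskSupervisor, and it uses numbers and an arbitrary :boom exit in place of emails and delivery errors.

```elixir
# An unnamed supervisor keeps this sketch separate from Sender.EmailTaskSupervisor.
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink([1, 2, 3], fn
    2 -> exit(:boom)
    n -> n * 10
  end)
  |> Enum.to_list()

# Successful items come back as {:ok, value}; failed ones as {:exit, reason}.
{delivered, failed} = Enum.split_with(results, &match?({:ok, _}, &1))

IO.inspect(results)
IO.inspect({length(delivered), length(failed)})
```

Because the failure shows up as an ordinary {:exit, reason} tuple, the caller can decide what to do with it, for example logging the failed items or retrying them.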

Let It Crash

We used Task.Supervisor to isolate a process crash. We did not try and prevent the crash by simply adding error handling.

In practice you should provide error handling when you expect an error to be possible and leave the supervision handling as a last resort.

The phrase “let it crash” does not mean “no error handling”.

{:ok, result} and {:error, reason} are frequently used in pattern matching

Elixir also has try/rescue
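
Both styles can be sketched in a few lines. The ErrorHandlingSketch module, its validation rule (an address must contain "@") and the error atoms are all hypothetical, invented for this example.

```elixir
defmodule ErrorHandlingSketch do
  # Hypothetical validation rule: an address must contain "@".
  def deliver(email) do
    if String.contains?(email, "@") do
      {:ok, :email_sent}
    else
      {:error, :invalid_address}
    end
  end

  # try/rescue turns a raised exception into an ordinary return value.
  def parse_count(text) do
    try do
      {:ok, String.to_integer(text)}
    rescue
      ArgumentError -> {:error, :not_a_number}
    end
  end
end

# Pattern matching on the tagged tuples picks the branch to run.
message =
  case ErrorHandlingSketch.deliver("hello") do
    {:ok, :email_sent} -> "sent"
    {:error, reason} -> "failed: #{reason}"
  end

IO.puts(message)
IO.inspect(ErrorHandlingSketch.parse_count("abc"))
```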

But ultimately we know errors can happen. Some languages deal with the reality of errors by trying to wrap every call in error handling.

Erlang/Elixir take a different approach.

They focus on how to build systems that can recover from inevitable crashes, not on trying to build systems that will never crash.

With Erlang/Elixir you can choose how your system handles crashes in its different parts. Should the entire system crash and restart? Should only a branch of the system crash and restart? Should only a single process crash and restart?

As an example, consider supervisors. They support three restart values for their child processes:

  • :temporary will never restart child processes
  • :transient will restart child processes when they exit with an error
  • :permanent will always restart child processes

In Elixir when we say a process restarts that means restarting from its clean initial state.

If a child process crashes and gets restarted too frequently (by default, more than 3 restarts within 5 seconds), the supervisor gives up and exits itself.
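
The restart behaviour can be tried out with a small sketch. It supervises an Agent holding a counter, kills it, and checks that the replacement starts again from the initial state. The :restart_demo_counter name is made up, and the max_restarts and max_seconds options are written out with their default values only to make them visible.

```elixir
# :restart_demo_counter is a made-up name for this sketch.
children = [
  %{
    id: :counter,
    start: {Agent, :start_link, [fn -> 0 end, [name: :restart_demo_counter]]},
    restart: :permanent
  }
]

# max_restarts and max_seconds are spelled out with their default values.
{:ok, _sup} =
  Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 5)

# Change the state, then simulate a crash.
Agent.update(:restart_demo_counter, &(&1 + 41))
old_pid = Process.whereis(:restart_demo_counter)
Process.exit(old_pid, :kill)

# Poll briefly until the supervisor has registered a replacement process.
new_pid =
  Enum.find_value(1..100, fn _ ->
    Process.sleep(10)
    pid = Process.whereis(:restart_demo_counter)
    if pid && pid != old_pid, do: pid
  end)

# The replacement starts from the initial state, not from 41.
IO.inspect(Agent.get(:restart_demo_counter, & &1))
```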

Wrap up Task

The Task module is fantastic but only useful for running one-off functions concurrently.

Task processes are generally short-lived and exit when they complete. Not the sort of processes to build a forever running system.

We need a process that can run and maintain an internal state for the lifetime of a system.

We need a “generic server” process.