Supervised Tasks

Mix.install([
  {:httpoison, "~> 2.1"},
  {:kino, "~> 0.9.0"}
])

Navigation

Home | Task async stream | Agents

Supervising tasks

In the previous chapters, we have learned how to start tasks and await their completion. These tasks are typically linked to the caller process and are not supervised.

Supervisors, as we have previously discovered, offer valuable control and visibility over processes.

By supervising our tasks, we can enhance our visibility and control over them. We can unlink tasks from the caller process to prevent cascading failures, and still have the ability to await the task’s completion, among other benefits.

In Elixir, the Task.Supervisor module allows us to dynamically supervise and oversee the tasks we spawn.

Let's look at some code to understand how this works…

We first start a Task.Supervisor as part of our supervision tree; here we name it AwesomeTaskSupervisor. We can pass other options in the child specification, like :max_restarts, :max_seconds, etc. All of the options are documented in the Task.Supervisor.start_link/1 docs.

children = [{Task.Supervisor, name: AwesomeTaskSupervisor}]

Supervisor.start_link(children, strategy: :one_for_one)
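As a minimal sketch of passing such options, the restart-intensity settings go in the same child spec alongside the name. The ConfiguredTaskSupervisor name and the option values below are made up purely for illustration:

# Hypothetical child spec with extra options; adjust the values to your needs
children = [
  {Task.Supervisor,
   name: ConfiguredTaskSupervisor,
   # If more than 3 child restarts happen...
   max_restarts: 3,
   # ...within a 5-second window, the supervisor itself shuts down
   max_seconds: 5}
]

Supervisor.start_link(children, strategy: :one_for_one)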

We can now start a supervised Task under our supervisor like so…

task =
  Task.Supervisor.async(
    AwesomeTaskSupervisor,
    fn ->
      :timer.sleep(200)
      IO.puts("Hello from a supervised task!")
      :returned_from_task
    end
  )
  |> IO.inspect(label: "TASK")

Process.info(self(), :links)
|> IO.inspect(label: "Parent process #{inspect(self())} links")

# Wait for the spawned task to finish
:timer.sleep(300)

# Notice the reply message and the "DOWN" message from the completed task
# The "DOWN" message is because of the monitor of the task
IO.inspect(Process.info(self(), :messages))
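
Since Task.Supervisor.async/2 returns a regular %Task{} struct, we could equally have awaited the result directly instead of sleeping and inspecting the mailbox. A minimal sketch, assuming the supervisor started above is still running:

task =
  Task.Supervisor.async(AwesomeTaskSupervisor, fn ->
    :timer.sleep(200)
    :returned_from_task
  end)

# Blocks until the task replies (or the default 5-second timeout elapses)
Task.await(task)
|> IO.inspect(label: "Awaited result")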

Note that in the above example the spawned task was still linked to the caller process and monitored by it. This means that if the task process crashed, it would bring down the caller process as well.

In the above example we use a single task supervisor process to launch multiple tasks. This can quickly become a bottleneck in scenarios where lots of tasks are being spawned and the supervisor process is not able to keep up.

In such cases we can use a PartitionSupervisor, which by default starts one partition per core on our machine. It will start multiple instances of Task.Supervisor and route each task to one of those instances based on a routing key.

For this we would need to define the child spec of the supervisor like so

# In the `child_spec` option we pass the name of the supervisor that we want to partition
children = [{PartitionSupervisor, child_spec: Task.Supervisor, name: ScalableTaskSupervisor}]

{:ok, supervisor_pid} = Supervisor.start_link(children, strategy: :one_for_one)

We can then start tasks via the partitioned task supervisor using the {:via, PartitionSupervisor, {name, key}} format, where name is the name of the partition supervisor and key is the routing key.

Task.Supervisor.async(
  {:via, PartitionSupervisor, {ScalableTaskSupervisor, :my_routing_key}},
  fn ->
    :timer.sleep(200)
    IO.puts("Hello from a supervised task!")
  end
)

Kino.Process.render_sup_tree(supervisor_pid)

You’ll observe the initiation of multiple task supervisors, each associated with a specific partition. These supervisors operate under our “ScalableTaskSupervisor” partition supervisor, and our task was launched within one of these partitions.
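
To see the routing in action, we can spawn a few tasks with different routing keys; non-integer keys are hashed onto one of the partitions. A small sketch, with arbitrary keys chosen only for illustration:

tasks =
  for key <- [:a, :b, :c, :d] do
    Task.Supervisor.async(
      {:via, PartitionSupervisor, {ScalableTaskSupervisor, key}},
      # Return the key along with the pid of the task process that handled it
      fn -> {key, self()} end
    )
  end

Task.await_many(tasks)
|> IO.inspect(label: "Routing key and task pid")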

Let’s now explore the behavior when a supervised task encounters an unexpected crash.

# Now lets spawn a supervised task process that will crash
Task.Supervisor.start_child(
  AwesomeTaskSupervisor,
  fn ->
    IO.puts("A task was started!")
    raise "boom"
  end,
  # Restart the process if it exits abnormally; the default restart value is :temporary
  restart: :transient
)
|> IO.inspect(label: "Returned from call to start_child")

However, it’s important to note that when using the Task.Supervisor.start_child/2 function, we do not receive a task struct that can be directly awaited or used with other functions in the Task module. Instead, we receive the PID of the task process.

When using Task.Supervisor.start_child/2, the default restart strategy is :temporary, so if a task process crashes, it will not be automatically restarted by the supervisor. In the example above we set the strategy to :transient so that the supervisor restarts the crashed task.

It’s worth mentioning that when using other functions in the Task.Supervisor module, such as async/3, async_nolink/3, etc, the spawned task processes also have a :temporary restart strategy, which cannot be changed. This means that if a task process crashes, it will not be automatically restarted by the supervisor.

Unlinked supervised tasks

To prevent linking a task with the caller process, we can utilize functions like async_nolink/3, async_stream_nolink/4, and others provided by the Task.Supervisor module.

By using these functions, the spawned tasks are not linked to the caller process. However, they are still monitored by the caller process. In the event of a task crash, the caller process remains unaffected and can still be informed about the crashed task through monitoring. Additionally, the unlinked caller process retains the ability to await the completion of the task.

This means that if we want to prevent the caller process from exiting when a spawned task process exits abnormally, while retaining the ability to await the task, we can use functions like async_nolink in the Task.Supervisor module.

To gain a better understanding of this concept, let’s explore some examples using our AwesomeTaskSupervisor that we previously started.

unlinked_task =
  Task.Supervisor.async_nolink(AwesomeTaskSupervisor, fn ->
    :timer.sleep(300)
    1 + 2
  end)
  |> IO.inspect(label: "Unliked Task")

supervisor_pid = Process.whereis(AwesomeTaskSupervisor)

# Spawned task process is not linked to the caller
Process.info(self(), :links)
|> IO.inspect(label: "Parent process #{inspect(self())} links")

# Spawned task process is linked to the supervisor
Process.info(supervisor_pid, :links)
|> IO.inspect(label: "AwesomeTaskSupervisor process #{inspect(self())} links")

# Spawned task process appears as a child under the supervisor
Task.Supervisor.children(AwesomeTaskSupervisor)
|> IO.inspect(label: "AwesomeTaskSupervisor Children")

# We can still await the unlinked task
Task.await(unlinked_task)
|> IO.inspect(label: "Result from task")

Note this function requires the task supervisor to have :temporary as the :restart option (the default that cannot be changed), as async_nolink/3 keeps a direct reference to the task which is lost if the task is restarted.

Task.Supervisor.async_nolink/3 vs Task.Supervisor.start_child/3

  • Use async_nolink/3 when you need to await or yield the task’s result. The spawned task process is not linked to the caller process, but it is monitored. This allows the caller process to receive a message when the task exits and still await its completion.

  • Use start_child/3 when you want a fire-and-forget approach. The spawned task process is not linked to the caller process, and there is no monitoring or message communication. This is suitable for scenarios where you don’t need the task’s result or if it performs side-effects (like I/O) without the need for result handling.

The choice between start_child and async_nolink conveys the semantic intention. start_child indicates that you don’t care about the result, while async_nolink implies that you may have an interest in the task’s life and result, as the monitor will provide information about its termination.
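
A short side-by-side sketch of the two styles, assuming the AwesomeTaskSupervisor started earlier is still running:

# Fire-and-forget: we only get a pid back and there is nothing to await
{:ok, _pid} =
  Task.Supervisor.start_child(AwesomeTaskSupervisor, fn ->
    IO.puts("side effect only, result is discarded")
  end)

# Awaitable: we get a %Task{} struct back and can collect the result later
task = Task.Supervisor.async_nolink(AwesomeTaskSupervisor, fn -> 40 + 2 end)

Task.await(task)
|> IO.inspect(label: "async_nolink result")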

Usage with OTP behaviours

When using async_nolink to create a task within an OTP behavior like GenServer, it’s important to match on the message received from the task in your GenServer.handle_info/2 callback.

The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.

Regardless of how the task created with async_nolink terminates, the caller’s process will always receive a :DOWN message with the same ref value held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.

Typically, async_nolink/3 is used when there is a possibility of the task failing, and you want to prevent it from causing the caller process to crash.

Let’s consider an example where we create a GenServer called MyDownloader that allows us to spawn tasks for downloading files. The GenServer process doesn’t get blocked during the download; instead, it delegates the downloading task to a separate task process. The GenServer saves the spawned task reference in a MapSet in its state and logs when a downloading task succeeds or crashes.

defmodule MyDownloader do
  use GenServer

  # == Public API ==

  def start_link() do
    GenServer.start_link(__MODULE__, :noop, name: __MODULE__)
  end

  def start_download(url) do
    GenServer.call(__MODULE__, {:start_download, url})
    :ok
  end

  def download(url) do
    # We are using get! so that in case of failures the task will crash
    HTTPoison.get!(url)
  end

  # == GenServer callbacks ==

  @impl true
  def init(_) do
    {:ok, MapSet.new()}
  end

  @impl true
  def handle_call({:start_download, url}, _from, state) do
    task = Task.Supervisor.async_nolink(AwesomeTaskSupervisor, __MODULE__, :download, [url])

    # Save the spawned task reference in a mapset in the GenServer state
    state = MapSet.put(state, task.ref)
    {:reply, :ok, state}
  end

  # The task completed successfully
  @impl true
  def handle_info({ref, response}, state) do
    # We don't care about the DOWN message now, so let's demonitor and flush it
    # Flushing will remove the {_, MonitorRef, _, _, _} message, if there is one, 
    # from the caller message queue after monitoring has been stopped.
    Process.demonitor(ref, [:flush])

    IO.inspect(response.body, label: "Task #{inspect(ref)} completed with result")

    # Remove finished tasks from the mapset
    state = MapSet.delete(state, ref)
    {:noreply, state}
  end

  # The task failed, so we received a :DOWN message instead of a reply message
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    IO.inspect(elem(reason, 0), label: "Task #{inspect(ref)} failed with reason")

    # Remove failed tasks from the mapset
    state = MapSet.delete(state, ref)
    {:noreply, state}
  end
end

Now let's take it for a spin…

MyDownloader.start_link()
MyDownloader.start_download("https://api.chucknorris.io/jokes/random")

MyDownloader.start_download(
  "https://file-examples.com/storage/fede3f30f864a1f979d2bf0/2017/10/file_example_JPG_100kB.jpg"
)

# Invalid download url, the spawned task to request this url will crash
MyDownloader.start_download("https://invalid-file-download.invalid")

In this example, all three download tasks run concurrently. Two of the download task processes succeed and send a reply message to the GenServer process, which is captured by the handle_info({ref, response}, state) callback. One of the task processes crashes, but the GenServer continues running since the tasks were unlinked.

Thanks to the task monitor, the GenServer is able to capture the :DOWN message using the handle_info({:DOWN, ref, :process, _pid, reason}, state) callback and is informed about the crashed task.

That concludes our exploration of tasks in Elixir. By now, you should have gained a solid understanding of the power and convenience that tasks offer. The Task module is widely used in practice and provides a straightforward and efficient way to manage concurrent work in Elixir. It is yet another tool in the Elixir ecosystem that empowers us to embrace concurrent and parallel programming with ease, unlocking the full potential of the language.

Happy tasking! ✨🚀

Navigation

Home | Task async stream | Agents