Powered by AppSignal & Oban Pro

Task

notebooks/task.livemd

Task

Mix.install([
  {:req, "~> 0.3"}
])

description

Task module

  • OTP compliant concurrency primitive

  • perform work in separate processes

  • can be used in two different ways:

    • awaited
    • non-awaited

According to the doc

> …processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes. The most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously

Task.async with Task.await

expensive_work = fn -> Process.sleep(1_500) end

# measure how long our code takes to execute
{time, _result} =
  :timer.tc(fn ->
    1..10
    |> Enum.map(fn _ ->
      # the spawned process is linked to and monitored by the parent process
      # if it results in an error, the parent process is terminated
      # if the parent process is terminated, the task process is terminated
      Task.async(expensive_work)
    end)
    |> Task.await_many()
  end)

System.convert_time_unit(time, :microsecond, :millisecond)

If timeouts are unacceptable for the operation at hand, it may be best to let the task and the parent process crash using await and rely instead on a supervisor to bring everything back up and attempt the task again

task =
  Task.async(fn ->
    Process.sleep(1000)
    :task_completed
  end)

:task_completed = Task.await(task)

# this will crash
Task.await(task, 500)

Task.async with Task.yield

task =
  Task.async(fn ->
    Process.sleep(1000)
    :task_completed
  end)

{:ok, :task_completed} = Task.yield(task)

# this will not crash
nil = Task.yield(task, 500)

Task.async_stream

## HTTP stress tester using Task.async_stream/3

opts = %{
  total_requests: 100,
  # how many tasks will be running simultaneously.
  max_concurrency: 10,
  url: "https://www.google.com"
}

average_time =
  1..opts.total_requests
  |> Task.async_stream(
    fn _ ->
      start_time = System.monotonic_time()
      Req.get!(opts.url)
      end_time = System.monotonic_time()
      end_time - start_time
    end,
    max_concurrency: opts.max_concurrency
  )
  |> Enum.reduce(0, fn {:ok, req_time}, acc ->
    acc + req_time
  end)
  |> System.convert_time_unit(:native, :millisecond)
  |> Kernel./(opts.total_requests)
  |> dbg()

Compare task functions

defmodule MyTask do
  defmodule Mailer do
    # Sends one email
    def deliver_now("error@example.com" = _email) do
      # raise "Oops, couldn't send email to #{email}!"
      :error
    end

    def deliver_now(email) do
      Process.sleep(:timer.seconds(2))
      IO.puts("Email to #{email} sent")

      {:ok, "email_sent"}
    end
  end

  # Send one by one sequentially
  def send_emails(emails, :sequential) do
    Enum.each(emails, &Mailer.deliver_now/1)
  end

  # Send async then do not care about the result
  def send_emails(emails, :async_forget) do
    Enum.each(emails, fn email ->
      Task.start_link(fn -> Mailer.deliver_now(email) end)
    end)
  end

  # Send async and get the result
  def send_emails(emails, :async_await) do
    emails
    |> Enum.map(fn email ->
      Task.async(fn -> Mailer.deliver_now(email) end)
    end)
    |> Enum.map(&Task.await/1)
  end

  # Send async with concurrency limit (back pressure) and do not care about the result
  def send_emails(emails, :async_stream_forget) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1)
    |> Stream.run()
  end

  # If we do not care about the order of the result, we can potentially speed up the operation
  def send_emails(emails, :async_stream_unordered) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1, order: false)
    |> Enum.to_list()
  end

  # We can kill tasks that take longer than timeout (default 5 seconds) instead of exiting
  def send_emails(emails, :async_stream_kill_task_on_timeout) do
    emails
    |> Task.async_stream(&Mailer.deliver_now/1, on_timeout: :kill_task)
    |> Enum.to_list()
  end

  # The caller won't crash when a task is crashed.
  def send_emails(emails, :async_stream_supervised) do
    # Custom TaskSupervisor must be already started
    MyApp.TaskSupervisor
    |> Task.Supervisor.async_stream_nolink(emails, &Mailer.deliver_now/1)
    |> Enum.to_list()
  end
end
emails = ["111@example.com", "error@example.com", "222@example.com"]

# MyTask.send_emails(emails, :sequential)
# MyTask.send_emails(emails, :async_forget)
# MyTask.send_emails(emails, :async_await)
MyTask.send_emails(emails, :async_stream_forget)
# MyTask.send_emails(emails, :async_stream_unordered)
# MyTask.send_emails(emails, :async_stream_kill_task_on_timeout)
# MyTask.send_emails(emails, :async_stream_supervised)