Task

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.8.0", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"}
])

Setup

Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively you can evaluate the Elixir cells as you read.

Review Questions

Upon completing this lesson, a student should be able to answer the following questions.

  • How do we use tasks to leverage concurrency?
  • How do we use tasks to send one-off fire-and-forget jobs?

Task

We’ve already seen that we can use Kernel.spawn/1 or Kernel.spawn_link/1 to spawn a process that performs some work and then dies.

spawn_pid =
  spawn(fn ->
    IO.puts("Job Started")
    # simulating expensive process
    Process.sleep(1000)
    IO.puts("Job Ended")
  end)

When we want to execute some code in a process, we shouldn’t use Kernel.spawn/1 or Kernel.spawn_link/1 directly. Instead, we should rely on the Task module. The Task module allows us to spawn a process, perform some work in that process, then end the process when our work is finished.

Task is also OTP-compliant, meaning it conforms to certain OTP conventions that improve error handling and allow tasks to be started under a supervisor.
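
As a minimal sketch of what "start under a supervisor" can look like, the cell below places a single task in a supervisor's child list. The message text and the variable names are only illustrative, and supervising tasks is covered properly in a later lesson.

```elixir
# A child list containing one Task. The {Task, fun} tuple is a child
# specification that starts the task with the given function.
children = [
  {Task, fn -> IO.puts("supervised task ran!") end}
]

# Start a supervisor that starts (and watches over) the task.
{:ok, supervisor_pid} = Supervisor.start_link(children, strategy: :one_for_one)
```

The supervisor keeps running after the task finishes its work.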

Fire-and-Forget

We can use Task.start/1 to create a new short-lived process that dies when its function finishes executing. This is a fire-and-forget process: it does not block the caller process, and it does not return a response.

{:ok, task_pid} = Task.start(fn -> IO.puts("task ran!") end)

IO.puts("Parent keeps running")
Process.sleep(100)
Process.alive?(task_pid) || IO.puts("task is dead")
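
To see that the caller is not blocked, we can time the call to Task.start/1 itself. This is a rough sketch: the one-second sleep stands in for real work, and the names elapsed_ms and still_running? are only illustrative.

```elixir
# Time how long the caller spends inside Task.start/1.
{microseconds, {:ok, pid}} =
  :timer.tc(fn ->
    Task.start(fn -> Process.sleep(1000) end)
  end)

# The caller gets control back long before the task's work is done.
elapsed_ms = microseconds / 1000
still_running? = Process.alive?(pid)

IO.inspect(elapsed_ms, label: "milliseconds spent starting the task")
IO.inspect(still_running?, label: "task still running")
```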

Awaiting Task Response

With Task, we can use async/1 and await/1 to spawn a process, perform some calculation, and then retrieve the value when it’s finished.

task =
  Task.async(fn ->
    # simulating expensive calculation
    Process.sleep(1000)
    "response!"
  end)

Task.await(task)

We can run two computations concurrently by separating them into two different Task processes. Here, we’re simulating a clock tick-tocking every second in two separate processes to demonstrate that they run in parallel.

task1 =
  Task.async(fn ->
    IO.inspect("tick", label: "task 1")
    Process.sleep(2000)
    IO.inspect("tick", label: "task 1")
    Process.sleep(1000)
    "tick"
  end)

task2 =
  Task.async(fn ->
    Process.sleep(1000)
    IO.inspect("tock", label: "task 2")
    Process.sleep(2000)
    IO.inspect("tock", label: "task 2")
    "tock"
  end)

Task.await(task1) |> IO.inspect(label: "Task 1 Response")
Task.await(task2) |> IO.inspect(label: "Task 2 Response")

A computer with a multi-core processor can perform these concurrent computations in parallel, which may make our program faster. In reality, it’s a bit more complicated than this, but this is a reasonable mental model for now to understand why concurrency is useful for improving performance.

Here we use the par boxes to demonstrate operations happening in parallel.

sequenceDiagram
  par 
    ParentProcess ->> Task1: spawns
    ParentProcess ->> Task2: spawns
  end
  par
    Task1 ->> Task1: performs work
    Task2 ->> Task2: performs work
  end
  Task1 ->> ParentProcess: return awaited result
  Task2 ->> ParentProcess: return awaited result

Task.async/1 returns a Task struct, not a pid. The Task struct contains information about who the parent (:owner) process is, the task’s pid (:pid), and a reference (:ref) used to monitor if the task crashes.

Task.async(fn -> nil end)
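
As a small sketch, we can pattern match on the struct to look at those fields. The :done return value is only a placeholder.

```elixir
task = Task.async(fn -> :done end)

# Pull the documented fields out of the Task struct.
%Task{owner: owner, pid: pid, ref: ref} = task

# The owner is the process that called Task.async/1, which is the current process here.
IO.inspect(owner == self(), label: "owner is the caller")
IO.inspect(is_pid(pid), label: "pid is a pid")
IO.inspect(is_reference(ref), label: "ref is a reference")

# Await the task so its reply does not linger in the caller's mailbox.
result = Task.await(task)
```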

To demonstrate the performance value of concurrency, let’s say we have two computations that each take 1 second. Run synchronously, one after the other, they take 2 seconds in total.

computation1 = fn -> Process.sleep(1000) end
computation2 = fn -> Process.sleep(1000) end

{microseconds, _result} =
  :timer.tc(fn ->
    computation1.()
    computation2.()
  end)

# expected to be ~2 seconds
microseconds / 1000 / 1000

By running these computations in parallel, we can theoretically reduce this time to 1 second instead of 2.

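> Note, this example finishes in about 1 second even on a single core, because Process.sleep/1 only waits and does not use the CPU. For CPU-bound computations, a computer without multiple cores would still take about 2 seconds rather than the expected 1 second.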

computation1 = fn -> Process.sleep(1000) end
computation2 = fn -> Process.sleep(1000) end

{microseconds, _result} =
  :timer.tc(fn ->
    task1 = Task.async(fn -> computation1.() end)
    task2 = Task.async(fn -> computation2.() end)

    Task.await(task1)
    Task.await(task2)
  end)

# expected to be ~1 second
microseconds / 1000 / 1000

Your Turn

Use Task.async/1 and Task.await/1 to demonstrate the performance difference between synchronous execution and parallel execution.

You may consider using Process.sleep/1 to simulate an expensive computation.

Awaiting Many Tasks

When working with many parallel tasks, we can use enumeration to spawn many tasks.

tasks =
  Enum.map(1..5, fn each ->
    Task.async(fn ->
      Process.sleep(1000)
      each * 2
    end)
  end)

Then we can also use enumeration to await/1 each task.

Enum.map(tasks, fn task -> Task.await(task) end)

Alternatively, you can use the convenient Task.await_many/1 function instead.

tasks =
  Enum.map(1..5, fn each ->
    Task.async(fn ->
      Process.sleep(1000)
      each * 2
    end)
  end)

Task.await_many(tasks)
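
Task.await_many/1 returns the results in the same order as the list of tasks, not in the order the tasks happen to finish. The sketch below uses made-up sleep durations to show this: the second task finishes first, yet its result stays in second place.

```elixir
# Each task sleeps for a different time, then returns that time.
durations = [300, 100, 200]

tasks =
  Enum.map(durations, fn ms ->
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end)

# The results line up with the original task list.
results = Task.await_many(tasks)
IO.inspect(results, label: "results")
```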

Timeouts

Task.await/1 pauses the current execution to wait until a task has finished. However, it will not wait forever. By default, Task.await/1 and Task.await_many/1 will wait for five seconds for the task to complete. If the task does not finish in time, the call exits with a timeout error, which crashes the calling process unless the exit is caught.

task = Task.async(fn -> Process.sleep(6000) end)

Task.await(task)
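
A timed-out Task.await/2 signals the failure as an exit rather than a raised exception, so rescue will not intercept it. If we want to handle the timeout ourselves, one option is to catch the exit. This is a minimal sketch with shortened, made-up durations, and the :timed_out atom is only an illustrative return value.

```elixir
# The task needs 500 milliseconds, but we only wait 100.
task = Task.async(fn -> Process.sleep(500) end)

result =
  try do
    Task.await(task, 100)
  catch
    # The exit reason is a tuple that starts with :timeout.
    :exit, {:timeout, _details} -> :timed_out
  end

IO.inspect(result, label: "result")
```

Letting the process crash is often the right choice in Elixir, so treat catching the exit as something to reach for only when the caller has a sensible way to recover.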

If we want to wait for more or less time, we can override the default value. Task.await/2 and Task.await_many/2 accept a timeout in milliseconds as the second argument.

task = Task.async(fn -> Process.sleep(6000) end)

Task.await(task, 7000)

task1 = Task.async(fn -> Process.sleep(6000) end)
task2 = Task.async(fn -> Process.sleep(6000) end)

Task.await_many([task1, task2], 7000)

Your Turn

In the Elixir cell below, spawn a task which takes one second to complete. await/2 the task and alter the timeout value to be one second. Awaiting the task should crash.

Mark As Completed

file_name = Path.basename(Regex.replace(~r/#.+/, __ENV__.file, ""), ".livemd")

save_name =
  case Path.basename(__DIR__) do
    "reading" -> "task_reading"
    "exercises" -> "task_exercise"
  end

progress_path = __DIR__ <> "/../progress.json"
existing_progress = File.read!(progress_path) |> Jason.decode!()

default = Map.get(existing_progress, save_name, false)

form =
  Kino.Control.form(
    [
      completed: input = Kino.Input.checkbox("Mark As Completed", default: default)
    ],
    report_changes: true
  )

Task.async(fn ->
  for %{data: %{completed: completed}} <- Kino.Control.stream(form) do
    File.write!(
      progress_path,
      Jason.encode!(Map.put(existing_progress, save_name, completed), pretty: true)
    )
  end
end)

form

Commit Your Progress

Run the following in your command line from the curriculum folder to track and save your progress in a Git commit. Ensure that you do not already have undesired or unrelated changes by running git status or by checking the source control tab in Visual Studio Code.

$ git checkout -b task-reading
$ git add .
$ git commit -m "finish task reading"
$ git push origin task-reading

Create a pull request from your task-reading branch to your solutions branch. Please do not create a pull request to the DockYard Academy repository as this will spam our PR tracker.

DockYard Academy Students Only:

Notify your teacher by including @BrooklinJazz in your PR description to get feedback. You (or your teacher) may merge your PR into your solutions branch after review.

If you are interested in joining the next academy cohort, sign up here to receive more news when it is available.

Up Next

Previous: Games: Supervised Score Tracker
Next: Task Supervisor