Task
Mix.install([
{:jason, "~> 1.4"},
{:kino, "~> 0.8.0", override: true},
{:youtube, github: "brooklinjazz/youtube"},
{:hidden_cell, github: "brooklinjazz/hidden_cell"}
])
Navigation
Setup
Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively you can evaluate the Elixir cells as you read.
Review Questions
Upon completing this lesson, a student should be able to answer the following questions.
- How do we use tasks to leverage concurrency?
- How do we use tasks to send one-off fire-and-forget jobs?
Task
We’ve already seen we can use Kernel.spawn/1 or Kernal.spawn_link/1 to spawn a process that performs some work and then dies.
spawn_pid =
spawn(fn ->
IO.puts("Job Started")
# simulating expensive process
Process.sleep(1000)
IO.puts("Job Ended")
end)
When we want to execute some code in a process, we shouldn’t use Kernel.spawn/1 or Kernel.spawn_link/1 directly. Instead, we should rely on the Task module. The Task module allows us to spawn a process, perform some work in that process, then end the process when our work is finished.
Task is also OTP-compliant, meaning it conform to certain OTP conventions that improve error handling, and allow them to start under a supervisor.
Fire-and-Forget
We can use Task.start/1 to create a new short-lived process that dies when it’s function executes. This is a fire-and-forget process which does not block the caller process or return a response.
{:ok, task_pid} = Task.start(fn -> IO.puts("task ran!") end)
IO.puts("Parent keeps running")
Process.sleep(100)
Process.alive?(task_pid) || IO.puts("task is dead")
Awaiting Task Response
With Task, we can use async/1 and await/1 to spawn a process, perform some calculation, and the retrieve the value when it’s finished.
task =
Task.async(fn ->
# simulating expensive calculation
Process.sleep(1000)
"response!"
end)
Task.await(task)
We can run two computations concurrently by separating them into two different Task processes. Here, we’re simulating a clock tick-tocking every second in two separate processes to demonstrate the run in parallel.
task1 =
Task.async(fn ->
IO.inspect("tick", label: "task 1")
Process.sleep(2000)
IO.inspect("tick", label: "task 1")
Process.sleep(1000)
"tick"
end)
task2 =
Task.async(fn ->
Process.sleep(1000)
IO.inspect("tock", label: "task 2")
Process.sleep(2000)
IO.inspect("tock", label: "task 2")
"tock"
end)
Task.await(task1) |> IO.inspect(label: "Task 1 Response")
Task.await(task2) |> IO.inspect(label: "Task 2 Response")
A computer with a multi-core processor can perform these concurrent computations in parallel, which may make our program faster. In reality, it’s a bit more complicated than this, but this is a reasonable mental model for now to understand why concurrency is useful for improving performance.
Here we use the par boxes to demonstrate operations happening in parallel.
sequenceDiagram
par
ParentProcess ->> Task1: spawns
ParentProcess ->> Task2: spawns
end
par
Task1 ->> Task1: performs work
Task2 ->> Task2: performs work
end
Task1 ->> ParentProcess: return awaited result
Task2 ->> ParentProcess: return awaited result
Task.async/1 returns a Task struct, not a pid. The Task struct
contains information about who the parent (:owner) process is, the task’s pid (:pid), and a reference (:ref) used to monitor if the task crashes.
Task.async(fn -> nil end)
To demonstrate the performance value of concurrency, let’s say we have two computations which each take 1 second, it would normally take us 2 seconds
to run these tasks synchronously.
computation1 = fn -> Process.sleep(1000) end
computation2 = fn -> Process.sleep(1000) end
{microseconds, _result} =
:timer.tc(fn ->
computation1.()
computation2.()
end)
# expected to be ~2 seconds
microseconds / 1000 / 1000
By running these computations in parallel, we can theoretically reduce this time to 1 second instead of 2.
> Note, if your computer does not have multiple cores, then it will still take 2 seconds rather than the expected 1 second.
computation1 = fn -> Process.sleep(1000) end
computation2 = fn -> Process.sleep(1000) end
{microseconds, _result} =
:timer.tc(fn ->
task1 = Task.async(fn -> computation1.() end)
task2 = Task.async(fn -> computation2.() end)
Task.await(task1)
Task.await(task2)
end)
# expected to be ~1 second
microseconds / 1000 / 1000
Your Turn
Use Task.async/1 and Task.await/1 to demonstrate the performance benefits between synchronous execution and parallel execution.
You may consider using Process.sleep/1 to simulate an expensive computation.
Awaiting Many Tasks
When working with many parallel tasks, we can use enumeration to spawn many tasks.
tasks =
Enum.map(1..5, fn each ->
Task.async(fn ->
Process.sleep(1000)
each * 2
end)
end)
Then we can also use enumeration to await/1 each task.
Enum.map(tasks, fn task -> Task.await(task) end)
Alternatively, you can use the convenient Taskl.await_many/1 function instead.
tasks =
Enum.map(1..5, fn each ->
Task.async(fn ->
Process.sleep(1000)
each * 2
end)
end)
Task.await_many(tasks)
Timeouts
Task.await/1 pauses the current execution to wait until a task has finished. However, it will not wait forever. By default, Task.await/1 and Task.await_many/1 will wait for five seconds for the task to complete. If the task does not finish, it will raise an error.
task = Task.async(fn -> Process.sleep(6000) end)
Task.await(task)
If we want to wait for more or less time, we can override the default value. await/2 and await_many/2 accept
a timeout value as the second argument to the function.
task = Task.async(fn -> Process.sleep(6000) end)
Task.await(task, 7000)
task1 = Task.async(fn -> Process.sleep(6000) end)
task2 = Task.async(fn -> Process.sleep(6000) end)
Task.await_many([task1, task2], 7000)
Your Turn
In the Elixir cell below, spawn a task which takes one second to complete.
await/2 the task and alter the timeout value to be one second. Awaiting the task should crash.
Further Reading
Consider the following resource(s) to deepen your understanding of the topic.
Mark As Completed
file_name = Path.basename(Regex.replace(~r/#.+/, __ENV__.file, ""), ".livemd")
save_name =
case Path.basename(__DIR__) do
"reading" -> "task_reading"
"exercises" -> "task_exercise"
end
progress_path = __DIR__ <> "/../progress.json"
existing_progress = File.read!(progress_path) |> Jason.decode!()
default = Map.get(existing_progress, save_name, false)
form =
Kino.Control.form(
[
completed: input = Kino.Input.checkbox("Mark As Completed", default: default)
],
report_changes: true
)
Task.async(fn ->
for %{data: %{completed: completed}} <- Kino.Control.stream(form) do
File.write!(
progress_path,
Jason.encode!(Map.put(existing_progress, save_name, completed), pretty: true)
)
end
end)
form
Commit Your Progress
Run the following in your command line from the curriculum folder to track and save your progress in a Git commit.
Ensure that you do not already have undesired or unrelated changes by running git status or by checking the source control tab in Visual Studio Code.
$ git checkout -b task-reading
$ git add .
$ git commit -m "finish task reading"
$ git push origin task-reading
Create a pull request from your task-reading branch to your solutions branch.
Please do not create a pull request to the DockYard Academy repository as this will spam our PR tracker.
DockYard Academy Students Only:
Notify your teacher by including @BrooklinJazz in your PR description to get feedback.
You (or your teacher) may merge your PR into your solutions branch after review.
If you are interested in joining the next academy cohort, sign up here to receive more news when it is available.
Up Next
| Previous | Next |
|---|---|
| Games: Supervised Score Tracker | Task Supervisor |