Asynchronous Messages
Mix.install([
{:jason, "~> 1.4"},
{:kino, "~> 0.8.0", override: true},
{:youtube, github: "brooklinjazz/youtube"},
{:hidden_cell, github: "brooklinjazz/hidden_cell"}
])
Setup
Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively, you can evaluate the Elixir cells as you read.
Review Questions
Upon completing this lesson, a student should be able to answer the following questions.
- Why might we use asynchronous vs synchronous messages?
- How do we send a GenServer an asynchronous message and handle it?
Fire and Forget
So far, we’ve seen only synchronous message sending, where every message blocks the calling process. To demonstrate, here’s an example that simulates a GenServer with a slower operation.
defmodule SlowServer do
use GenServer
@impl true
def init(_opts) do
{:ok, nil}
end
@impl true
def handle_call(:sleep, _from, state) do
IO.puts("Starting sleep")
Process.sleep(1000)
IO.puts("Ending sleep")
{:reply, "response!", state}
end
end
{:ok, pid} = GenServer.start_link(SlowServer, 0)
The caller process only continues when the slow server finishes the handle_call/3 function.
IO.puts("Caller starting")
response = GenServer.call(pid, :sleep)
IO.puts("Caller received: #{response}")
Most of the time, we want this behavior. This makes our code predictable, especially when we want to receive a response from the GenServer. However, sometimes we want to fire-and-forget, meaning we send the GenServer a message so it can handle some work for us or update its state without blocking the caller process.
For example, we can write a Counter GenServer that uses handle_cast/2 to increment the count, then handle_call/3 to return the updated count. We’ll use Process.sleep/1 to simulate an increment that takes one second to complete.
handle_cast/2 does not receive a from argument identifying the calling process, because it doesn’t send a response. It returns {:noreply, state}.
defmodule Counter do
use GenServer
@impl true
def init(_opts) do
{:ok, 0}
end
@impl true
def handle_call(:get_count, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast(:increment, state) do
IO.puts("count: #{state + 1}")
Process.sleep(1000)
{:noreply, state + 1}
end
end
{:ok, pid} = GenServer.start_link(Counter, 0)
Now, even though it takes a second to increment the count, the caller process will be able to continue.
GenServer.cast(pid, :increment)
GenServer.cast(pid, :increment)
GenServer.cast(pid, :increment)
IO.puts("not blocked!")
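As a rough sketch of what this buys us, the snippet below times a single cast and then follows several casts with a synchronous call. OrderedCounter, ordered_pid, and the 100 millisecond delay are illustrative names and values, not part of the lesson's Counter. It relies on a process handling its mailbox in arrival order, so the call should only be answered once the earlier casts have been handled.

```elixir
defmodule OrderedCounter do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, 0}

  @impl true
  def handle_call(:get_count, _from, state), do: {:reply, state, state}

  @impl true
  def handle_cast(:increment, state) do
    # Simulate slow work (hypothetical 100 ms delay).
    Process.sleep(100)
    {:noreply, state + 1}
  end
end

{:ok, ordered_pid} = GenServer.start_link(OrderedCounter, [])

# :timer.tc/1 returns {elapsed_microseconds, result}; cast returns :ok right away.
{cast_time, :ok} = :timer.tc(fn -> GenServer.cast(ordered_pid, :increment) end)
GenServer.cast(ordered_pid, :increment)
GenServer.cast(ordered_pid, :increment)

# The call is queued behind the three casts, so it blocks until they finish.
count = GenServer.call(ordered_pid, :get_count)
IO.puts("cast returned after #{cast_time} microseconds, count is #{count}")
```

The cast returns well before the simulated work completes, while the call waits for its reply. Mixing the two is a common way to queue work cheaply and still read a consistent result later.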
Your Turn
Create a Journal GenServer that uses handle_cast/2 to add journal entries. You can verify that this updates the Journal process’s state using :sys.get_state/1.
{:ok, journal_pid} = GenServer.start_link(Journal, [])
GenServer.cast(journal_pid, {:add_entry, "first entry"})
["first entry"] = :sys.get_state(journal_pid)
Example Solution
defmodule Journal do
use GenServer
@impl true
def init(_opts) do
{:ok, []}
end
@impl true
def handle_cast({:add_entry, message}, entries) do
{:noreply, [message | entries]}
end
end
Process
We’ve seen we can use some Kernel functions for working with processes.
pid =
spawn(fn ->
receive do
:message -> IO.puts("I received a message")
end
end)
send(pid, :message)
Generally speaking, we won’t use these functions directly and will use the GenServer.call/3 and GenServer.cast/2 functions for sending messages.
We’ll also use the Process module, which provides some generic functions for working with processes and sending messages.
For example, we can send messages using Process.send/3 the same way we can with Kernel.send/2.
Process.send(self(), :message, [])
receive do
:message -> "I received a message"
end
We also have a Process.send_after/3 function for sending messages after a certain number of milliseconds.
Process.send_after(self(), :message, 2000)
receive do
:message -> "I received a message after 2 seconds"
end
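Process.send_after/3 does not block. It returns a timer reference immediately, and that reference can be passed to Process.cancel_timer/1 if the message is no longer wanted. The sketch below assumes a made-up :cancelled_message atom and a 100 millisecond wait, chosen only for illustration.

```elixir
# Schedule a message, then cancel it before it is delivered.
timer_ref = Process.send_after(self(), :cancelled_message, 2000)

# cancel_timer/1 returns the milliseconds that were left, or false if the
# timer had already fired or did not exist.
remaining = Process.cancel_timer(timer_ref)

result =
  receive do
    :cancelled_message -> :received
  after
    # Give up after 100 ms (hypothetical wait); the message should never arrive.
    100 -> :nothing_received
  end
```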
A GenServer process can receive generic messages from other processes. It handles them using a handle_info/2 callback function.
defmodule Receiver do
use GenServer
def init(_opts) do
{:ok, []}
end
def handle_info(:message, state) do
IO.puts("Received a message!")
{:noreply, state}
end
end
Any messages sent using the Process module or generic Kernel functions can be handled using the generic handle_info/2 function.
{:ok, pid} = GenServer.start_link(Receiver, [])
Process.send(pid, :message, [])
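To illustrate that both Kernel.send/2 and Process.send/3 end up in handle_info/2, here is a minimal sketch of a GenServer that stores every generic message it receives. InfoCollector and the message atoms are hypothetical names used only for this example.

```elixir
defmodule InfoCollector do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, []}

  @impl true
  def handle_info(message, state) do
    # Prepend each generic message so the newest one comes first.
    {:noreply, [message | state]}
  end
end

{:ok, collector_pid} = GenServer.start_link(InfoCollector, [])

send(collector_pid, :from_kernel_send)
Process.send(collector_pid, :from_process_send, [])

# :sys.get_state/1 is handled after the two messages sent above.
collected = :sys.get_state(collector_pid)
```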
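A GenServer should not send itself a synchronous message with GenServer.call/3 from inside one of its own callbacks. The process is already busy handling the current message, so it can never reply to itself.
defmodule MisbehavingGenServer do
  use GenServer

  def init(_opts) do
    {:ok, []}
  end

  def handle_call(:message, _from, state) do
    # This process is busy handling :message, so it can never
    # answer the :talking_to_myself call it sends to itself.
    GenServer.call(self(), :talking_to_myself)
    IO.puts("Received a message!")
    {:reply, :ok, state}
  end

  def handle_call(:talking_to_myself, _from, state) do
    IO.puts("Sent a message to myself")
    {:reply, :ok, state}
  end
end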
Uncomment and evaluate the code below, and you’ll see the MisbehavingGenServer crash because it tries to call itself synchronously. Re-comment it when finished.
# {:ok, pid} = GenServer.start_link(MisbehavingGenServer, [])
# GenServer.call(pid, :message)
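If you would rather observe the failure without crashing the calling process, one option is to start the server unlinked with GenServer.start/2 and catch the exit. This is a sketch under assumptions: SelfCaller and self_caller_pid are hypothetical names, and the exact exit reason may differ between Elixir versions, so the example only checks that an exit occurred.

```elixir
defmodule SelfCaller do
  use GenServer

  @impl true
  def init(_opts), do: {:ok, []}

  @impl true
  def handle_call(:message, _from, state) do
    # Synchronously calling ourselves from inside a callback.
    GenServer.call(self(), :talking_to_myself)
    {:reply, :ok, state}
  end
end

# GenServer.start/2 does not link, so the crash stays inside the server.
{:ok, self_caller_pid} = GenServer.start(SelfCaller, [])

result =
  try do
    GenServer.call(self_caller_pid, :message)
  catch
    :exit, reason -> {:caught_exit, reason}
  end

server_alive? = Process.alive?(self_caller_pid)
```

Expect an error report in the logs when the server terminates.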
Instead of sending itself a message using GenServer.cast/2 or GenServer.call/3, a GenServer can send itself generic messages using Process.send/3 or Process.send_after/3, and handle them in handle_info/2.
Often we’ll use this to schedule some kind of recurring work. Here we have an example of a counter that automatically increments every second.
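defmodule IncrementingCounter do
  use GenServer

  @impl true
  def init(_opts) do
    # Schedule the first increment as soon as the process starts.
    schedule_increment()
    {:ok, 0}
  end

  @impl true
  def handle_info(:increment, state) do
    # Schedule the next increment before updating the state.
    schedule_increment()
    {:noreply, state + 1}
  end

  defp schedule_increment do
    Process.send_after(self(), :increment, 1000)
  end
end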
{:ok, counter_pid} = GenServer.start_link(IncrementingCounter, [])
Re-evaluate the cell to see that the counter is constantly incrementing.
:sys.get_state(counter_pid)
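The same check can be done in code rather than by re-evaluating a cell. The sketch below assumes a hypothetical FastTicker module with a shorter 50 millisecond interval, and reads the state twice with a pause in between.

```elixir
defmodule FastTicker do
  use GenServer

  # Hypothetical interval, shorter than the lesson's one second.
  @interval 50

  @impl true
  def init(_opts) do
    schedule_tick()
    {:ok, 0}
  end

  @impl true
  def handle_info(:tick, count) do
    schedule_tick()
    {:noreply, count + 1}
  end

  defp schedule_tick do
    Process.send_after(self(), :tick, @interval)
  end
end

{:ok, ticker_pid} = GenServer.start_link(FastTicker, [])

first_reading = :sys.get_state(ticker_pid)
Process.sleep(200)
second_reading = :sys.get_state(ticker_pid)

IO.puts("count went from #{first_reading} to #{second_reading}")
```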
Your Turn
Create a DecrementingCounter module which stores a counter in its state and decrements it every 500 milliseconds.
Example Solution
defmodule DecrementingCounter do
  use GenServer

  @impl true
  def init(_opts) do
    schedule_decrement()
    {:ok, 0}
  end

  @impl true
  def handle_info(:decrement, state) do
    schedule_decrement()
    {:noreply, state - 1}
  end

  defp schedule_decrement do
    # Send ourselves a :decrement message every 500 milliseconds.
    Process.send_after(self(), :decrement, 500)
  end
end
{:ok, counter_pid} = GenServer.start_link(DecrementingCounter, [])
Further Reading
Consider the following resource(s) to deepen your understanding of the topic.
Mark As Completed
file_name = Path.basename(Regex.replace(~r/#.+/, __ENV__.file, ""), ".livemd")
save_name =
case Path.basename(__DIR__) do
"reading" -> "async_messages_reading"
"exercises" -> "async_messages_exercise"
end
progress_path = __DIR__ <> "/../progress.json"
existing_progress = File.read!(progress_path) |> Jason.decode!()
default = Map.get(existing_progress, save_name, false)
form =
Kino.Control.form(
[
completed: input = Kino.Input.checkbox("Mark As Completed", default: default)
],
report_changes: true
)
Task.async(fn ->
for %{data: %{completed: completed}} <- Kino.Control.stream(form) do
File.write!(
progress_path,
Jason.encode!(Map.put(existing_progress, save_name, completed), pretty: true)
)
end
end)
form
Commit Your Progress
Run the following in your command line from the curriculum folder to track and save your progress in a Git commit.
Ensure that you do not already have undesired or unrelated changes by running git status or by checking the source control tab in Visual Studio Code.
$ git checkout -b async-messages-reading
$ git add .
$ git commit -m "finish async messages reading"
$ git push origin async-messages-reading
Create a pull request from your async-messages-reading branch to your solutions branch.
Please do not create a pull request to the DockYard Academy repository as this will spam our PR tracker.
DockYard Academy Students Only:
Notify your teacher by including @BrooklinJazz in your PR description to get feedback.
You (or your teacher) may merge your PR into your solutions branch after review.
If you are interested in joining the next academy cohort, sign up here to receive more news when it is available.
Up Next
| Previous | Next |
|---|---|
| Pokemon Server | Mailbox Server |