Async Await - With Visualization
Mix.install([
{:kino, "~> 0.12.0"}
])
Introduction
Please see my blog post "The Mechanics of Async Await" for details 🏴‍☠️
Observer
defmodule Observer do
  @moduledoc """
  A GenServer, registered under its module name, that records events.

  Each event is stored as a `{:event, type, kind, send, recv, args, loop}` tuple.
  `get/0` returns the events in the order they were logged.
  """
  use GenServer

  @doc """
  Starts the observer and registers it as `Observer`.

  The argument is accepted but ignored.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Records one event and returns `nil` once the observer has stored it.

  This is a synchronous call, so events from a single process are recorded in
  the order that process logs them.
  """
  def log(type, kind, send, recv, args \\ nil, loop \\ nil) do
    GenServer.call(__MODULE__, {:event, type, kind, send, recv, args, loop})
  end

  @doc """
  Returns every recorded event, oldest first.
  """
  def get do
    GenServer.call(__MODULE__, :get)
  end

  # The state is kept newest-first so that recording an event is a constant-time
  # prepend instead of an O(n) `++` append. An initial list is given oldest-first,
  # so it is reversed once here.
  @impl true
  def init(state) do
    {:ok, Enum.reverse(state)}
  end

  @impl true
  def handle_call({:event, _type, _kind, _send, _recv, _args, _loop} = event, _from, state) do
    {:reply, nil, [event | state]}
  end

  # Reverse on read so callers still see the events in logging order.
  @impl true
  def handle_call(:get, _from, state) do
    {:reply, Enum.reverse(state), state}
  end
end
# Start the Observer process; it registers itself under the name `Observer`,
# which is how `Observer.log` and `Observer.get` reach it.
{:ok, pid} = Observer.start_link(nil)
Visualization
defmodule MermaidGenerator do
  @moduledoc """
  Turns the events recorded by `Observer` into Mermaid diagram source text.

  `:comp` events feed a `graph TD` diagram and `:flow` events feed a
  `sequenceDiagram`.
  """

  @doc """
  Returns Mermaid `graph TD` source built from the recorded `:comp` events.

  Every distinct `{type, name}` endpoint becomes one node, and every event
  becomes one edge labelled with the event's relation.
  """
  def generate_comp_diagram do
    comp_events = recorded(:comp)

    nodes =
      comp_events
      |> Enum.flat_map(fn {:event, :comp, from_type, from_name, to_type, to_name, _rel} ->
        [{from_type, from_name}, {to_type, to_name}]
      end)
      |> Enum.uniq()
      |> Enum.map(fn {type, name} = endpoint ->
        " #{hash(endpoint)}(#{inspect(type)} #{inspect(name)})\n"
      end)

    edges =
      for {:event, :comp, from_type, from_name, to_type, to_name, rel} <- comp_events do
        " #{hash({from_type, from_name})}--#{rel}-->#{hash({to_type, to_name})}\n"
      end

    Enum.join(["graph TD;\n" | nodes] ++ edges)
  end

  @doc """
  Returns Mermaid `sequenceDiagram` source built from the recorded `:flow` events.

  The two actors of every event are declared as participants (each one once),
  followed by one interaction line per event.
  """
  def generate_flow_diagram() do
    flow_events = recorded(:flow)

    participants =
      flow_events
      |> Enum.flat_map(fn {:event, _part, _type, _kind, first, second, _args} ->
        [first, second]
      end)
      |> Enum.uniq()
      |> Enum.map(fn actor -> " participant #{hash(actor)} as #{sanitize(actor)}\n" end)

    interactions =
      for {:event, part, type, kind, first, second, payload} <- flow_events do
        format_interaction(part, type, kind, first, second, payload)
      end

    Enum.join(["sequenceDiagram\n" | participants] ++ interactions)
  end

  @doc """
  Derives a Mermaid-safe identifier for `v`: the letter `P` followed by the
  term's `:erlang.phash2/1` value.
  """
  def hash(v) do
    "P" <> Integer.to_string(:erlang.phash2(v))
  end

  # All recorded events whose first field after the `:event` tag equals `part`.
  defp recorded(part) do
    Enum.filter(Observer.get(), fn {:event, event_part, _, _, _, _, _} ->
      event_part == part
    end)
  end

  # A send event is drawn as a solid arrow from the sender to the receiver.
  defp format_interaction(:flow, :send, kind, sender, receiver, payload) do
    interaction(sender, "->>", receiver, kind, payload)
  end

  # A recv event lists the receiving actor first; it is drawn as a dashed arrow
  # from that actor to itself.
  defp format_interaction(:flow, :recv, kind, receiver, _sender, payload) do
    interaction(receiver, "-->>", receiver, kind, payload)
  end

  # One sequence-diagram line: "<from><arrow><to>:<kind> <payload description>".
  defp interaction(from, arrow, to, kind, payload) do
    " #{hash(from)}#{arrow}#{hash(to)}:#{kind} #{describe(kind, payload)}\n"
  end

  # Functions are shown by the `:index` entry of their `:erlang.fun_info/1`.
  defp describe(kind, fun) when kind in [:async, :spawn] do
    index = fun |> :erlang.fun_info() |> Keyword.get(:index)
    "Function(FID.#{index})"
  end

  defp describe(kind, promise) when kind in [:return_p, :await] do
    "Promise(#{sanitize(promise)})"
  end

  defp describe(kind, value) when kind in [:return_v, :compl] do
    "Value(#{sanitize(value)})"
  end

  defp describe(_kind, _payload) do
    "None"
  end

  # Inspects the term and strips the characters `#`, `<` and `>`.
  defp sanitize(pid) do
    String.replace(inspect(pid), ~r/[#<>]/, "")
  end
end
Event Loop
defmodule State do
  @moduledoc """
  Bookkeeping for the event loop: the status of every promise and, per promise,
  the callers waiting for its result.
  """
  defstruct promises: %{}, awaiters: %{}

  @doc "Returns an empty state."
  def new(), do: %__MODULE__{}

  @doc "Returns the status stored for promise `pid`, or `nil` when there is none."
  def get_promise(state, pid), do: Map.get(state.promises, pid)

  @doc "Returns the callers waiting on promise `pid` (an empty list when there are none)."
  def get_awaiter(state, pid), do: Map.get(state.awaiters, pid, [])

  @doc "Registers promise `pid` as `:pending` with no awaiters."
  def add_promise(state, pid) do
    %__MODULE__{
      state
      | promises: Map.put(state.promises, pid, :pending),
        awaiters: Map.put(state.awaiters, pid, [])
    }
  end

  @doc "Puts `caller` at the front of the awaiters of promise `pid`."
  def add_awaiter(state, pid, caller) do
    waiting = Map.update(state.awaiters, pid, [caller], fn others -> [caller | others] end)
    %__MODULE__{state | awaiters: waiting}
  end

  @doc "Stores `{:completed, result}` for promise `pid` and clears its awaiters."
  def set_promise(state, pid, result) do
    %__MODULE__{
      state
      | promises: Map.put(state.promises, pid, {:completed, result}),
        awaiters: Map.put(state.awaiters, pid, [])
    }
  end
end
defmodule EventLoop do
  @moduledoc """
  A GenServer, registered under its module name, that runs functions in spawned
  processes and tracks their results as promises.

  It handles three calls:

    * `{:invoke, func, args}` - spawns a process that applies `func` to `args`
      and replies with that process' pid, which doubles as the promise id.
    * `{:await, promise}` - replies with the promise's result, deferring the
      reply until the promise completes if it is still pending. An unknown
      promise id is answered with `{:error, :unknown_promise}`.
    * `{:return, callee, result}` - sent by the spawned process; completes the
      promise and replies to everyone awaiting it.

  Every step is reported to `Observer` for the flow and component diagrams.
  """
  use GenServer

  @doc """
  Starts the event loop with an empty `State` and registers it as `EventLoop`.

  The argument is accepted but ignored.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # Flow diagram
    Observer.log(:flow, :recv, :async, EventLoop, caller, func)

    # Execute the function in a separate process
    # Here, we are using the process id also as the promise id
    callee =
      spawn(fn ->
        # Block the callee until the EventLoop has a chance to log the send event
        receive do
          :spawn ->
            Observer.log(:flow, :recv, :spawn, self(), EventLoop, func)
            v = apply(func, args)
            Observer.log(:flow, :send, :compl, self(), EventLoop, v)
            GenServer.call(EventLoop, {:return, self(), v})
        end
      end)

    # Flow diagram
    Observer.log(:flow, :send, :spawn, EventLoop, callee, func)

    # Unblock
    send(callee, :spawn)

    # The promise is registered before this handler replies, so the callee's
    # `:return` call and any `:await` call are processed after it exists.
    new_state = State.add_promise(state, callee)

    # Flow diagram
    Observer.log(:flow, :send, :return_p, EventLoop, caller, callee)

    # Component Diagram
    Observer.log(:comp, :exec, caller, :invocation, callee, :invoke)
    Observer.log(:comp, :invocation, callee, :promise, callee, :create)
    Observer.log(:comp, :invocation, callee, :exec, callee, :create)

    {:reply, callee, new_state}
  end

  @impl true
  def handle_call({:await, promise}, {caller, _} = from, state) do
    # Component Diagram
    Observer.log(:comp, :exec, caller, :promise, promise, :await)

    # Flow diagram
    Observer.log(:flow, :recv, :await, EventLoop, caller, promise)

    # The central if statement
    case State.get_promise(state, promise) do
      # Promise pending, defer response to completion
      :pending ->
        {:noreply, State.add_awaiter(state, promise, from)}

      # Promise completed, respond immediately
      {:completed, result} ->
        # Flow diagram
        Observer.log(:flow, :send, :return_v, EventLoop, caller, result)
        {:reply, result, state}

      # Unknown promise id: answer the caller instead of letting the missing
      # case clause crash the loop, which would drop every pending promise.
      nil ->
        error = {:error, :unknown_promise}
        # Flow diagram
        Observer.log(:flow, :send, :return_v, EventLoop, caller, error)
        {:reply, error, state}
    end
  end

  @impl true
  def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    # Component Diagram
    Observer.log(:comp, :exec, caller, :promise, callee, :resolve)

    # Flow diagram
    Observer.log(:flow, :recv, :compl, EventLoop, caller, result)

    # Send the result to every deferred `:await` call on this promise
    Enum.each(State.get_awaiter(state, callee), fn {awaiter_pid, _} = awaiter ->
      # Flow diagram
      Observer.log(:flow, :send, :return_v, EventLoop, awaiter_pid, result)
      GenServer.reply(awaiter, result)
    end)

    {:reply, nil, State.set_promise(state, callee, result)}
  end
end
# Start the EventLoop process; it registers itself under the name `EventLoop`,
# which is the name `Async` and the spawned callees send their calls to.
{:ok, pid} = EventLoop.start_link(nil)
Async Await
defmodule Async do
  @moduledoc """
  Client-side async/await API on top of `EventLoop`.

  Both functions report the message they send and the reply they receive to
  `Observer` so the exchange shows up in the flow diagram.
  """

  @doc """
  Asks the event loop to run `func` with the argument list `args` and returns
  the promise id the event loop replies with.
  """
  def invoke(func, args \\ []) do
    Observer.log(:flow, :send, :async, self(), EventLoop, func)
    p = GenServer.call(EventLoop, {:invoke, func, args})
    Observer.log(:flow, :recv, :return_p, self(), EventLoop, p)
    p
  end

  @doc """
  Blocks until the event loop replies with the result of promise `pid` and
  returns that result.

  `timeout` is handed to `GenServer.call/3`: a number of milliseconds or
  `:infinity`. The default of 5000 is the value `GenServer.call/2` uses, so
  `await(pid)` behaves as it did before this parameter existed. When the
  timeout elapses first, the calling process exits, as with any `GenServer.call`.
  """
  def await(pid, timeout \\ 5000) do
    Observer.log(:flow, :send, :await, self(), EventLoop, pid)
    v = GenServer.call(EventLoop, {:await, pid}, timeout)
    Observer.log(:flow, :recv, :return_v, self(), EventLoop, v)
    v
  end
end
Application
# Print the pid of the process evaluating this cell (the outermost caller).
IO.inspect(self())
# Start the outer computation; `outer` is the promise id returned by Async.invoke.
outer =
Async.invoke(fn ->
# Print the pid of the process running the outer function.
IO.inspect(self())
# Start a nested computation from inside the outer one.
inner =
Async.invoke(fn ->
# Print the pid of the process running the inner function.
IO.inspect(self())
42
end)
# Wait for the inner result before computing the outer result.
v = Async.await(inner)
2 * v
end)
# Wait for the outer promise and print its value.
IO.puts(Async.await(outer))
Note: Kino sometimes fails to render the diagram; I have not investigated under which conditions. If it does not work for you, swap the comments in the cell below (comment out the `Kino.Mermaid.new` line and uncomment the `IO.puts` line), copy the printed output, and paste it into https://mermaid.live to generate the diagram.
# Render the sequence diagram generated from the recorded :flow events.
Kino.Mermaid.new(MermaidGenerator.generate_flow_diagram())
# Alternative: print the Mermaid source text instead of rendering it.
# IO.puts(MermaidGenerator.generate_flow_diagram())
Note: Kino sometimes fails to render the diagram; I have not investigated under which conditions. If it does not work for you, swap the comments in the cell below (comment out the `Kino.Mermaid.new` line and uncomment the `IO.puts` line), copy the printed output, and paste it into https://mermaid.live to generate the diagram.
# Render the `graph TD` diagram generated from the recorded :comp events.
Kino.Mermaid.new(MermaidGenerator.generate_comp_diagram())
# Alternative: print the Mermaid source text instead of rendering it.
# IO.puts(MermaidGenerator.generate_comp_diagram())