Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Async Await - With Visualization

async_await_with_visualization.livemd

Async Await - With Visualization

Mix.install([
  {:kino, "~> 0.12.0"}
])

Introduction

Please see my blog post The Mechanics of Async Await for details 🏴‍☠️

Observer

defmodule Observer do
  @moduledoc """
  Named GenServer that records the events reported by the other cells so the
  diagrams can be generated from them afterwards.

  Events are kept newest-first internally, which makes recording an event a
  constant-time prepend; `get/0` restores chronological order before replying.
  """

  use GenServer

  @doc """
  Starts the observer with an empty event list and registers it under the
  module name. The options argument is ignored.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Records one event and returns `nil`.

  The arguments are stored verbatim, in order, as the tuple
  `{:event, type, kind, send, recv, args, loop}`.

  NOTE(review): the call sites in this notebook pass the diagram part
  (`:flow` or `:comp`) as the first argument, so the parameter names are
  shifted by one position relative to how `MermaidGenerator` reads the tuple.
  Only the positions matter.
  """
  def log(type, kind, send, recv, args \\ nil, loop \\ nil) do
    GenServer.call(__MODULE__, {:event, type, kind, send, recv, args, loop})
  end

  @doc """
  Returns every recorded event, oldest first.
  """
  def get do
    GenServer.call(__MODULE__, :get)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:event, _type, _kind, _send, _recv, _args, _loop} = event, _from, state) do
    # Prepend instead of `state ++ [event]`: appending copies the whole list on
    # every single log call.
    {:reply, nil, [event | state]}
  end

  def handle_call(:get, _from, state) do
    # The list is stored newest-first, so reverse it once for the reader.
    {:reply, Enum.reverse(state), state}
  end
end
# Start the Observer process (registered under its module name) so the cells
# below can log events to it and read them back.
{:ok, pid} = Observer.start_link(nil)

Visualization

defmodule MermaidGenerator do
  @moduledoc """
  Turns the events returned by `Observer.get/0` into Mermaid diagram source.

  Every event is a 7-tuple whose second element names the diagram it belongs
  to: `:comp` for the component graph, `:flow` for the sequence diagram.
  """

  @doc """
  Builds a Mermaid `graph TD` description from the recorded `:comp` events.

  Each distinct `{type, name}` pair becomes a node and each event becomes an
  edge labelled with the event's relation.
  """
  def generate_comp_diagram do
    comp_events = recorded(:comp)

    nodes =
      comp_events
      |> Enum.flat_map(fn {:event, :comp, src_type, src_name, dst_type, dst_name, _rel} ->
        [{src_type, src_name}, {dst_type, dst_name}]
      end)
      |> Enum.uniq()
      |> Enum.map(fn {type, name} = node ->
        "  #{hash(node)}(#{inspect(type)} #{inspect(name)})\n"
      end)

    edges =
      for {:event, :comp, src_type, src_name, dst_type, dst_name, rel} <- comp_events do
        "  #{hash({src_type, src_name})}--#{rel}-->#{hash({dst_type, dst_name})}\n"
      end

    Enum.join(["graph TD;\n" | nodes] ++ edges)
  end

  @doc """
  Builds a Mermaid `sequenceDiagram` description from the recorded `:flow`
  events.

  Every distinct value found in the two endpoint positions of an event is
  declared as a participant, followed by one message line per event.
  """
  def generate_flow_diagram() do
    flow_events = recorded(:flow)

    participants =
      flow_events
      |> Enum.flat_map(fn {:event, _part, _type, _kind, send, recv, _args} -> [send, recv] end)
      |> Enum.uniq()
      |> Enum.map(fn endpoint ->
        "  participant #{hash(endpoint)} as #{sanitize(endpoint)}\n"
      end)

    interactions =
      Enum.map(flow_events, fn {:event, part, type, kind, send, recv, args} ->
        format_interaction(part, type, kind, send, recv, args)
      end)

    Enum.join(["sequenceDiagram\n" | participants] ++ interactions)
  end

  @doc """
  Returns a Mermaid-safe identifier for any term: the letter `P` followed by
  the term's `:erlang.phash2/1` value.
  """
  def hash(v) do
    "P" <> Integer.to_string(:erlang.phash2(v))
  end

  # Fetches all events from the Observer and keeps those of the given part.
  # The strict 7-tuple pattern is intentional: a malformed event raises.
  defp recorded(wanted) do
    Enum.filter(Observer.get(), fn {:event, part, _type, _kind, _send, _recv, _args} ->
      part == wanted
    end)
  end

  # A send event is drawn as a solid arrow from the sender to the receiver.
  defp format_interaction(:flow, :send, kind, send, recv, args) do
    label = generate_message(:flow, :send, kind, send, recv, args)
    "  #{hash(send)}->>#{hash(recv)}:#{kind} #{label}\n"
  end

  # A receive event is drawn as a dashed arrow from the receiver to itself.
  defp format_interaction(:flow, :recv, kind, recv, send, args) do
    label = generate_message(:flow, :recv, kind, send, recv, args)
    "  #{hash(recv)}-->>#{hash(recv)}:#{kind} #{label}\n"
  end

  # Message text depends only on the kind of the event and its payload.
  defp generate_message(:flow, _type, kind, _source, _target, fun)
       when kind in [:async, :spawn] do
    index =
      fun
      |> :erlang.fun_info()
      |> Keyword.get(:index)

    "Function(FID.#{index})"
  end

  defp generate_message(:flow, _type, kind, _source, _target, promise)
       when kind in [:return_p, :await] do
    "Promise(#{sanitize(promise)})"
  end

  defp generate_message(:flow, _type, kind, _source, _target, value)
       when kind in [:return_v, :compl] do
    "Value(#{sanitize(value)})"
  end

  defp generate_message(_part, _type, _kind, _source, _target, _payload), do: "None"

  # Inspects the term and strips the characters `#`, `<` and `>` from the text.
  defp sanitize(pid) do
    String.replace(inspect(pid), ~r/[#<>]/, "")
  end
end

Event Loop

defmodule State do
  @moduledoc """
  Bookkeeping used by the event loop: the status of every promise and, per
  promise, the list of callers waiting for its result.
  """

  defstruct promises: %{}, awaiters: %{}

  @doc "Builds an empty state."
  def new, do: %__MODULE__{}

  @doc """
  Looks up the status stored for promise `pid`: `:pending`,
  `{:completed, result}`, or `nil` when the promise is unknown.
  """
  def get_promise(state, pid), do: Map.get(state.promises, pid, nil)

  @doc "Returns the callers waiting on promise `pid` (`[]` when there are none)."
  def get_awaiter(state, pid), do: Map.get(state.awaiters, pid, [])

  @doc "Registers promise `pid` as `:pending` with no awaiters."
  def add_promise(state, pid) do
    %__MODULE__{
      state
      | promises: Map.put(state.promises, pid, :pending),
        awaiters: Map.put(state.awaiters, pid, [])
    }
  end

  @doc "Puts `caller` at the front of the awaiter list of promise `pid`."
  def add_awaiter(state, pid, caller) do
    waiting = get_awaiter(state, pid)
    %__MODULE__{state | awaiters: Map.put(state.awaiters, pid, [caller | waiting])}
  end

  @doc """
  Marks promise `pid` as `{:completed, result}` and clears its awaiter list.
  """
  def set_promise(state, pid, result) do
    %__MODULE__{
      state
      | promises: Map.put(state.promises, pid, {:completed, result}),
        awaiters: Map.put(state.awaiters, pid, [])
    }
  end
end
defmodule EventLoop do
  @moduledoc """
  Named GenServer that plays the role of the event loop.

  It handles three requests:

    * `{:invoke, func, args}` - spawns a process that runs `func` and replies
      with that process's pid, which doubles as the promise id.
    * `{:await, promise}` - replies with the promise's result: immediately if
      the promise is already completed, otherwise once the callee has returned.
    * `{:return, callee, result}` - sent by the spawned process after `func`
      has finished; completes the promise and answers every pending awaiter.

  Every step is also reported to `Observer` for the diagrams.
  """

  use GenServer

  @doc """
  Starts the event loop with an empty `State` and registers it under the
  module name. The options argument is ignored.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # Flow diagram
    Observer.log(:flow, :recv, :async, EventLoop, caller, func)
    # Execute the function in a separate process
    # Here, we are using the process id also as the promise id
    callee =
      spawn(fn ->
        # Block the callee until the EventLoop has a chance to log the send event
        receive do
          :spawn ->
            Observer.log(:flow, :recv, :spawn, self(), EventLoop, func)
            v = apply(func, args)
            Observer.log(:flow, :send, :compl, self(), EventLoop, v)
            GenServer.call(EventLoop, {:return, self(), v})
        end
      end)

    # Flow diagram
    Observer.log(:flow, :send, :spawn, EventLoop, callee, func)
    # Unblock
    send(callee, :spawn)

    new_state = State.add_promise(state, callee)

    # Flow diagram
    Observer.log(:flow, :send, :return_p, EventLoop, caller, callee)
    # Component Diagram
    Observer.log(:comp, :exec, caller, :invocation, callee, :invoke)
    Observer.log(:comp, :invocation, callee, :promise, callee, :create)
    Observer.log(:comp, :invocation, callee, :exec, callee, :create)

    {:reply, callee, new_state}
  end

  @impl true
  def handle_call({:await, promise}, {caller, _} = from, state) do
    # Component Diagram
    Observer.log(:comp, :exec, caller, :promise, promise, :await)
    # Flow diagram
    Observer.log(:flow, :recv, :await, EventLoop, caller, promise)
    # The central if statement.
    # NOTE: an unknown promise id makes State.get_promise/2 return nil, which
    # matches neither clause and raises in this process.
    case State.get_promise(state, promise) do
      # Promise pending, defer response to completion: keep `from` so the
      # :return handler can answer it later with GenServer.reply/2
      :pending ->
        new_state = State.add_awaiter(state, promise, from)

        {:noreply, new_state}

      # Promise completed, respond immediately
      {:completed, result} ->
        # Flow diagram
        Observer.log(:flow, :send, :return_v, EventLoop, caller, result)
        {:reply, result, state}
    end
  end

  @impl true
  def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    # Component Diagram
    Observer.log(:comp, :exec, caller, :promise, callee, :resolve)
    # Flow diagram
    Observer.log(:flow, :recv, :compl, EventLoop, caller, result)
    # Send the result to all awaiting caller PIDs
    Enum.each(State.get_awaiter(state, callee), fn {awaiter_pid, _} = awaiter ->
      # Flow diagram
      Observer.log(:flow, :send, :return_v, EventLoop, awaiter_pid, result)
      GenServer.reply(awaiter, result)
    end)

    # Record the result so later awaits are answered immediately; this also
    # clears the awaiter list that was just served.
    new_state = State.set_promise(state, callee, result)

    {:reply, nil, new_state}
  end
end
# Start the EventLoop process (registered under its module name) so that
# Async.invoke/2 and Async.await/1 have a server to call.
{:ok, pid} = EventLoop.start_link(nil)

Async Await

defmodule Async do
  @moduledoc """
  Client-side API for the event loop: `invoke/2` starts a computation and
  returns its promise, `await/1` blocks until the promise has a value.

  Both functions log the outgoing request and the incoming reply to
  `Observer` as `:flow` events.
  """

  @doc """
  Asks the event loop to run `func` with `args` and returns the promise it
  replies with.
  """
  def invoke(func, args \\ []) do
    round_trip({:invoke, func, args}, :async, func, :return_p)
  end

  @doc """
  Asks the event loop for the value of promise `pid` and returns it.
  """
  def await(pid) do
    round_trip({:await, pid}, :await, pid, :return_v)
  end

  # Logs the outgoing message, performs the synchronous call to the event
  # loop, logs the reply, and hands the reply back to the caller.
  defp round_trip(request, send_kind, payload, recv_kind) do
    Observer.log(:flow, :send, send_kind, self(), EventLoop, payload)
    reply = GenServer.call(EventLoop, request)
    Observer.log(:flow, :recv, recv_kind, self(), EventLoop, reply)
    reply
  end
end

Application

# Print the pid of the process evaluating this cell.
IO.inspect(self())

# Start the outer computation; `outer` is the promise returned by Async.invoke/2.
outer =
  Async.invoke(fn ->
    # Print the pid of the process running the outer function.
    IO.inspect(self())

    # Start a nested computation from inside the outer one.
    inner =
      Async.invoke(fn ->
        # Print the pid of the process running the inner function.
        IO.inspect(self())

        42
      end)

    # Wait for the inner value before continuing.
    v = Async.await(inner)

    2 * v
  end)

# Wait for the outer value (twice the inner value) and print it.
IO.puts(Async.await(outer))

Note: Kino sometimes renders the diagram and sometimes does not; I have not investigated why. If Kino does not work for you, swap the comments in the cell below (comment out the `Kino.Mermaid.new` line and uncomment the `IO.puts` line), copy the output, and head over to https://mermaid.live to generate the diagram.

# Render the sequence diagram built from the recorded :flow events.
Kino.Mermaid.new(MermaidGenerator.generate_flow_diagram())
# IO.puts(MermaidGenerator.generate_flow_diagram())

Note: Kino sometimes renders the diagram and sometimes does not; I have not investigated why. If Kino does not work for you, swap the comments in the cell below (comment out the `Kino.Mermaid.new` line and uncomment the `IO.puts` line), copy the output, and head over to https://mermaid.live to generate the diagram.

# Render the component graph built from the recorded :comp events.
Kino.Mermaid.new(MermaidGenerator.generate_comp_diagram())
# IO.puts(MermaidGenerator.generate_comp_diagram())