Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Async Await

async_await.livemd

Async Await

Mix.install([])

Introduction

Please see my blog post "The Mechanics of Async Await" for details 🏴‍☠️

Event Loop

defmodule State do
  @moduledoc """
  Bookkeeping for the event loop.

  `promises` maps a promise id to `:pending` or `{:completed, result}`.
  `awaiters` maps a promise id to the list of callers waiting on it.
  """

  defstruct promises: %{}, awaiters: %{}

  @doc "Builds an empty state with no promises and no awaiters."
  def new(), do: %__MODULE__{}

  @doc "Looks up the promise stored under `pid`; `nil` when there is none."
  def get_promise(state, pid) do
    promises = state.promises
    Map.get(promises, pid)
  end

  @doc "Returns the callers waiting on `pid`, or `[]` when none are recorded."
  def get_awaiter(state, pid) do
    case Map.fetch(state.awaiters, pid) do
      {:ok, waiting} -> waiting
      :error -> []
    end
  end

  @doc "Registers `pid` as a pending promise with an empty awaiter list."
  def add_promise(state, pid) do
    pending = Map.put(state.promises, pid, :pending)
    %__MODULE__{state | promises: pending, awaiters: clear_awaiters(state, pid)}
  end

  @doc "Prepends `caller` to the awaiter list of `pid`."
  def add_awaiter(state, pid, caller) do
    # Missing key starts a fresh one-element list; otherwise the newest caller goes first.
    waiting = Map.update(state.awaiters, pid, [caller], fn existing -> [caller | existing] end)
    %__MODULE__{state | awaiters: waiting}
  end

  @doc "Stores `{:completed, result}` for `pid` and empties its awaiter list."
  def set_promise(state, pid, result) do
    completed = Map.put(state.promises, pid, {:completed, result})
    %__MODULE__{state | promises: completed, awaiters: clear_awaiters(state, pid)}
  end

  # Returns the awaiters map with the entry for `pid` reset to an empty list.
  defp clear_awaiters(state, pid), do: Map.put(state.awaiters, pid, [])
end
defmodule EventLoop do
  @moduledoc """
  A single GenServer that tracks promises and the callers awaiting them.

  It understands three calls:

    * `{:invoke, func, args}` - spawns a process that applies `func` to `args`
      and replies with that process' pid, which doubles as the promise id.
    * `{:await, promise}` - replies with the promise's result, deferring the
      reply until completion when the promise is still pending. Replies
      `{:error, :unknown_promise}` when `promise` was never created here.
    * `{:return, callee, result}` - sent by the spawned process when it is
      done; wakes every awaiter and records the result.
  """

  use GenServer

  @doc """
  Starts the event loop with an empty `State`, registered under the module name.

  The argument is ignored.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:invoke, func, args}, _from, state) do
    # Here, we are using the process id also as the promise id.
    # NOTE: the process is neither linked nor monitored, so if `func` raises,
    # no `:return` is ever sent and the promise stays `:pending`.
    callee =
      spawn(fn ->
        GenServer.call(EventLoop, {:return, self(), apply(func, args)})
      end)

    {:reply, callee, State.add_promise(state, callee)}
  end

  def handle_call({:await, promise}, from, state) do
    # The central if statement
    case State.get_promise(state, promise) do
      # Promise pending, defer response to completion
      :pending ->
        {:noreply, State.add_awaiter(state, promise, from)}

      # Promise completed, respond immediately
      {:completed, result} ->
        {:reply, result, state}

      # Unknown promise id: answer the caller instead of crashing the loop
      # (a crash here would discard every promise the loop is tracking).
      nil ->
        {:reply, {:error, :unknown_promise}, state}
    end
  end

  def handle_call({:return, callee, result}, _from, state) do
    # Answer every deferred `:await` call before recording the result.
    state
    |> State.get_awaiter(callee)
    |> Enum.each(fn caller -> GenServer.reply(caller, result) end)

    {:reply, nil, State.set_promise(state, callee, result)}
  end
end
# Start the event loop for this notebook. The argument is ignored by
# EventLoop.start_link/1; the match fails unless the start returns {:ok, pid}.
{:ok, pid} = EventLoop.start_link(nil)

Async Await

defmodule Async do
  @moduledoc """
  Client-side helpers for talking to the `EventLoop` process.
  """

  @doc """
  Asks the event loop to run `func` with `args` in a new process.

  Returns the reply of the `{:invoke, func, args}` call, which is the pid of
  the spawned process and serves as the promise id for `await/2`.
  """
  @spec invoke((... -> any()), [any()]) :: pid()
  def invoke(func, args \\ []) do
    GenServer.call(EventLoop, {:invoke, func, args})
  end

  @doc """
  Blocks until the promise identified by `pid` has a result and returns it.

  `timeout` is handed to `GenServer.call/3`: a number of milliseconds or
  `:infinity`. It defaults to 5000, the same limit `GenServer.call/2` applies,
  so existing one-argument callers are unaffected. The calling process exits
  if no reply arrives within `timeout`.
  """
  @spec await(pid(), timeout()) :: any()
  def await(pid, timeout \\ 5_000) do
    GenServer.call(EventLoop, {:await, pid}, timeout)
  end
end

Application

# Show the pid of the process evaluating this cell.
IO.inspect(self())

# Outer promise: runs in its own process, starts a nested promise,
# awaits it and doubles the value it produced.
outer =
  Async.invoke(fn ->
    IO.inspect(self())

    nested =
      Async.invoke(fn ->
        IO.inspect(self())

        42
      end)

    answer = Async.await(nested)

    2 * answer
  end)

# Wait for the outer promise and print its result.
outer
|> Async.await()
|> IO.puts()