Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Playing with priority queues

elixir/priority_queue/demo.livemd

Playing with priority queues

The PriorityQueue data structure

The PriorityQueue module represents the pure functional data structure. It uses Erlang’s :gb_trees and :queue data structures.

defmodule PriorityQueue do
  defstruct size: 0, tree: nil

  def new, do: %__MODULE__{size: 0, tree: :gb_trees.empty()}

  def empty?(%__MODULE__{size: 0}), do: true
  def empty?(%__MODULE__{}), do: false

  @doc ~S"""
  Returns the size of a `PriorityQueue

  ## Examples

      iex> PriorityQueue.new()
      ...>   |> PriorityQueue.size()
      0

      iex> PriorityQueue.new()
      ...>   |> PriorityQueue.push(1, "Some item")
      ...>   |> PriorityQueue.size()
      1

  """
  def size(%__MODULE__{size: n}), do: n

  defp push_into_queue(pq = %__MODULE__{size: n, tree: tree}, priority, value, operation)
       when is_integer(priority) do
    tree =
      case :gb_trees.lookup(priority, tree) do
        {:value, queue} ->
          queue =
            case operation do
              :regular -> :queue.in(value, queue)
              :requeue -> :queue.in_r(value, queue)
            end

          :gb_trees.update(priority, queue, tree)

        :none ->
          :gb_trees.insert(priority, :queue.in(value, :queue.new()), tree)
      end

    %{pq | size: n + 1, tree: tree}
  end

  @doc ~S"""
  Appends values into a `PriorityQueue`.

  ## Examples

      iex> queue =
      ...>   PriorityQueue.new()
      ...>   |> PriorityQueue.push(2, 3) # [{2, 3}]
      ...>   |> PriorityQueue.push(1, 1) # [{1, 1}, {2, 3}]
      ...>   |> PriorityQueue.push(3, 5) # [{1, 1}, {2, 3}, {3, 5}]
      ...>   |> PriorityQueue.push_r(2, 2) # [{1, 1}, {2, 2}, {2, 3}, {3, 5}]
      ...>   |> PriorityQueue.push(2, 4) # [{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}]
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 1, 1 }
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 2, 2 }
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 2, 3 }
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 2, 4 }
      iex> { result, _queue } = queue |> PriorityQueue.pop(); result
      { :value, 3, 5 }
  """
  def push(pq = %__MODULE__{}, priority, value) when is_integer(priority) do
    push_into_queue(pq, priority, value, :regular)
  end

  def push_r(pq = %__MODULE__{}, priority, value) when is_integer(priority) do
    push_into_queue(pq, priority, value, :requeue)
  end

  @doc ~S"""
  Returns the pairs of a `PriorityQueue`.

  ## Examples

      iex> queue =
      ...>   PriorityQueue.new()
      ...>   |> PriorityQueue.push(2, "Lower prio")
      ...>   |> PriorityQueue.push(1, "Higher prio")
      ...>   |> PriorityQueue.push(3, :very_low_prio)
      ...>   |> PriorityQueue.push(2, { :job, "Some other thing in 2" })
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 1, "Higher prio" }
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 2, "Lower prio" }
      iex> { result, queue } = queue |> PriorityQueue.pop(); result
      { :value, 2, { :job, "Some other thing in 2" } }
      iex> { result, _queue } = queue |> PriorityQueue.pop(); result
      { :value, 3, :very_low_prio }

      iex> { result, _queue } = PriorityQueue.new()
      ...> |> PriorityQueue.pop()
      iex> result
      :none
  """
  def pop(pq = %__MODULE__{size: 0}), do: {:none, pq}

  def pop(%__MODULE__{size: n, tree: tree}) do
    {priority, queue, tree} = :gb_trees.take_smallest(tree)
    {{:value, value}, queue} = :queue.out(queue)

    tree =
      case :queue.is_empty(queue) do
        false -> :gb_trees.enter(priority, queue, tree)
        true -> tree
      end

    {{:value, priority, value}, %__MODULE__{size: n - 1, tree: tree}}
  end

  @doc ~S"""
  Returns the keys of a `PriorityQueue

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.keys()

      [ 1, 2 ]

  """
  def keys(%__MODULE__{tree: tree}), do: :gb_trees.keys(tree)

  defp values(pq = %__MODULE__{}, list) do
    case pq |> pop() do
      {:none, _} -> :lists.reverse(list)
      {{:value, _priority, value}, pq} -> values(pq, [value | list])
    end
  end

  @doc ~S"""
  Returns the values of a `PriorityQueue

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.values()

      [ "Higher prio", "Lower prio" ]

  """
  def values(pq = %__MODULE__{}) do
    values(pq, [])
  end

  defp pairs(pq = %__MODULE__{}, list) do
    case pq |> pop() do
      {:none, _} -> :lists.reverse(list)
      {{:value, priority, value}, pq} -> pairs(pq, [{priority, value} | list])
    end
  end

  @doc ~S"""
  Returns the pairs of a `PriorityQueue

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.push(3, :very_low_prio)
      ...> |> PriorityQueue.push(2, { :job, "Some other thing in 2" })
      ...> |> PriorityQueue.pairs()

      [
        { 1, "Higher prio" },
        { 2, "Lower prio" },
        { 2, { :job, "Some other thing in 2" } },
        { 3, :very_low_prio }
      ]

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(100, 9)
      ...> |> PriorityQueue.push(1, 1)
      ...> |> PriorityQueue.push(2, 5)
      ...> |> PriorityQueue.push(1, 2)
      ...> |> PriorityQueue.push(2, 6)
      ...> |> PriorityQueue.push(1, 3)
      ...> |> PriorityQueue.push(10, 8)
      ...> |> PriorityQueue.push(1, 4)
      ...> |> PriorityQueue.push(2, 7)
      ...> |> PriorityQueue.pairs()

      [
        {1, 1}, {1, 2}, {1, 3}, {1, 4},
        {2, 5}, {2, 6}, {2, 7},
        {10, 8},
        {100, 9}
      ]
  """
  def pairs(pq = %__MODULE__{}) do
    pairs(pq, [])
  end
end
PriorityQueue.new()
|> PriorityQueue.push(100, "Hello")
|> PriorityQueue.push(1, "Robert")
|> PriorityQueue.push(2, 5)
|> PriorityQueue.push(1, "Dolev")
|> PriorityQueue.push(2, 6)
|> PriorityQueue.push(1, 3)
|> PriorityQueue.push(10, 8)
|> PriorityQueue.push(1, 4)
|> PriorityQueue.push(5, 7)
|> PriorityQueue.pairs()

The GenServer wrapped around the PriorityQueue data structure

The following module wraps the functional data structure in a GenServer.

  • Example on how to use {:noreply, ...} in a handle_call() callback.
defmodule PriorityQueueServer do
  use GenServer
  require Logger

  # Client
  def start_link(), do: GenServer.start_link(__MODULE__, PriorityQueue.new())

  def push(pid, priority, value), do: GenServer.cast(pid, {:push, priority, value})

  # :infinity,
  def pop(pid, timeout \\ 5000), do: GenServer.call(pid, :pop, timeout)

  # Server
  @impl true
  def init(state = %PriorityQueue{}), do: {:ok, state}

  @impl true
  def handle_cast({:push, priority, value}, old_state_priority_queue) do
    new_state = PriorityQueue.push(old_state_priority_queue, priority, value)
    {:noreply, new_state}
  end

  @impl true
  def handle_call(:pop, _from, old_state = %PriorityQueue{}) do
    {popped_item, new_state} = PriorityQueue.pop(old_state)
    {:reply, popped_item, new_state}
  end
end
{:ok, pid1} = PriorityQueueServer.start_link()
PriorityQueueServer.push(pid1, 10, "Hello")
:sys.get_state(pid1)
PriorityQueueServer.pop(pid1)
defmodule PriorityQueueServerWithConfirmation do
  use GenServer
  require Logger

  defstruct pq: nil, in_flight: nil

  # Client
  def start_link(),
    do:
      GenServer.start_link(__MODULE__, %__MODULE__{pq: PriorityQueue.new(), in_flight: Map.new()})

  def push(pid, priority, value), do: GenServer.cast(pid, {:push, priority, value})

  # :infinity,
  def pop(pid, timeout \\ 5000), do: GenServer.call(pid, {:pop, timeout})

  def finished(pid, ref), do: GenServer.call(pid, {:finished, ref})

  def length(pid), do: GenServer.call(pid, :length)

  # Server
  @impl true
  def init(state = %__MODULE__{}), do: {:ok, state}

  @impl true
  def handle_cast({:push, priority, value}, state = %__MODULE__{pq: pq}) do
    state = %{state | pq: PriorityQueue.push(pq, priority, value)}
    {:noreply, state}
  end

  @impl true
  def handle_call({:pop, timeout}, _from, state = %__MODULE__{pq: pq, in_flight: in_flight}) do
    {item, new_pq} = PriorityQueue.pop(pq)

    case item do
      :none ->
        {:reply, :none, state}

      {:value, priority, value} ->
        ref = make_ref()
        in_flight = Map.put(in_flight, ref, {priority, value})
        state = %{state | pq: new_pq, in_flight: in_flight}

        # Schedule an event to check whether the item has been processed, and re-inject if needed
        Process.send_after(self(), {:reschedule, ref}, timeout)

        {:reply, {:value, priority, value, ref}, state}
    end
  end

  @impl true
  def handle_call({:finished, ref}, _from, state = %__MODULE__{in_flight: in_flight}) do
    case Map.pop(in_flight, ref) do
      {nil, _} ->
        {:reply, :not_found, state}

      {{_priority, _value}, in_flight} ->
        {:reply, :ok, %{state | in_flight: in_flight}}
    end
  end

  def handle_call(:length, _from, state = %__MODULE__{pq: pq}) do
    {:reply, PriorityQueue.size(pq), state}
  end

  @impl true
  def handle_info({:reschedule, ref}, state = %__MODULE__{pq: pq, in_flight: in_flight}) do
    #
    # After a given time, check whether the queue item with the given reference is still in-flight, and re-enqueue.
    #
    case Map.pop(in_flight, ref) do
      {nil, _} ->
        # If the reference isn't there any longer...
        {:noreply, state}

      {{priority, value}, in_flight} ->
        # The reference hasn't been handled in due time, so we re-enqueue it.
        state = %{state | pq: PriorityQueue.push_r(pq, priority, value), in_flight: in_flight}
        {:noreply, state}
    end
  end
end
defmodule MyTask do
  alias PriorityQueueServerWithConfirmation, as: Queue
  require Logger

  def launch(name) do
    {:ok, pid} = Queue.start_link()
    Process.register(pid, name)
  end

  def send_data(name) do
    Queue.push(name, 100, 9)
    Queue.push(name, 1, 1)
    Queue.push(name, 2, 5)
    Queue.push(name, 1, 2)
    Queue.push(name, 2, 6)
    Queue.push(name, 1, 3)
    Queue.push(name, 10, 8)
    Queue.push(name, 1, 4)
    Queue.push(name, 2, 7)
  end

  def fetch_data(name) do
    case Queue.pop(name) do
      :none ->
        IO.puts("Queue #{name} is empty")

      {:value, prio, val, ref} ->
        IO.puts("Fetched #{val} with prio #{prio} from queue #{name}")
        Queue.finished(name, ref)
        fetch_data(name)
    end
  end

  defp pairs(pid, list) do
    case pid |> Queue.pop() do
      :none ->
        list |> :lists.reverse()

      {:value, priority, value, ref} ->
        Queue.finished(pid, ref)
        pairs(pid, [{priority, value} | list])
    end
  end

  def pairs(pid), do: pid |> pairs([])
end
name = :ups
MyTask.launch(name)
PriorityQueueServerWithConfirmation.push(name, 2, "Greetings, Byron")
PriorityQueueServerWithConfirmation.pop(name)
:sys.get_state(name)
MyTask.pairs(name)
case PriorityQueueServerWithConfirmation.pop(name) do
  {:value, priority, value, ref} ->
    IO.puts("Received #{inspect(value)} with prio #{priority} (handle: #{inspect(ref)})")
    IO.puts("Before: #{inspect(:sys.get_state(name))}")
    PriorityQueueServerWithConfirmation.finished(name, ref)
    IO.puts("After: #{inspect(:sys.get_state(name))}")

  :none ->
    :none
end
# :fedex |> MyTask.send_data()

PriorityQueueServerWithConfirmation.push(name, 100, 9)
PriorityQueueServerWithConfirmation.push(name, 1, 1)
PriorityQueueServerWithConfirmation.push(name, 2, 5)
PriorityQueueServerWithConfirmation.push(name, 1, 2)
PriorityQueueServerWithConfirmation.push(name, 2, 6)
PriorityQueueServerWithConfirmation.push(name, 1, 3)
PriorityQueueServerWithConfirmation.push(name, 10, 8)
PriorityQueueServerWithConfirmation.push(name, 1, 4)
PriorityQueueServerWithConfirmation.push(name, 2, 7)
IO.puts("There are now #{PriorityQueueServerWithConfirmation.length(name)} items in the queue")

Fetch data from the GenServer

:fedex |> MyTask.pairs()

Put some data into the GenServer

pid = :fedex

[
  {1, "Top prio #{DateTime.utc_now()}"},
  {10, "Some batch crap  #{DateTime.utc_now()}"},
  {5, "Medium stuff  #{DateTime.utc_now()}"},
  {1, "Yet another important one  #{DateTime.utc_now()}"}
]
|> Enum.map(fn {priority, value} -> pid |> PriorityQueueServer.push(priority, value) end)

Show currently enqueued items

pid |> :sys.get_state() |> IO.inspect(label: :full_state)

pid |> :sys.get_state() |> Map.get(:priority_queue) |> PriorityQueue.pairs()
case pid |> PriorityQueueServer.pop() do
  {:value, priority, value} ->
    IO.puts("Received #{inspect(value)} with prio #{priority}")
    :ok

  other ->
    IO.puts("Received #{inspect(other)}")
end

pid |> :sys.get_state() |> Map.get(:priority_queue) |> PriorityQueue.pairs()