Playing with priority queues
The PriorityQueue
data structure
The PriorityQueue
module implements a purely functional priority queue. It is built on Erlang’s :gb_trees
and :queue
data structures.
defmodule PriorityQueue do
  @moduledoc """
  A purely functional priority queue.

  Items live in an Erlang `:gb_trees` tree keyed by integer priority. Each tree
  entry holds a `:queue` with the values pushed under that priority. The smallest
  priority is served first; values sharing a priority are served in queue order
  (`push/3` appends at the rear, `push_r/3` inserts at the front).
  """

  defstruct size: 0, tree: nil

  @type t :: %__MODULE__{size: non_neg_integer(), tree: :gb_trees.tree() | nil}

  @doc """
  Returns a new, empty `PriorityQueue`.
  """
  @spec new() :: t()
  def new, do: %__MODULE__{size: 0, tree: :gb_trees.empty()}

  @doc """
  Returns `true` when the `PriorityQueue` holds no items.
  """
  @spec empty?(t()) :: boolean()
  def empty?(%__MODULE__{size: 0}), do: true
  def empty?(%__MODULE__{}), do: false

  @doc ~S"""
  Returns the size of a `PriorityQueue`.

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.size()
      0

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(1, "Some item")
      ...> |> PriorityQueue.size()
      1
  """
  @spec size(t()) :: non_neg_integer()
  def size(%__MODULE__{size: n}), do: n

  # Shared implementation of push/3 and push_r/3. `operation` only matters when the
  # priority already has a queue; a fresh queue gets the value as its single item.
  defp push_into_queue(pq = %__MODULE__{size: n, tree: tree}, priority, value, operation)
       when is_integer(priority) do
    tree =
      case :gb_trees.lookup(priority, tree) do
        {:value, queue} ->
          :gb_trees.update(priority, enqueue(queue, value, operation), tree)

        :none ->
          :gb_trees.insert(priority, :queue.in(value, :queue.new()), tree)
      end

    %{pq | size: n + 1, tree: tree}
  end

  # :regular appends at the rear of the queue, :requeue inserts at the front.
  defp enqueue(queue, value, :regular), do: :queue.in(value, queue)
  defp enqueue(queue, value, :requeue), do: :queue.in_r(value, queue)

  @doc ~S"""
  Appends a value to a `PriorityQueue` under the given integer priority.

  The value is placed behind all values already stored with the same priority.
  Use `push_r/3` to place it in front of them instead.

  ## Examples

      iex> queue =
      ...>   PriorityQueue.new()
      ...>   |> PriorityQueue.push(2, 3)   # [{2, 3}]
      ...>   |> PriorityQueue.push(1, 1)   # [{1, 1}, {2, 3}]
      ...>   |> PriorityQueue.push(3, 5)   # [{1, 1}, {2, 3}, {3, 5}]
      ...>   |> PriorityQueue.push_r(2, 2) # [{1, 1}, {2, 2}, {2, 3}, {3, 5}]
      ...>   |> PriorityQueue.push(2, 4)   # [{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}]
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 1, 1}
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 2, 2}
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 2, 3}
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 2, 4}
      iex> {result, _queue} = queue |> PriorityQueue.pop(); result
      {:value, 3, 5}
  """
  @spec push(t(), integer(), term()) :: t()
  def push(pq = %__MODULE__{}, priority, value) when is_integer(priority) do
    push_into_queue(pq, priority, value, :regular)
  end

  @doc ~S"""
  Inserts a value into a `PriorityQueue` in front of the values that already share its priority.

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(1, :first)
      ...> |> PriorityQueue.push_r(1, :requeued)
      ...> |> PriorityQueue.values()
      [:requeued, :first]
  """
  @spec push_r(t(), integer(), term()) :: t()
  def push_r(pq = %__MODULE__{}, priority, value) when is_integer(priority) do
    push_into_queue(pq, priority, value, :requeue)
  end

  @doc ~S"""
  Removes the item with the smallest priority from a `PriorityQueue`.

  Returns `{{:value, priority, value}, queue}` with the remaining queue, or
  `{:none, queue}` when the queue is empty.

  ## Examples

      iex> queue =
      ...>   PriorityQueue.new()
      ...>   |> PriorityQueue.push(2, "Lower prio")
      ...>   |> PriorityQueue.push(1, "Higher prio")
      ...>   |> PriorityQueue.push(3, :very_low_prio)
      ...>   |> PriorityQueue.push(2, {:job, "Some other thing in 2"})
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 1, "Higher prio"}
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 2, "Lower prio"}
      iex> {result, queue} = queue |> PriorityQueue.pop(); result
      {:value, 2, {:job, "Some other thing in 2"}}
      iex> {result, _queue} = queue |> PriorityQueue.pop(); result
      {:value, 3, :very_low_prio}

      iex> {result, _queue} =
      ...>   PriorityQueue.new()
      ...>   |> PriorityQueue.pop()
      iex> result
      :none
  """
  @spec pop(t()) :: {:none, t()} | {{:value, integer(), term()}, t()}
  def pop(pq = %__MODULE__{size: 0}), do: {:none, pq}

  def pop(%__MODULE__{size: n, tree: tree}) do
    {priority, queue, tree} = :gb_trees.take_smallest(tree)
    {{:value, value}, queue} = :queue.out(queue)

    # take_smallest/1 removed the whole priority level; put it back only while
    # it still has values, so the tree never contains empty queues.
    tree =
      case :queue.is_empty(queue) do
        false -> :gb_trees.enter(priority, queue, tree)
        true -> tree
      end

    {{:value, priority, value}, %__MODULE__{size: n - 1, tree: tree}}
  end

  @doc ~S"""
  Returns the distinct priorities (keys) of a `PriorityQueue` in ascending order.

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.keys()
      [1, 2]
  """
  @spec keys(t()) :: [integer()]
  def keys(%__MODULE__{tree: tree}), do: :gb_trees.keys(tree)

  @doc ~S"""
  Returns the values of a `PriorityQueue` in the order `pop/1` would yield them.

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.values()
      ["Higher prio", "Lower prio"]
  """
  @spec values(t()) :: [term()]
  def values(%__MODULE__{size: 0}), do: []

  def values(%__MODULE__{tree: tree}) do
    # :gb_trees.values/1 is ordered by key and :queue.to_list/1 is front-to-rear,
    # which together is exactly the pop order.
    tree
    |> :gb_trees.values()
    |> Enum.flat_map(&:queue.to_list/1)
  end

  @doc ~S"""
  Returns the `{priority, value}` pairs of a `PriorityQueue` in the order `pop/1` would yield them.

  ## Examples

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(2, "Lower prio")
      ...> |> PriorityQueue.push(1, "Higher prio")
      ...> |> PriorityQueue.push(3, :very_low_prio)
      ...> |> PriorityQueue.push(2, {:job, "Some other thing in 2"})
      ...> |> PriorityQueue.pairs()
      [
        {1, "Higher prio"},
        {2, "Lower prio"},
        {2, {:job, "Some other thing in 2"}},
        {3, :very_low_prio}
      ]

      iex> PriorityQueue.new()
      ...> |> PriorityQueue.push(100, 9)
      ...> |> PriorityQueue.push(1, 1)
      ...> |> PriorityQueue.push(2, 5)
      ...> |> PriorityQueue.push(1, 2)
      ...> |> PriorityQueue.push(2, 6)
      ...> |> PriorityQueue.push(1, 3)
      ...> |> PriorityQueue.push(10, 8)
      ...> |> PriorityQueue.push(1, 4)
      ...> |> PriorityQueue.push(2, 7)
      ...> |> PriorityQueue.pairs()
      [
        {1, 1}, {1, 2}, {1, 3}, {1, 4},
        {2, 5}, {2, 6}, {2, 7},
        {10, 8},
        {100, 9}
      ]
  """
  @spec pairs(t()) :: [{integer(), term()}]
  def pairs(%__MODULE__{size: 0}), do: []

  def pairs(%__MODULE__{tree: tree}) do
    for {priority, queue} <- :gb_trees.to_list(tree),
        value <- :queue.to_list(queue),
        do: {priority, value}
  end
end
# Fold a list of {priority, value} samples into a fresh queue, then list the
# resulting pairs in pop order.
[
  {100, "Hello"},
  {1, "Robert"},
  {2, 5},
  {1, "Dolev"},
  {2, 6},
  {1, 3},
  {10, 8},
  {1, 4},
  {5, 7}
]
|> Enum.reduce(PriorityQueue.new(), fn {priority, value}, queue ->
  PriorityQueue.push(queue, priority, value)
end)
|> PriorityQueue.pairs()
The GenServer
wrapped around the PriorityQueue
data structure
The following module wraps the functional data structure in a GenServer
.
-
Example on how to use
{:noreply, ...}
in a handle_call()
callback.
defmodule PriorityQueueServer do
  @moduledoc """
  A `GenServer` whose state is a single `PriorityQueue`.

  Pushing is asynchronous (a cast); popping is a synchronous call that replies
  with whatever `PriorityQueue.pop/1` produced as its first tuple element.
  """

  use GenServer
  require Logger

  # Client

  @doc "Starts the server with an empty `PriorityQueue` as its state."
  def start_link() do
    GenServer.start_link(__MODULE__, PriorityQueue.new())
  end

  @doc "Asynchronously enqueues `value` under `priority`."
  def push(pid, priority, value) do
    GenServer.cast(pid, {:push, priority, value})
  end

  # :infinity,
  @doc "Pops the head of the queue; `timeout` is the `GenServer.call/3` timeout."
  def pop(pid, timeout \\ 5000) do
    GenServer.call(pid, :pop, timeout)
  end

  # Server

  @impl true
  def init(%PriorityQueue{} = queue), do: {:ok, queue}

  @impl true
  def handle_cast({:push, priority, value}, queue) do
    {:noreply, PriorityQueue.push(queue, priority, value)}
  end

  @impl true
  def handle_call(:pop, _from, %PriorityQueue{} = queue) do
    {reply, remaining} = PriorityQueue.pop(queue)
    {:reply, reply, remaining}
  end
end
# Start a queue server; its initial state is an empty PriorityQueue.
{:ok, pid1} = PriorityQueueServer.start_link()
# push/3 is a cast, so it returns without waiting for the server to handle it.
PriorityQueueServer.push(pid1, 10, "Hello")
# Inspect the server state, i.e. the PriorityQueue struct held by the GenServer.
:sys.get_state(pid1)
# Synchronously remove and return the head of the queue.
PriorityQueueServer.pop(pid1)
defmodule PriorityQueueServerWithConfirmation do
  @moduledoc """
  A `GenServer` around `PriorityQueue` in which consumers confirm every popped item.

  `pop/2` replies with the item plus a reference and records the item as in-flight.
  When `finished/2` is not called with that reference before the timeout passed to
  `pop/2` elapses, the item is re-enqueued with `PriorityQueue.push_r/3`, i.e. in
  front of the other values of the same priority.
  """

  use GenServer
  require Logger

  # pq: the PriorityQueue; in_flight: map of reference => {priority, value}
  defstruct pq: nil, in_flight: nil

  # Client

  @doc "Starts the server with an empty queue and no in-flight items."
  def start_link(),
    do:
      GenServer.start_link(__MODULE__, %__MODULE__{pq: PriorityQueue.new(), in_flight: Map.new()})

  @doc "Asynchronously enqueues `value` under `priority`."
  def push(pid, priority, value), do: GenServer.cast(pid, {:push, priority, value})

  # :infinity,
  @doc """
  Pops the head of the queue.

  Returns `:none` for an empty queue, otherwise `{:value, priority, value, ref}`.
  `timeout` is the number of milliseconds the caller has to confirm the item via
  `finished/2`; it is handed to `Process.send_after/3`, not used as the call timeout.
  """
  def pop(pid, timeout \\ 5000), do: GenServer.call(pid, {:pop, timeout})

  @doc """
  Confirms that the item identified by `ref` has been processed.

  Returns `:ok`, or `:not_found` when `ref` is not (or no longer) in flight.
  """
  def finished(pid, ref), do: GenServer.call(pid, {:finished, ref})

  @doc "Returns the number of queued items (in-flight items are not counted)."
  def length(pid), do: GenServer.call(pid, :length)

  # Server

  @impl true
  def init(state = %__MODULE__{}), do: {:ok, state}

  @impl true
  def handle_cast({:push, priority, value}, state = %__MODULE__{pq: pq}) do
    {:noreply, %{state | pq: PriorityQueue.push(pq, priority, value)}}
  end

  @impl true
  def handle_call({:pop, timeout}, _from, state = %__MODULE__{pq: pq, in_flight: in_flight}) do
    case PriorityQueue.pop(pq) do
      {:none, _pq} ->
        {:reply, :none, state}

      {{:value, priority, value}, new_pq} ->
        ref = make_ref()
        state = %{state | pq: new_pq, in_flight: Map.put(in_flight, ref, {priority, value})}

        # Schedule a check that re-injects the item unless it was confirmed by then.
        Process.send_after(self(), {:reschedule, ref}, timeout)

        {:reply, {:value, priority, value, ref}, state}
    end
  end

  def handle_call({:finished, ref}, _from, state = %__MODULE__{in_flight: in_flight}) do
    case Map.pop(in_flight, ref) do
      {nil, _} ->
        {:reply, :not_found, state}

      {{_priority, _value}, in_flight} ->
        {:reply, :ok, %{state | in_flight: in_flight}}
    end
  end

  def handle_call(:length, _from, state = %__MODULE__{pq: pq}) do
    {:reply, PriorityQueue.size(pq), state}
  end

  @impl true
  def handle_info({:reschedule, ref}, state = %__MODULE__{pq: pq, in_flight: in_flight}) do
    # The confirmation window for `ref` is over: re-enqueue the item if it is
    # still in flight, otherwise there is nothing to do.
    case Map.pop(in_flight, ref) do
      {nil, _} ->
        # Already confirmed through finished/2.
        {:noreply, state}

      {{priority, value}, in_flight} ->
        # Not confirmed in time, so it goes back to the front of its priority level.
        {:noreply, %{state | pq: PriorityQueue.push_r(pq, priority, value), in_flight: in_flight}}
    end
  end

  # Any process can send arbitrary messages to this server; without this clause an
  # unrelated message would crash it with a FunctionClauseError and lose the queue.
  def handle_info(message, state) do
    Logger.warning("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end
end
defmodule MyTask do
  @moduledoc """
  Convenience helpers for driving a `PriorityQueueServerWithConfirmation`
  that is registered under a name.
  """

  alias PriorityQueueServerWithConfirmation, as: Queue
  require Logger

  @doc "Starts a queue server and registers its pid under `name`."
  def launch(name) do
    {:ok, pid} = Queue.start_link()
    Process.register(pid, name)
  end

  @doc "Pushes a fixed set of sample items to the queue registered as `name`."
  def send_data(name) do
    [{100, 9}, {1, 1}, {2, 5}, {1, 2}, {2, 6}, {1, 3}, {10, 8}, {1, 4}, {2, 7}]
    |> Enum.each(fn {priority, value} -> Queue.push(name, priority, value) end)
  end

  @doc "Pops, prints and confirms items until the queue reports `:none`."
  def fetch_data(name) do
    name
    |> Queue.pop()
    |> report_and_continue(name)
  end

  # Prints the outcome of one pop; keeps fetching as long as items come back.
  defp report_and_continue(:none, name) do
    IO.puts("Queue #{name} is empty")
  end

  defp report_and_continue({:value, prio, val, ref}, name) do
    IO.puts("Fetched #{val} with prio #{prio} from queue #{name}")
    Queue.finished(name, ref)
    fetch_data(name)
  end

  @doc "Drains the queue, confirming each item, and returns the `{priority, value}` pairs."
  def pairs(pid), do: drain(pid, [])

  # Accumulates popped pairs in reverse and flips them once the queue is empty.
  defp drain(pid, collected) do
    case Queue.pop(pid) do
      {:value, priority, value, ref} ->
        Queue.finished(pid, ref)
        drain(pid, [{priority, value} | collected])

      :none ->
        :lists.reverse(collected)
    end
  end
end
# Name under which the confirmation-based queue server gets registered.
name = :ups
# Starts the server and registers its pid under `name`.
MyTask.launch(name)
PriorityQueueServerWithConfirmation.push(name, 2, "Greetings, Byron")
# Popping moves the item into the server's in-flight map. finished/2 is not called
# for it here, so the server re-enqueues it once the default 5000 ms window is over.
PriorityQueueServerWithConfirmation.pop(name)
:sys.get_state(name)
# Drains whatever is queued at this moment, confirming each item.
MyTask.pairs(name)
# Pop one item and confirm it, printing the server state before and after the
# confirmation so the in-flight bookkeeping becomes visible.
case PriorityQueueServerWithConfirmation.pop(name) do
  :none ->
    :none

  {:value, priority, value, ref} ->
    IO.puts("Received #{inspect(value)} with prio #{priority} (handle: #{inspect(ref)})")

    state_before = :sys.get_state(name)
    IO.puts("Before: #{inspect(state_before)}")

    PriorityQueueServerWithConfirmation.finished(name, ref)

    state_after = :sys.get_state(name)
    IO.puts("After: #{inspect(state_after)}")
end
# :fedex |> MyTask.send_data()
# Enqueue the sample items one by one, then report how many are queued.
[{100, 9}, {1, 1}, {2, 5}, {1, 2}, {2, 6}, {1, 3}, {10, 8}, {1, 4}, {2, 7}]
|> Enum.each(fn {priority, value} ->
  PriorityQueueServerWithConfirmation.push(name, priority, value)
end)

queued = PriorityQueueServerWithConfirmation.length(name)
IO.puts("There are now #{queued} items in the queue")
Fetch data from the GenServer
# Drain the queue registered as :fedex and return its {priority, value} pairs.
MyTask.pairs(:fedex)
Put some data into the GenServer
pid = :fedex

# Push a few timestamped sample items; the result is the list of push return values.
for {priority, value} <- [
      {1, "Top prio #{DateTime.utc_now()}"},
      {10, "Some batch crap #{DateTime.utc_now()}"},
      {5, "Medium stuff #{DateTime.utc_now()}"},
      {1, "Yet another important one #{DateTime.utc_now()}"}
    ] do
  PriorityQueueServer.push(pid, priority, value)
end
Show currently enqueued items
# NOTE(review): assumes `pid` refers to a PriorityQueueServer, as the push/pop calls
# in this section imply. That server keeps the PriorityQueue struct itself as its
# state, so the state is passed to PriorityQueue.pairs/1 directly. The struct has
# no :priority_queue key; looking it up yields nil and pairs/1 would raise.
pid |> :sys.get_state() |> IO.inspect(label: :full_state)
pid |> :sys.get_state() |> PriorityQueue.pairs()

case pid |> PriorityQueueServer.pop() do
  {:value, priority, value} ->
    IO.puts("Received #{inspect(value)} with prio #{priority}")
    :ok

  other ->
    IO.puts("Received #{inspect(other)}")
end

# Show what is left after the pop.
pid |> :sys.get_state() |> PriorityQueue.pairs()