Queues

Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"},
  {:benchee, "~> 1.1"}
])

Review Questions

  • What are the front and back of a queue?
  • What is $O(1)$ performance vs $O(n)$ performance?
  • What is the ideal vs worst-case performance of an amortized queue? When does the worst-case condition occur?
  • How do queues vs double-ended queues (DEQs) differ in how they rebalance a queue?

Overview

FIFO Queues

A FIFO (First-In-First-Out) queue is a type of data structure that stores elements in the order they were added and retrieves them in the same order. In a FIFO queue, the first element added to the queue is the first element to be removed. It works like a line of people in a store, where the first person to arrive is the first person to be served.

Elements in a queue are inserted (enqueued) to the back (also called the rear) and elements are removed (dequeued) from the front.
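As a minimal sketch of that ordering, the snippet below uses Erlang's built-in :queue module (covered later in this reading) to enqueue three elements and dequeue them again. The variable names are only illustrative.

```elixir
# Enqueue 1, 2, 3 at the back of an empty queue.
queue = :queue.new()
queue = :queue.in(1, queue)
queue = :queue.in(2, queue)
queue = :queue.in(3, queue)

# Dequeue from the front: elements come out in the order they went in.
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, queue} = :queue.out(queue)
{{:value, third}, _queue} = :queue.out(queue)

IO.inspect([first, second, third])
# [1, 2, 3]
```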

LIFO Queues

A LIFO Queue (Last-In-First-Out) is a type of data structure that stores elements and retrieves the most recently stored element. It is also known as a Stack.

We push elements onto the top of the stack and pop them off of the top of the stack.

Double Ended Queues

A Double-Ended Queue (Deque) is a type of data structure that allows the performant insertion and removal of elements from both ends of the queue.

Priority Queue

A Priority queue stores elements with an associated priority level. Elements are removed in order of priority.

A minimum priority queue retrieves elements with the lowest priority level, and a maximum priority queue retrieves elements with the highest priority level.

For example, a doctor's office might be considered a priority queue, as patients with more severe issues are treated first.
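The difference between a minimum and a maximum priority queue can be sketched with a plain list of tuples. The patient data here is made up for illustration, and the example assumes a lower number means a more urgent case.

```elixir
# Each patient is a {description, priority} tuple (illustrative data).
patients = [{"minor illness", 3}, {"critical injury", 1}, {"moderate injury", 2}]

# A minimum priority queue serves the lowest priority value first.
min_next = Enum.min_by(patients, fn {_description, priority} -> priority end)

# A maximum priority queue serves the highest priority value first.
max_next = Enum.max_by(patients, fn {_description, priority} -> priority end)

IO.inspect({min_next, max_next})
# {{"critical injury", 1}, {"minor illness", 3}}
```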

Amortized $O(1)$

We use Big O Notation to describe how the performance of an algorithm changes as the data set grows. $O(1)$ means the algorithm’s execution time remains constant, regardless of the size of the data set. $O(n)$ means the algorithm’s execution time grows proportionally to the size of the data set.
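To make the difference concrete, here is a rough sketch that counts the steps needed to read the first and the last element of a list. The StepCounter module is hypothetical and exists only for this illustration. It assumes a non-empty list.

```elixir
defmodule StepCounter do
  # O(1): reading the head takes one step no matter how long the list is.
  def first([head | _tail]), do: {head, 1}

  # O(n): reaching the last element walks every cell of the list.
  def last(list), do: last(list, 0)

  defp last([element], steps), do: {element, steps + 1}
  defp last([_head | tail], steps), do: last(tail, steps + 1)
end

small = Enum.to_list(1..10)
large = Enum.to_list(1..10_000)

IO.inspect(StepCounter.first(small))
# {1, 1}
IO.inspect(StepCounter.first(large))
# {1, 1}
IO.inspect(StepCounter.last(small))
# {10, 10}
IO.inspect(StepCounter.last(large))
# {10000, 10000}
```

The step count for first/1 stays at 1, while the step count for last/1 grows with the length of the list.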

Both our Queue and Double-Ended Queue have amortized $O(1)$ performance. On average, their performance is constant, but an individual operation may have $O(n)$ performance in the worst-case scenario.

Naive Implementation

Implementing a naive algorithm means developing a straightforward solution to a problem, which is not necessarily the most efficient one. A naive algorithm is usually the first that comes to mind, and it is often easy to understand and implement. However, it may perform poorly on larger or more complex inputs compared to more sophisticated algorithms. Naive solutions are sometimes also called brute-force algorithms.

Stack

A stack is a fairly simple data structure to implement, as we can build a highly performant version of a stack using only a list.

defmodule Stack do
  def new do
    []
  end

  def push(stack, element) do
    [element | stack]
  end

  def pop(stack) do
    {hd(stack), tl(stack)}
  end
end

The head of the list is the top of the stack.

Elements are pushed onto the top of the stack and removed from the top of the stack.

stack =
  Stack.new()
  |> Stack.push(1)
  |> Stack.push(2)
  |> Stack.push(3)
Stack.pop(stack)
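Note that Stack.pop/1 raises on an empty stack, because hd/1 raises an ArgumentError for an empty list. One possible variation, sketched here under the hypothetical name SafeStack, pattern matches on the list and returns nil for an empty stack, the same convention the queues later in this reading use.

```elixir
defmodule SafeStack do
  def new, do: []

  def push(stack, element), do: [element | stack]

  # Popping an empty stack returns nil instead of raising.
  def pop([]), do: {nil, []}
  def pop([top | rest]), do: {top, rest}
end

stack =
  SafeStack.new()
  |> SafeStack.push(1)
  |> SafeStack.push(2)
  |> SafeStack.push(3)

{top, rest} = SafeStack.pop(stack)
IO.inspect({top, rest})
# {3, [2, 1]}

IO.inspect(SafeStack.pop(SafeStack.new()))
# {nil, []}
```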

Naive Queue

For our naive solution, we can represent our queue as a list.

queue = []

We can simply prepend elements into the list when we want to insert elements. This has $O(1)$ performance.

queue = [1 | queue]
queue = [2 | queue]
queue = [3 | queue]

However, we must reverse the list to remove the first element in our queue. Then, we also need to re-reverse the remaining elements to put the queue back in order. Reversing a list has $O(n)$ performance, which isn’t ideal.

[value | reversed_queue] = Enum.reverse(queue)
{value, Enum.reverse(reversed_queue)}

Here’s that made into a NaiveQueue module.

defmodule NaiveQueue do
  def new, do: []

  def enqueue(queue, el) do
    [el | queue]
  end

  def dequeue([]), do: {nil, []}

  def dequeue(queue) do
    [value | reversed_queue] = Enum.reverse(queue)
    {value, Enum.reverse(reversed_queue)}
  end
end

We can performantly insert (enqueue) items into our queue.

queue =
  NaiveQueue.new()
  |> NaiveQueue.enqueue(1)
  |> NaiveQueue.enqueue(2)
  |> NaiveQueue.enqueue(3)

We can also remove (dequeue) elements from our queue, but this has $O(n)$ performance.

{value, queue} = NaiveQueue.dequeue(queue)
{value, queue} = NaiveQueue.dequeue(queue)
{value, queue} = NaiveQueue.dequeue(queue)

We’ve handled having an empty queue by returning nil.

{value, queue} = NaiveQueue.dequeue(queue)

Amortized Queue

We can model our queue as two lists, one holding the front elements and one holding the back elements. This gives the queue two separate list heads that we can access in $O(1)$ time.

Elements are prepended to the back list, and removed from the front list.

flowchart
subgraph back
  6 --> 5 --> 4 --> 3 --> 2 --> 1
end

When we remove (dequeue) an element, we can reverse the elements in the back list and store them in the front list. Reversing the list causes our worst-case performance of $O(n)$. However, future removals will have $O(1)$ performance.

flowchart
  subgraph front
    direction LR
    1 --> 2 --> 3 --> 4 --> 5 --> 6
  end

Implementation

We can store each list in a tuple to implement our front and back lists.

# {back, front}
{[], []}

When we insert elements, we prepend them to the back list.

# Enqueue 1, 2, 3
{[3, 2, 1], []}

When we remove elements, we grab the head of the front list. When the front list is empty, we’ll reverse our back list to make the front list. This is our worst-case $O(n)$ performance scenario.

# Dequeue
{[], [1, 2, 3]}

After reversing the back list, it’s an $O(1)$ operation to grab elements from the front of our queue.

{back, front} = {[], [1, 2, 3]}

[removed | front] = front

{removed, front}

Here’s that put into a module. We’ve handled the best and worst-case scenarios. We’ve also handled returning nil when the queue is empty.

defmodule AmortizedQueue do
  def new, do: {[], []}

  def enqueue({back, front}, el) do
    {[el | back], front}
  end

  def dequeue({[], []} = queue), do: {nil, queue}
  # best-case scenario O(1)
  def dequeue({back, [el | front]}), do: {el, {back, front}}

  # worst-case scenario O(n)
  def dequeue({back, []}) do
    [removed | front] = Enum.reverse(back)
    {removed, {[], front}}
  end
end

We enqueue items on the back list.

queue =
  AmortizedQueue.new()
  |> AmortizedQueue.enqueue(1)
  |> AmortizedQueue.enqueue(2)
  |> AmortizedQueue.enqueue(3)

Then if there are no elements in the front list we reverse the back list. While this has $O(n)$ performance, we only have to do it when the front list is empty.

{value, queue} = AmortizedQueue.dequeue(queue)

Subsequent operations will be $O(1)$.

{value, queue} = AmortizedQueue.dequeue(queue)
{value, queue} = AmortizedQueue.dequeue(queue)

If the entire queue is empty, we simply return nil.

{value, queue} = AmortizedQueue.dequeue(queue)
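To see why the cost averages out to $O(1)$, the sketch below repeats the same two-list logic but also counts how many elements pass through Enum.reverse/1. The CountingQueue module and its moved counter are assumptions added for this illustration and are not part of the AmortizedQueue above.

```elixir
defmodule CountingQueue do
  # State is {back, front, moved}; moved counts elements passed through Enum.reverse/1.
  def new, do: {[], [], 0}

  def enqueue({back, front, moved}, el), do: {[el | back], front, moved}

  def dequeue({[], [], _moved} = queue), do: {nil, queue}
  def dequeue({back, [el | front], moved}), do: {el, {back, front, moved}}

  def dequeue({back, [], moved}) do
    [el | front] = Enum.reverse(back)
    {el, {[], front, moved + length(back)}}
  end
end

n = 1_000
full = Enum.reduce(1..n, CountingQueue.new(), &CountingQueue.enqueue(&2, &1))

# Dequeue everything; here only the first dequeue triggers a reversal.
{_back, _front, moved} =
  Enum.reduce(1..n, full, fn _i, queue ->
    {_value, queue} = CountingQueue.dequeue(queue)
    queue
  end)

IO.inspect(moved)
# 1000
```

Each element is moved from the back list to the front list once, so 1000 dequeues cost 1000 moves in total, or one move per dequeue on average.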

Double Ended Queue

Similar to our amortized queue, we can represent a double-ended queue with both a back list and a front list.

Since we can perform operations on either end, we need to optimize for removing/inserting from both the back and the front of our DEQ. Therefore, it would be unwise to reverse the entirety of either list as we did with our amortized queue.

Instead, a performant approach when either the back or the front list is empty is to balance our queue into two halves.

For example, if we insert six elements to our back list:

flowchart
subgraph back
  direction RL
  1 --> 2 --> 3 --> 4 --> 5 --> 6
end

We would split the list in half to make the back [6, 5, 4] and our front [1, 2, 3].

flowchart
  subgraph front
    direction LR
    1 --> 2 --> 3
  end

  subgraph back
    direction LR
    6 --> 5 --> 4
  end

By splitting our queue, we’ll have access to the head of both our back and our front list, so insert and remove operations will have $O(1)$ performance on either side of the queue.
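The split itself can be sketched in isolation with Enum.split/2 and Enum.reverse/1. The variable names are illustrative only.

```elixir
# Back list after inserting 1..6 at the back (newest element first).
back = [6, 5, 4, 3, 2, 1]

# Keep the newest half as the back; the older half becomes the front.
{new_back, reversed_front} = Enum.split(back, div(length(back), 2))
new_front = Enum.reverse(reversed_front)

IO.inspect({new_back, new_front})
# {[6, 5, 4], [1, 2, 3]}
```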

Here’s that put into a module.

defmodule DoubleEndedQueue do
  def new, do: {[], []}

  def insert_back({back, front}, el), do: {[el | back], front}

  def insert_front({back, front}, el), do: {back, [el | front]}

  def remove_back({[], []} = queue), do: {nil, queue}

  # best-case scenario O(1)
  def remove_back({[removed | back], front}), do: {removed, {back, front}}

  # worst-case scenario O(n)
  def remove_back({[], front}) do
    middle_index = div(length(front), 2)
    {half_front, reversed_back} = Enum.split(front, middle_index)
    [removed | back] = Enum.reverse(reversed_back)
    {removed, {back, half_front}}
  end

  def remove_front({[], []} = queue), do: {nil, queue}

  # best-case scenario O(1)
  def remove_front({back, [removed | front]}), do: {removed, {back, front}}

  # worst-case scenario O(n)
  def remove_front({back, []}) do
    middle_index = div(length(back), 2)
    {half_back, reversed_front} = Enum.split(back, middle_index)
    [removed | front] = Enum.reverse(reversed_front)
    {removed, {half_back, front}}
  end
end

Now we can performantly insert/remove elements to the back or the front.

deq =
  DoubleEndedQueue.new()
  |> DoubleEndedQueue.insert_back(1)
  |> DoubleEndedQueue.insert_back(2)
  |> DoubleEndedQueue.insert_back(3)
  |> IO.inspect()

{value, deq} = DoubleEndedQueue.remove_back(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_back(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_back(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_back(deq)

deq =
  DoubleEndedQueue.new()
  |> DoubleEndedQueue.insert_front(1)
  |> DoubleEndedQueue.insert_front(2)
  |> DoubleEndedQueue.insert_front(3)
  |> IO.inspect()

{value, deq} = DoubleEndedQueue.remove_front(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_front(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_front(deq) |> IO.inspect()
{value, deq} = DoubleEndedQueue.remove_front(deq)

We rebalance the queue when we remove an element from an empty front list.

deq =
  DoubleEndedQueue.new()
  |> DoubleEndedQueue.insert_back(1)
  |> DoubleEndedQueue.insert_back(2)
  |> DoubleEndedQueue.insert_back(3)
  |> DoubleEndedQueue.insert_back(4)
  |> DoubleEndedQueue.insert_back(5)
  |> DoubleEndedQueue.insert_back(6)
  |> IO.inspect()

DoubleEndedQueue.remove_front(deq)

The same applies when we remove elements from an empty back list. We split the front to rebalance our queue.

deq =
  DoubleEndedQueue.new()
  |> DoubleEndedQueue.insert_front(6)
  |> DoubleEndedQueue.insert_front(5)
  |> DoubleEndedQueue.insert_front(4)
  |> DoubleEndedQueue.insert_front(3)
  |> DoubleEndedQueue.insert_front(2)
  |> DoubleEndedQueue.insert_front(1)
  |> IO.inspect()

DoubleEndedQueue.remove_back(deq)

We’ve also handled removing elements from either end when both the back and the front are empty by returning nil and an empty DEQ.

DoubleEndedQueue.new()
|> DoubleEndedQueue.remove_back()

DoubleEndedQueue.new()
|> DoubleEndedQueue.remove_front()

Double Ended Queue (Tracking Size)

To split our DEQ, we need to know the queue size. Unfortunately, calculating the length of a list is an expensive $O(n)$ operation.

To avoid this operation, instead of computing the size of the queue, we can keep track of it whenever we add or remove an element.

Here’s an alternative DEQ implementation using a struct that tracks the back list, the front list, and the total size of the DEQ.

The worst-case scenario is still $O(n)$. However, it’s overall more performant because it requires fewer traversals through the list.

defmodule DoubleEndedQueueStruct do
  defstruct back: [], front: [], size: 0
  def new, do: %__MODULE__{}

  def insert_back(queue, el) do
    %{queue | back: [el | queue.back], size: queue.size + 1}
  end

  def insert_front(queue, el) do
    %{queue | front: [el | queue.front], size: queue.size + 1}
  end

  def remove_back(%__MODULE__{back: [], front: []} = queue) do
    {nil, queue}
  end

  def remove_back(%__MODULE__{back: [], front: front} = queue) do
    mid = div(queue.size, 2)
    {half_front, reversed_back} = Enum.split(front, mid)
    [removed | back] = Enum.reverse(reversed_back)
    {removed, %{queue | back: back, front: half_front, size: queue.size - 1}}
  end

  def remove_back(%__MODULE__{back: [removed | back]} = queue) do
    {removed, %{queue | back: back, size: queue.size - 1}}
  end

  def remove_front(%__MODULE__{front: [], back: []} = queue) do
    {nil, queue}
  end

  def remove_front(%__MODULE__{front: []} = queue) do
    mid = div(queue.size, 2)
    {back_half, reversed_front} = Enum.split(queue.back, mid)
    [removed | front] = Enum.reverse(reversed_front)
    {removed, %{queue | front: front, back: back_half, size: queue.size - 1}}
  end

  def remove_front(%__MODULE__{front: [removed | front]} = queue) do
    {removed, %{queue | front: front, size: queue.size - 1}}
  end
end

While the structure representing the queue has changed to optimize performance, the behavior remains the same.

deq =
  DoubleEndedQueueStruct.new()
  |> DoubleEndedQueueStruct.insert_front(6)
  |> DoubleEndedQueueStruct.insert_front(5)
  |> DoubleEndedQueueStruct.insert_front(4)
  |> DoubleEndedQueueStruct.insert_front(3)
  |> DoubleEndedQueueStruct.insert_front(2)
  |> DoubleEndedQueueStruct.insert_front(1)
  |> IO.inspect()

DoubleEndedQueueStruct.remove_back(deq)
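The benefit of tracking the size can be sketched with a cut-down struct that only supports inserts. SizedDeque, size/1, and counted_size/1 are hypothetical names for this illustration. Both functions return the same number, but only counted_size/1 has to walk the lists.

```elixir
defmodule SizedDeque do
  defstruct back: [], front: [], size: 0

  def new, do: %__MODULE__{}

  def insert_back(%__MODULE__{} = deq, el) do
    %{deq | back: [el | deq.back], size: deq.size + 1}
  end

  def insert_front(%__MODULE__{} = deq, el) do
    %{deq | front: [el | deq.front], size: deq.size + 1}
  end

  # O(1): read the stored counter.
  def size(%__MODULE__{size: size}), do: size

  # O(n): walk both lists, shown only for comparison.
  def counted_size(%__MODULE__{back: back, front: front}) do
    length(back) + length(front)
  end
end

sized =
  SizedDeque.new()
  |> SizedDeque.insert_back(1)
  |> SizedDeque.insert_back(2)
  |> SizedDeque.insert_front(0)

IO.inspect({SizedDeque.size(sized), SizedDeque.counted_size(sized)})
# {3, 3}
```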

Erlang Queue

Erlang provides a :queue module with further performance optimizations for a DEQ.

It provides the following functions for inserting and removing elements.

  • in/2 inserts an element at the back of the queue.
  • out/1 removes an element from the front of the queue.

Then there are “reversed” versions which perform the opposite operations of a normal queue.

  • in_r/2 inserts an element at the front of the queue.
  • out_r/1 removes an element from the back of the queue.

q = :queue.new()

The Erlang :queue module keeps the first element in the back list. When a second element is inserted, it moves the first element to the front list. This is a small optimization, so we don’t have to reverse the back list the first time we remove an element.

q = :queue.in(1, q) |> IO.inspect()
# Notice 1 is moved to the front of the queue.
q = :queue.in(2, q) |> IO.inspect()
q = :queue.in(3, q) |> IO.inspect()
q = :queue.in(4, q) |> IO.inspect()
q = :queue.in(5, q) |> IO.inspect()
q = :queue.in(6, q) |> IO.inspect()

It similarly rebalances the lists when either is empty.

{value, q} = :queue.out(q)

After rebalancing, elements are removed from the front, which has $O(1)$ performance.

{value, q} = :queue.out(q)
{value, q} = :queue.out(q)
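The reversed functions work the same way from the other end. The short sketch below combines in/2, in_r/2, out/1, and out_r/1, and uses :queue.to_list/1 to view the elements from front to back. The variable names are illustrative.

```elixir
rq = :queue.new()
# Insert 2, then 3, at the back.
rq = :queue.in(2, rq)
rq = :queue.in(3, rq)
# Insert 1 at the front.
rq = :queue.in_r(1, rq)

# to_list/1 returns the elements from front to back.
IO.inspect(:queue.to_list(rq))
# [1, 2, 3]

{{:value, from_front}, rq} = :queue.out(rq)
{{:value, from_back}, rq} = :queue.out_r(rq)
IO.inspect({from_front, from_back, :queue.to_list(rq)})
# {1, 3, [2]}

# An empty queue returns :empty rather than a {:value, _} tuple.
{:empty, _empty_queue} = :queue.out(:queue.new())
```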

Concurrent Queues

The :queue erlang module is notorious for having a difficult-to-read interface. This quote is taken directly from the :queue documentation.

> The “Okasaki API” is inspired by “Purely Functional Data Structures” by Chris Okasaki. It regards queues as lists. This API is by many regarded as strange and avoidable. For example, many reverse operations have lexically reversed names, some with more readable but perhaps less understandable aliases.

So folks have created libraries for queues and DEQs that improve the interface and leverage the power of concurrency.

For example, the OPQ library by fredwu leverages both the :queue module and GenStage to enable worker pools to handle work in a queue concurrently.

This is like a shared lineup, where multiple workers handle a single line of customers.

The number of workers determines the number of jobs in the queue that can be handled concurrently.

flowchart LR
subgraph Queue
  1
  2
  3
  4[4]
end
subgraph Worker Pool
1 -->  W1[Worker1] 
2 --> Worker2
3 --> Worker3
end

When the worker finishes, it becomes available for the next job in the queue.

flowchart LR
subgraph Queue
  4
end

subgraph Worker Pool
4 --> W1[Worker1] 
Worker2
Worker3
end
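As a rough stand-in for a worker pool, the sketch below uses Task.async_stream/3 from Elixir's standard library rather than OPQ. The max_concurrency option plays the role of the number of workers. The job list and the simulated work are assumptions made for this example.

```elixir
jobs = [1, 2, 3, 4]

results =
  jobs
  |> Task.async_stream(
    fn job ->
      # Simulate some work.
      Process.sleep(50)
      job * 10
    end,
    # At most three jobs run at the same time, like three workers.
    max_concurrency: 3
  )
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(results)
# [10, 20, 30, 40]
```

The fourth job only starts once one of the first three finishes. Results come back in the original order because Task.async_stream/3 is ordered by default.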

Diving into this library is beyond the scope of this material. Still, knowing how to scale queues to handle thousands (if not millions) of concurrent actions is useful. For example, many applications such as ticket-purchasing or streaming sites experience the thundering herd problem, where many interactions occur simultaneously.

The thundering herd problem happens when many processes or threads are waiting for the same thing, causing a big surge of activity that can slow down or even crash the system.

It’s called the thundering herd problem because it’s like a stampede of animals reacting to a sudden disturbance. Avoiding this problem requires techniques like load balancing, caching, and queuing to manage the flow of requests and prevent the system from becoming overwhelmed.

Naive Priority Queue

Typically, priority queues are implemented using a tree structure such as a heap. However, for our naive solution, we’ll use a list.

Elements in our priority queue will be a tuple with the associated priority.

priority_queue = [{"critical injury", 1}, {"moderate injury", 2}, {"minor illness", 3}]

There are two main ways to implement a priority queue using a single list.

  1. Store the elements in an unsorted order and retrieve the priority element.
  2. Store the elements in a sorted order and retrieve the first element.

Solution 1 optimizes for storing elements: we can insert at the head of the list in $O(1)$, but retrieving the priority element requires searching the whole list.

Solution 2 optimizes for retrieving elements: we can remove the priority element from the head of the list in $O(1)$, but storing an element requires finding its sorted position.
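For comparison, solution 1 could look something like the following sketch. UnsortedPriorityQueue is a hypothetical module written for this illustration, and it assumes lower values are prioritized.

```elixir
defmodule UnsortedPriorityQueue do
  def new, do: []

  # O(1): prepend without caring about order.
  def add(queue, element), do: [element | queue]

  def pop([]), do: {nil, []}

  # O(n): scan for the lowest priority value, then remove that element.
  def pop(queue) do
    next = Enum.min_by(queue, fn {_value, priority} -> priority end)
    {next, List.delete(queue, next)}
  end
end

unsorted =
  UnsortedPriorityQueue.new()
  |> UnsortedPriorityQueue.add({"minor illness", 3})
  |> UnsortedPriorityQueue.add({"critical injury", 1})
  |> UnsortedPriorityQueue.add({"moderate injury", 2})

IO.inspect(UnsortedPriorityQueue.pop(unsorted))
# {{"critical injury", 1}, [{"moderate injury", 2}, {"minor illness", 3}]}
```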

We’ll choose solution 2, because we want to optimize for retrieving elements, rather than storing them.

We’ve also chosen to implement this as a minimum priority queue, where lower values are prioritized.

defmodule PriorityQueue do
  def new do
    []
  end

  def add([], new_element), do: [new_element]

  def add(queue, new_element) do
    # find the index of the first element with a greater priority value.
    index =
      Enum.find_index(queue, fn current_element ->
        priority_of(new_element) < priority_of(current_element)
      end)

    # insert at the back of the list if there is no index.
    List.insert_at(queue, index || -1, new_element)
  end

  def next(queue) do
    hd(queue)
  end

  def remove(queue) do
    tl(queue)
  end

  defp priority_of(element) do
    elem(element, 1)
  end
end

Elements are kept sorted by priority value (low -> high), regardless of the order in which they are added.

# 4231 -> 1234

priority_queue =
  PriorityQueue.new()
  |> PriorityQueue.add({"minor illness", 4})
  |> PriorityQueue.add({"moderate sprain", 2})
  |> PriorityQueue.add({"severe injury", 3})
  |> PriorityQueue.add({"critical injury", 1})

We can view the next element to be removed.

PriorityQueue.next(priority_queue)

Then remove that element to return the rest of the priority queue.

PriorityQueue.remove(priority_queue)
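For a tree-based alternative to the list, one option is Erlang's :gb_sets module, which stores elements in a balanced tree. This is only a sketch. It assumes elements are written as {priority, description} so that Erlang's term ordering sorts by priority first. Because it is a set, identical elements are stored only once, and :gb_sets.take_smallest/1 must not be called on an empty set.

```elixir
patients = [
  {4, "minor illness"},
  {2, "moderate sprain"},
  {3, "severe injury"},
  {1, "critical injury"}
]

set = :gb_sets.from_list(patients)

# take_smallest/1 returns the smallest element and the remaining set.
{next_patient, set} = :gb_sets.take_smallest(set)
IO.inspect(next_patient)
# {1, "critical injury"}

IO.inspect(:gb_sets.to_list(set))
# [{2, "moderate sprain"}, {3, "severe injury"}, {4, "minor illness"}]
```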

Further Reading

Consider the following resource(s) to deepen your understanding of the topic.

Commit Your Progress

DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.

Run git status to ensure there are no undesirable changes. Then run the following in your command line from the curriculum folder to commit your progress.

$ git add .
$ git commit -m "finish Queues reading"
$ git push
