State synchronization with state reducers

State reducer

State is an inherent part of most applications, and we deal with it on a daily basis. Whether it’s some local information or a description of the whole running system, it all comes down to data. Let’s consider a simple example:

state = %{stack: []}

As a result of program evaluation, state may get transformed a number of times. Most of the time we operate on the state directly, that is, we modify the data structure to reflect the new situation.

In functional languages like Elixir, we keep the original data intact and produce new data instead, but that’s more of an implementation detail really. The important point is that we get to a new state.

push = fn state, item ->
  update_in(state.stack, &[item | &1])
end

state
|> push.(1)
|> push.(2)

That’s how we generally deal with data, by defining operations as functions. In other words, functions are the API used to transform the data.

This approach does the job if we know all operations upfront, but what if the operations are determined by events occurring over the system’s lifetime? Events are just another kind of data, so ideally our API would be data-driven. That’s where the state reducer comes in!

state = %{stack: []}

# Reducer takes the current state and an operation
# to produce the new transformed state
reducer = fn
  state, {:push, item} ->
    update_in(state.stack, &[item | &1])

  state, :pop ->
    update_in(state.stack, &tl/1)

  state, :reverse ->
    update_in(state.stack, &Enum.reverse/1)
end

state
|> IO.inspect()
|> reducer.({:push, 1})
|> IO.inspect()
|> reducer.({:push, 2})
|> IO.inspect()
|> reducer.({:push, 3})
|> IO.inspect()
|> reducer.(:pop)
|> IO.inspect()
|> reducer.(:reverse)
|> IO.inspect()
|> reducer.(:pop)

In the above approach, each operation is described as a data structure (either {:push, term}, :pop or :reverse), so this time data is the API used to transform the data.

In some sense, we simply added a tiny layer of abstraction translating data (operations) into function calls. However, the difference lies mainly in the way we use and think about the new API.

One important detail about the state reducer is that it should be a pure function: given the same state and operation, it always returns the same new state and has no side effects. Since it only transforms the state data structure and we are working in a functional language, this tends to come naturally.

Implications

By modeling state operations as data structures, we can easily send them between processes or systems. That’s exactly what we will explore further in the notebook.
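Since operations are plain Elixir terms, crossing a system boundary mostly comes down to serializing them. A minimal sketch using Erlang’s built-in `:erlang.term_to_binary/1` and `:erlang.binary_to_term/2` (the network transport itself is left out, and the reducer is a standalone copy of the one above):

```elixir
# A standalone copy of the stack reducer from above
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

operations = [{:push, 1}, {:push, 2}, :reverse]

# Encode the operations as they would travel over the wire...
encoded = :erlang.term_to_binary(operations)

# ...and decode them on the receiving side. The :safe option refuses
# to create new atoms, which matters when the input is untrusted.
decoded = :erlang.binary_to_term(encoded, [:safe])

# The receiving system reduces the decoded operations: %{stack: [1, 2]}
remote_state = Enum.reduce(decoded, %{stack: []}, &reducer.(&2, &1))
```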

Additionally, with this approach we could keep track of the operations to implement an undo stack.
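A minimal sketch of such an undo, assuming the initial state is kept around and replaying the operations is cheap enough: record every applied operation and, to undo, rebuild the state from the initial one without the most recent operation. The `history` map and the `apply_op`/`undo` helpers are hypothetical names used only for illustration.

```elixir
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

initial = %{stack: []}

# Hypothetical helper: record the operation (newest first) and apply it
apply_op = fn history, op ->
  %{history | ops: [op | history.ops], current: reducer.(history.current, op)}
end

# Hypothetical helper: drop the last operation and replay the rest
undo = fn
  %{ops: []} = history ->
    history

  %{ops: [_last | rest]} = history ->
    # Replay every remaining operation, oldest first
    current = rest |> Enum.reverse() |> Enum.reduce(history.initial, &reducer.(&2, &1))
    %{history | ops: rest, current: current}
end

history =
  %{initial: initial, ops: [], current: initial}
  |> apply_op.({:push, 1})
  |> apply_op.({:push, 2})
  |> apply_op.(:reverse)

# history.current is %{stack: [1, 2]}, undone.current is %{stack: [2, 1]}
undone = undo.(history)
```

Replaying sidesteps the need for an inverse of each operation (`:pop`, for instance, discards the popped item), at the cost of re-running the whole history on every undo.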

State reducers in practice

It’s likely that you already use the state reducer pattern quite often! If you’re familiar with Redux for React, that’s exactly the core concept behind it.

The name of Enum.reduce/3 isn’t accidental either! The accumulator is your state, while the enumerable is a series of operations we use to transform the data.
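To make the analogy concrete, here is the chain of reducer calls from the first section rewritten as a single `Enum.reduce/3` over a list of operations. `Enum.scan/3` takes the same arguments but returns the state after every operation, much like the `IO.inspect/1` calls above. The reducer is again a standalone copy.

```elixir
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

operations = [{:push, 1}, {:push, 2}, {:push, 3}, :pop, :reverse, :pop]

# Enum.reduce/3 passes (element, accumulator) to the function,
# so the arguments are swapped to match reducer.(state, operation)
final_state = Enum.reduce(operations, %{stack: []}, fn op, state -> reducer.(state, op) end)

# Enum.scan/3 returns the state after each operation (without the initial one)
intermediate_states =
  Enum.scan(operations, %{stack: []}, fn op, state -> reducer.(state, op) end)
```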

Finally, GenServer encapsulates state and transforms it based on events it receives, which makes it a kind of reducer too. This analogy is a bit loose though, because GenServer implementations generally have side effects.
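A minimal sketch of that analogy, using a hypothetical `StackServer` module: each cast carries an operation, and `handle_cast/2` reduces it into the server state by delegating to a private pure function.

```elixir
defmodule StackServer do
  use GenServer

  def start_link(initial_state), do: GenServer.start_link(__MODULE__, initial_state)

  # Operations arrive as casts, much like events arriving at a reducer
  def apply_operation(pid, operation), do: GenServer.cast(pid, {:operation, operation})

  def get_state(pid), do: GenServer.call(pid, :get_state)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:operation, operation}, state) do
    # The callback delegates to a pure reducer; a real GenServer
    # might also perform side effects here
    {:noreply, reduce(state, operation)}
  end

  @impl true
  def handle_call(:get_state, _from, state), do: {:reply, state, state}

  defp reduce(state, {:push, item}), do: update_in(state.stack, &[item | &1])
  defp reduce(state, :pop), do: update_in(state.stack, &tl/1)
  defp reduce(state, :reverse), do: update_in(state.stack, &Enum.reverse/1)
end

{:ok, pid} = StackServer.start_link(%{stack: []})
StackServer.apply_operation(pid, {:push, 1})
StackServer.apply_operation(pid, {:push, 2})
StackServer.apply_operation(pid, :reverse)

# Messages between two processes keep their order, so the call
# is handled after all three casts
final = StackServer.get_state(pid)
```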

State synchronization

Now let’s move to the actual problem. Imagine a number of processes (clients/users) that operate together on the same state. Initially every client gets a local copy of the current state, so all the copies are aligned; then the clients start applying modifications to the state. The question is, how do we ensure all the local states stay in sync?

Let’s have a quick look at two models addressing this problem.

Eventual consistency

In this model the processes modify their local state and communicate changes to all other processes. The local states may differ at a specific point in time, but the goal is to ensure eventual consistency, meaning that eventually all local states get back in sync. The tricky part is that each process may get the peer changes in different order, so we need an algorithm that figures out how to integrate these changes such that the final state ends up the same for everyone.
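To see why the order matters, here is a sketch with a standalone copy of the stack reducer: two processes that both applied `{:push, 1}` then receive the same two peer changes in a different order, and they end up with different data because the operations don’t commute.

```elixir
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

apply_all = fn operations -> Enum.reduce(operations, %{stack: []}, &reducer.(&2, &1)) end

# Same changes, different arrival order
process_a = apply_all.([{:push, 1}, {:push, 2}, :reverse])
process_b = apply_all.([{:push, 1}, :reverse, {:push, 2}])

# process_a is %{stack: [1, 2]}, process_b is %{stack: [2, 1]}
```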

So far, research has produced two categories of algorithms: OT (Operational Transformation) and CRDT (Conflict-free Replicated Data Type). You can find a brief overview of these in this article. This approach certainly sounds appealing; however, these algorithms don’t easily generalize to arbitrary data structures and operations, and they can be quite complex on top of that.
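To give a flavor of the CRDT family, here is a sketch of one of the simplest CRDTs, a grow-only counter (G-Counter). It is a different technique from the serialized reducer built in the rest of this notebook, and the `GCounter` module and replica ids are hypothetical. Each replica increments only its own entry, and merging takes the per-replica maximum; because that merge is commutative, associative and idempotent, replicas converge no matter in which order they exchange their states.

```elixir
defmodule GCounter do
  # Map from replica id to the number of increments made by that replica
  def new(), do: %{}

  # A replica only ever increments its own entry
  def increment(counter, replica_id), do: Map.update(counter, replica_id, 1, &(&1 + 1))

  # Per-replica maximum: commutative, associative and idempotent
  def merge(a, b), do: Map.merge(a, b, fn _replica_id, x, y -> max(x, y) end)

  def value(counter), do: counter |> Map.values() |> Enum.sum()
end

# Two replicas increment concurrently, without coordination
a = GCounter.new() |> GCounter.increment(:a) |> GCounter.increment(:a)
b = GCounter.new() |> GCounter.increment(:b)

# Merging in either order yields the same counter
merged_ab = GCounter.merge(a, b)
merged_ba = GCounter.merge(b, a)
```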

Serialized changes

As outlined above, the main challenge comes from the fact that processes may receive peer changes in different order, so an obvious solution would be to enforce a specific order of changes (serialize them). In terms of implementation all we need to do is pass all changes through an intermediate process that forwards each change to all client processes. The intermediate process handles only a single message at a time, so the changes are implicitly ordered and consequently every client process receives exactly the same stream of changes.

Naturally, the simplicity of this approach has its own pitfalls. All changes need to go through the intermediate process before their result is reflected in the local state, which is not viable in cases like text collaboration, where the UI must update immediately on every keystroke. Also, enforcing the order doesn’t eliminate race conditions. Imagine two people removing an item from a one-item stack at the same time: we get two :pop changes, and no matter which comes first, the second is invalid. More generally, applying one change may invalidate the intent of other changes applied after it.

That said, these limitations may not be relevant for a given use case, in which case this model is a perfect fit: it’s simple, not limited to specific data structures, and ensures the local states change in exactly the same way over time.

Serialized changes and state reducer

This whole discussion on changes/operations applied to state sounds like state reducer, doesn’t it? In the serialized changes model, each process starts with a state and then keeps applying operations to it as they come. In other words, each process keeps reducing the state upon subsequent operations.
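Stripped of GenServer, a process that keeps reducing its data upon subsequent operations is just a receive loop. A sketch, with a hypothetical `{:get, from}` message added only to read the data back: two such processes fed the same operations in the same order end up with identical data.

```elixir
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

# A client process: reduce each incoming operation into the local data
loop = fn loop, data ->
  receive do
    {:operation, operation} ->
      loop.(loop, reducer.(data, operation))

    {:get, from} ->
      send(from, {:data, self(), data})
      loop.(loop, data)
  end
end

clients = for _ <- 1..2, do: spawn(fn -> loop.(loop, %{stack: []}) end)

# Playing the intermediate process: the same stream, in the same order, to every client
for operation <- [{:push, 1}, {:push, 2}, :reverse, :pop], pid <- clients do
  send(pid, {:operation, operation})
end

# Both clients report %{stack: [2]}
results =
  for pid <- clients do
    send(pid, {:get, self()})

    receive do
      {:data, ^pid, data} -> data
    end
  end
```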

Now back to the code!

Synchronized state

Let’s define a data structure representing the piece of state we need to keep in sync, along with a reduce function handling a well-defined set of operations. That’s essentially what we did in the first section, just a bit more organized.

Note: we are going to use the term “data” instead of “state” to avoid confusion with GenServer state later on.

defmodule Data do
  # @derive only takes effect when set before defstruct
  @derive Inspect
  defstruct stack: []

  @type t :: %__MODULE__{stack: list()}

  @type operation :: {:push, term()} | :pop | :reverse

  @doc """
  Returns initial data.
  """
  @spec new() :: t()
  def new(), do: %__MODULE__{}

  @doc """
  Applies the change specified by `operation` to `data`.
  """
  @spec reduce(t(), operation()) :: t()
  def reduce(data, operation)

  def reduce(data, {:push, item}) do
    update_in(data.stack, &[item | &1])
  end

  def reduce(data, :pop) do
    update_in(data.stack, &tl/1)
  end

  def reduce(data, :reverse) do
    update_in(data.stack, &Enum.reverse/1)
  end
end

Server

Now we need a process for serializing the operations. It’s going to be the source of truth for our data, and all operations will go through this process, which implicitly imposes an order on them. Since it serves as a communication proxy for all the client processes, we are going to just call it a server.

In our example the server will do the minimal job of tracking the clients and broadcasting operations. In a real use case it could additionally coordinate some work, given its central role.

defmodule Server do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  @doc """
  Returns the current data and subscribes the caller
  to change operations.
  """
  def join(server) do
    GenServer.call(server, :join)
  end

  @doc """
  Notifies the server about intended change, which
  gets broadcasted to all the clients.
  """
  def submit_operation(server, operation) do
    GenServer.cast(server, {:submit_operation, operation})
  end

  @impl true
  def init(_opts) do
    {:ok, %{clients: [], data: Data.new()}}
  end

  @impl true
  def handle_call(:join, {pid, _}, state) do
    Process.monitor(pid)
    {:reply, state.data, %{state | clients: [pid | state.clients]}}
  end

  @impl true
  def handle_cast({:submit_operation, operation}, state) do
    # Send the operation to all the subscribed clients
    broadcast_operation(operation, state.clients)
    # Keep the updated data on the server, so that every
    # new client can get the latest data
    state = update_in(state.data, &Data.reduce(&1, operation))

    [
      :blue,
      "Server #{inspect(self())} received #{inspect(operation)}, " <>
        "new data: #{inspect(state.data)}"
    ]
    |> IO.ANSI.format()
    |> IO.puts()

    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:noreply, %{state | clients: List.delete(state.clients, pid)}}
  end

  defp broadcast_operation(operation, clients) do
    # In a production app we would use a pubsub instead
    for pid <- clients do
      send(pid, {:operation, operation})
    end
  end
end
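For the pubsub mentioned in `broadcast_operation/2`, Phoenix.PubSub is a common choice in Phoenix apps. Within a single node, Elixir’s built-in `Registry` with duplicate keys can also act as a minimal local pubsub. A sketch, where the `OperationsPubSub` name and the `"session:1"` topic are hypothetical:

```elixir
# A registry with duplicate keys lets many processes register under one topic
{:ok, _} = Registry.start_link(keys: :duplicate, name: OperationsPubSub)

# Subscribing: the calling process registers itself under the topic
subscribe = fn topic -> Registry.register(OperationsPubSub, topic, nil) end

# Broadcasting: send the operation to every process registered under the topic
broadcast = fn topic, operation ->
  Registry.dispatch(OperationsPubSub, topic, fn entries ->
    for {pid, _value} <- entries, do: send(pid, {:operation, operation})
  end)
end

{:ok, _owner} = subscribe.("session:1")
:ok = broadcast.("session:1", {:push, 1})

received =
  receive do
    {:operation, operation} -> operation
  after
    1000 -> :timeout
  end
```

A side benefit is that a registered process is removed from the registry automatically when it exits, which covers what the server’s `:DOWN` handler does by hand.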

Client

Finally, we need a number of concurrent clients with a local copy of the data that we want to keep in sync.

In our example we imitate a client process by exposing a submit_operation function to simulate the client making a change to the data. The client processes are specific to the given use case; for instance, these could be LiveView processes that need to keep some state in sync, with operations corresponding to user interactions.

defmodule Client do
  use GenServer

  def start_link(server, opts \\ []) do
    opts = put_in(opts[:server], server)
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Originates a change operation from the given client.
  """
  def submit_operation(client, operation) do
    GenServer.cast(client, {:submit_operation, operation})
  end

  @impl true
  def init(opts) do
    server = opts[:server]
    color = opts[:color] || :light_black
    current_data = Server.join(server)
    {:ok, %{server: server, color: color, data: current_data}}
  end

  @impl true
  def handle_cast({:submit_operation, operation}, state) do
    Server.submit_operation(state.server, operation)
    {:noreply, state}
  end

  @impl true
  def handle_info({:operation, operation}, state) do
    # Upon receiving an operation, we apply it to the local data
    state = update_in(state.data, &Data.reduce(&1, operation))

    [
      state.color,
      "Client #{inspect(self())} received #{inspect(operation)}, " <>
        "new data: #{inspect(state.data)}"
    ]
    |> IO.ANSI.format()
    |> IO.puts()

    {:noreply, state}
  end
end

With these three pieces ready, let’s start a server with some clients:

{:ok, server} = Server.start_link()
{:ok, client_g} = Client.start_link(server, color: :green)
{:ok, client_y} = Client.start_link(server, color: :yellow)

And we can simulate change operations being submitted by the clients.

client_g |> Client.submit_operation({:push, 1})
client_y |> Client.submit_operation({:push, 2})
client_y |> Client.submit_operation({:push, 3})
client_y |> Client.submit_operation(:reverse)
client_g |> Client.submit_operation(:pop)
client_y |> Client.submit_operation(:pop)

If we look closely at the output, we may notice that the server receives operations in a different order than the function calls above. That’s because they are sent by concurrent clients (operations from a single client still keep their relative order). However, the server, and consequently all the clients, get and process operations in exactly the same order, ending up with the same data!

Race conditions

As mentioned earlier, with this approach we are still open to race conditions. In our example, imagine the following scenario:

Data.new()
# The stack has just a single element
|> Data.reduce({:push, 1})
# Two clients send a :pop change at the same time,
# it seems valid for both of them at that point
|> Data.reduce(:pop)
# ...but by now the stack is empty, so tl([]) raises ArgumentError
|> Data.reduce(:pop)

The kinds of race conditions and whether they are acceptable depend on the specific use case. The simplest solution is to validate incoming operations and ignore those that no longer make sense for the latest data.

Implementation-wise we can modify Data.reduce/2 to return either {:ok, data} or :error.

defmodule DataWithValidation do
  # @derive only takes effect when set before defstruct
  @derive Inspect
  defstruct stack: []

  @type t :: %__MODULE__{stack: list()}

  @type operation :: {:push, term()} | :pop | :reverse

  @doc """
  Returns initial data.
  """
  @spec new() :: t()
  def new(), do: %__MODULE__{}

  @doc """
  Applies the change specified by `operation` to `data`.
  """
  @spec reduce(t(), operation()) :: {:ok, t()} | :error
  def reduce(data, operation)

  def reduce(data, {:push, item}) do
    {:ok, update_in(data.stack, &[item | &1])}
  end

  def reduce(data, :pop) do
    # In this case using "with" is overkill,
    # but it generally makes for a neat pattern
    # if the validation involves multiple checks
    with true <- data.stack != [] do
      {:ok, update_in(data.stack, &tl/1)}
    else
      _ -> :error
    end
  end

  def reduce(data, :reverse) do
    {:ok, update_in(data.stack, &Enum.reverse/1)}
  end
end
alias DataWithValidation, as: Data

{:ok, data} = Data.new() |> Data.reduce({:push, 1})

# The first :pop operation is valid and updates the data
{:ok, data} = Data.reduce(data, :pop)

# The second :pop operation is invalid because the stack
# is empty, so we get an error instead
:error = Data.reduce(data, :pop)

The server and the clients would then try applying each operation to their current data and ignore it if the result is :error.
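One possible shape for the server side, sketched with a hypothetical `ValidatingServer` that carries its own copy of the validating reducer over a plain map (client monitoring is left out for brevity): the server applies each submitted operation to its latest data and broadcasts only the ones that succeed.

```elixir
defmodule ValidatingServer do
  use GenServer

  def start_link(), do: GenServer.start_link(__MODULE__, [])

  def join(server), do: GenServer.call(server, :join)

  def submit_operation(server, operation),
    do: GenServer.cast(server, {:submit_operation, operation})

  @impl true
  def init(_opts), do: {:ok, %{clients: [], data: %{stack: []}}}

  @impl true
  def handle_call(:join, {pid, _}, state) do
    {:reply, state.data, %{state | clients: [pid | state.clients]}}
  end

  @impl true
  def handle_cast({:submit_operation, operation}, state) do
    case reduce(state.data, operation) do
      {:ok, data} ->
        # Only operations that are valid for the latest data get broadcast
        for pid <- state.clients, do: send(pid, {:operation, operation})
        {:noreply, %{state | data: data}}

      :error ->
        # A stale operation (e.g. a second :pop on an empty stack) is dropped
        {:noreply, state}
    end
  end

  # Validating reducer over a plain map, mirroring DataWithValidation.reduce/2
  defp reduce(data, {:push, item}), do: {:ok, update_in(data.stack, &[item | &1])}
  defp reduce(%{stack: []}, :pop), do: :error
  defp reduce(data, :pop), do: {:ok, update_in(data.stack, &tl/1)}
  defp reduce(data, :reverse), do: {:ok, update_in(data.stack, &Enum.reverse/1)}
end

# The current process acts as a client
{:ok, server} = ValidatingServer.start_link()
%{stack: []} = ValidatingServer.join(server)

ValidatingServer.submit_operation(server, {:push, 1})
# Two "concurrent" :pop submissions; only the first one is accepted
ValidatingServer.submit_operation(server, :pop)
ValidatingServer.submit_operation(server, :pop)

# A synchronous request is handled after the casts above
%{data: %{stack: []}} = :sys.get_state(server)

# Collect the broadcasts already sitting in this process's mailbox
flush = fn flush, acc ->
  receive do
    {:operation, operation} -> flush.(flush, [operation | acc])
  after
    0 -> Enum.reverse(acc)
  end
end

broadcasts = flush.(flush, [])
```

Since every client starts from the server’s data and receives only accepted operations in the same order, applying them with the same reducer on the client side succeeds as well, so the local copies stay in sync.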

Livebook

Livebook is a collaborative web application - multiple users can edit the same notebook at the same time. You can verify this yourself by opening the same session in two browser tabs and applying changes in either of them.

When it comes to the implementation, each session (notebook page) has a server process coordinating the work and multiple LiveView processes, one per browser tab. All of these processes keep a local copy of the data with the notebook content and additional evaluation information.

To keep the data in sync, Livebook uses the synchronization pattern outlined above for all operations. The only exception is editing cell content, which is solved using Operational Transformation to provide proper conflict resolution in simultaneous text editing. However, the information and events necessary for OT are also modeled as part of the data, with corresponding operations.

Feel free to have a look at the source code for more insights.

Final notes

In this notebook we covered the state reducer pattern, which provides a data-centric API to state transformation. Then, we discussed the problem of state synchronization along with some approaches we may take to tackle it. Finally, we implemented synchronized state by serializing concurrent operations and using a state reducer to apply the updates.