State synchronization with state reducers
State reducer
State is an inherent piece to most applications and we have to deal with it on daily basis. Whether it’s some local information or description of the whole running system, it all comes down to data. Let’s consider a simple example:
state = %{stack: []}
As a result of program evaluation, state may get transformed a number of times. Most of the time we operate on the state directly, that is, we modify the data structure to reflect the new situation.
In functional languages like Elixir, we keep the original data intact and produce new data instead, but that’s more of an implementation detail really. The important point is that we get to a new state.
push = fn state, item ->
update_in(state.stack, &[item | &1])
end
state
|> push.(1)
|> push.(2)
That’s how we generally deal with data, by defining operations as functions. In other words, functions are the API used to transform the data.
This approach does the job if we know all operations upfront, but what if the operations are determined by events occurring over the system lifetime? Events are just another kind of data, so ideally our API would be data-driven. That’s where state reducer comes in!
state = %{stack: []}
# Reducer takes the current state and an operation
# to produce the new transformed state
reducer = fn
state, {:push, item} ->
update_in(state.stack, &[item | &1])
state, :pop ->
update_in(state.stack, &tl/1)
state, :reverse ->
update_in(state.stack, &Enum.reverse/1)
end
state
|> IO.inspect()
|> reducer.({:push, 1})
|> IO.inspect()
|> reducer.({:push, 2})
|> IO.inspect()
|> reducer.({:push, 3})
|> IO.inspect()
|> reducer.(:pop)
|> IO.inspect()
|> reducer.(:reverse)
|> IO.inspect()
|> reducer.(:pop)
In the above approach, each operation is described as a data
structure (either {:push, term}
, :pop
or :reverse
), so
this time data is the API used to transform the data.
In some sense, we simply added a tiny layer of abstraction translating data (operations) into function calls. However, the difference lies mainly in the way we use and think about the new API.
One important detail about state reducer is that it should be a pure function, but since it’s only supposed to transform the state data structure and we are within a functional language this is the case sort of automatically.
Implications
By modeling state operations as data structures, we can easily send them between processes or systems. That’s exactly what we will explore further into the notebook.
Additionally, with this approach we could keep track of the operations to implement an undo stack.
State reducers in practice
It’s likely that you already use the state reducer pattern quite often! If you’re familiar with Redux for React, that’s exactly the core concept behind it.
The name of Enum.reduce/3
isn’t accidental either! The accumulator
is your state, while the enumerable is a series of operations
we use to transform the data.
Finally, GenServer
encapsulates state and transforms it
based on events it receives, which makes it a kind of reducer
too. This analogy is a bit loose though, because GenServer
implementations generally have side effects.
State synchronization
Now let’s move to the actual problem. Imagine a number of processes (clients/users) that operate together on the same state. Initially every client gets a local copy of the current state, so they are aligned with each other, then they intend to apply modifications to the sate. The question is, how do we ensure all the local states stay in sync?
Let’s have a quick look at two models addressing this problem.
Eventual consistency
In this model the processes modify their local state and communicate changes to all other processes. The local states may differ at a specific point in time, but the goal is to ensure eventual consistency, meaning that eventually all local states get back in sync. The tricky part is that each process may get the peer changes in different order, so we need an algorithm that figures out how to integrate these changes such that the final state ends up the same for everyone.
So far the research produced two categories of algorithms - OT (Operational Transformation) and CRDT (Conflict-free Replicated Data Type). You can find a brief overview of these in this article. This approach surely sounds appealing, however these algorithms don’t easily generalize to arbitrary data structures and operations, and can be quite complex on top of that.
Serialized changes
As outlined above, the main challenge comes from the fact that processes may receive peer changes in different order, so an obvious solution would be to enforce a specific order of changes (serialize them). In terms of implementation all we need to do is pass all changes through an intermediate process that forwards each change to all client processes. The intermediate process handles only a single message at a time, so the changes are implicitly ordered and consequently every client process receives exactly the same stream of changes.
Naturally, the simplicity of this approach has its own pitfalls. All changes
need to go through the intermediate process before their result is reflected
in the local state, which is not viable in cases like text collaboration,
where the UI must update immediately on every keystroke. Also, enforcing the
order doesn’t eliminate race conditions, imagine two people removing an item
from one-item stack at the same time - we get two :pop
changes and no matter
which comes first, the second is invalid. Sometimes applying one change may
invalidate the intent of other changes applied afterwards.
With all that said, these limitations may not necessarily be relevant, in which case this model is a perfect fit, because it’s simple, not limited to specific data structures and ensures the local states change in exactly the same way over time.
Serialized changes and state reducer
This whole discussion on changes/operations applied to state sounds like state reducer, doesn’t it? In the serialized changes model, each process starts with a state and then keeps applying operations to it as they come. In other words, each process keeps reducing the state upon subsequent operations.
Now back to the code!
Synchronized state
Let’s define a data structure representing the piece of state we need
to keep in sync, along with the reduce
method handling a well defined
set of operations. That’s essentially what we did in the first section,
just a bit more organized.
Note: we are going to use the term “data” instead of “state” to avoid
confusion with GenServer
state later on.
defmodule Data do
defstruct stack: []
@derive Inspect
@type t :: %__MODULE__{stack: list()}
@type operation :: {:push, term()} | :pop | :reverse
@doc """
Returns initial data.
"""
@spec new() :: t()
def new(), do: %__MODULE__{}
@doc """
Applies the change specified by `operation` to `data`.
"""
@spec reduce(t(), operation()) :: t()
def reduce(data, operation)
def reduce(data, {:push, item}) do
update_in(data.stack, &[item | &1])
end
def reduce(data, :pop) do
update_in(data.stack, &tl/1)
end
def reduce(data, :reverse) do
update_in(data.stack, &Enum.reverse/1)
end
end
Server
Now we need a process for serializing the operations. It’s gonna be the source of truth for our data and all operations will go through this process to implicitly impose an order of the operations. Since it serves as a communication proxy for all the client processes, we are going to just call it a server.
In our example the server will do the minimal job of tracking the clients and broadcasting operations. In a real use case it could additionally coordinate some work, given its central role.
defmodule Server do
use GenServer
def start_link() do
GenServer.start_link(__MODULE__, [])
end
@doc """
Returns the current data and subscribes the caller
to change operations.
"""
def join(server) do
GenServer.call(server, :join)
end
@doc """
Notifies the server about intended change, which
gets broadcasted to all the clients.
"""
def submit_operation(server, operation) do
GenServer.cast(server, {:submit_operation, operation})
end
@impl true
def init(_opts) do
{:ok, %{clients: [], data: Data.new()}}
end
@impl true
def handle_call(:join, {pid, _}, state) do
Process.monitor(pid)
{:reply, state.data, %{state | clients: [pid | state.clients]}}
end
@impl true
def handle_cast({:submit_operation, operation}, state) do
# Send the operation to all the subscribed clients
broadcast_operation(operation, state.clients)
# Keep the updated data on the server, so that every
# new client can get the latest data
state = update_in(state.data, &Data.reduce(&1, operation))
[
:blue,
"Server #{inspect(self())} received #{inspect(operation)}, " <>
"new data: #{inspect(state.data)}"
]
|> IO.ANSI.format()
|> IO.puts()
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:noreply, %{state | clients: List.delete(state.clients, pid)}}
end
defp broadcast_operation(operation, clients) do
# In a production app we would use a pubsub instead
for pid <- clients do
send(pid, {:operation, operation})
end
end
end
Client
Finally, we need a number of concurrent clients with a local copy of the data that we want to keep in sync.
In our example we imitate a client process by exposing
a submit_operation
function to simulate the client
making a change to the data. The client processes are
specific to given use case, for instance these could be
LiveView processes willing to keep some state in sync
and the operations would correspond to user interactions.
defmodule Client do
use GenServer
def start_link(server, opts \\ []) do
opts = put_in(opts[:server], server)
GenServer.start_link(__MODULE__, opts)
end
@doc """
Originates a change operation from the given client.
"""
def submit_operation(client, operation) do
GenServer.cast(client, {:submit_operation, operation})
end
@impl true
def init(opts) do
server = opts[:server]
color = opts[:color] || :light_black
current_data = Server.join(server)
{:ok, %{server: server, color: color, data: current_data}}
end
@impl true
def handle_cast({:submit_operation, operation}, state) do
Server.submit_operation(state.server, operation)
{:noreply, state}
end
@impl true
def handle_info({:operation, operation}, state) do
# Upon receiving an operation, we apply it to the local data
state = update_in(state.data, &Data.reduce(&1, operation))
[
state.color,
"Client #{inspect(self())} received #{inspect(operation)}, " <>
"new data: #{inspect(state.data)}"
]
|> IO.ANSI.format()
|> IO.puts()
{:noreply, state}
end
end
With these three pieces ready, let’s start a server with some clients:
{:ok, server} = Server.start_link()
{:ok, client_g} = Client.start_link(server, color: :green)
{:ok, client_y} = Client.start_link(server, color: :yellow)
And we can simulate change operations being submitted by the clients.
client_g |> Client.submit_operation({:push, 1})
client_y |> Client.submit_operation({:push, 2})
client_y |> Client.submit_operation({:push, 3})
client_y |> Client.submit_operation(:reverse)
client_g |> Client.submit_operation(:pop)
client_y |> Client.submit_operation(:pop)
If we look closely at the output, we may notice that the server receives operations in different order than the function calls above, that’s because they are sent by concurrent clients. However, the server, and consequently all the clients, get and process operations in the exact same order, ending up with the same data!
Race conditions
As mentioned eralier, with this approach we are still open to race conditions. In our example, imagine the following scenario:
Data.new()
# The stack has just a single element
|> Data.reduce({:push, 1})
# Two clients send a :pop change at the same time,
# it seems valid for both of them at that point
|> Data.reduce(:pop)
|> Data.reduce(:pop)
The kinds of race conditions and whether they are acceptable depend on the specific use case. The simplest solution is to validate incoming operations and ignore those that no longer make sense for the latest data.
Implementation-wise we can modify Data.reduce/2
to return
either {:ok, data}
or :error
.
defmodule DataWithValidation do
defstruct stack: []
@derive Inspect
@type t :: %__MODULE__{stack: list()}
@type operation :: {:push, term()} | :pop | :reverse
@doc """
Returns initial data.
"""
@spec new() :: t()
def new(), do: %__MODULE__{}
@doc """
Applies the change specified by `operation` to `data`.
"""
@spec reduce(t(), operation()) :: {:ok, t()} | :error
def reduce(data, operation)
def reduce(data, {:push, item}) do
{:ok, update_in(data.stack, &[item | &1])}
end
def reduce(data, :pop) do
# In this case using "with" is an overkill,
# but this generally makes for a neat pattern
# if the validation involves multiple checks
with true <- data.stack != [] do
{:ok, update_in(data.stack, &tl/1)}
else
_ -> :error
end
end
def reduce(data, :reverse) do
{:ok, update_in(data.stack, &Enum.reverse/1)}
end
end
alias DataWithValidation, as: Data
{:ok, data} = Data.new() |> Data.reduce({:push, 1})
# The first :pop operation is valid and updates the data
{:ok, data} = Data.reduce(data, :pop)
# The second :pop operation is invalid becuase the stack
# is empty, so we get an error instead
:error = Data.reduce(data, :pop)
The server/clients would then try applying the operation
to current data and ignore it if the result is an :error
.
Livebook
Livebook is a collaborative web application - multiple users can edit the same notebook at the same time. You can verify this yourself by opening the same session in two browser tabs and applying changes in either of them.
When it comes to the implementation, each session (notebook page) has a server process coordinating the work and multiple LiveView processes - one per each browser tab. All of these processes keep a local data with the notebook content and additional evaluation information.
To keep the data in sync Livebook uses the synchronization pattern outlined above for all of the operations. The only exception is editing cell content, which is solved using Operational Transformation to provide proper conflict resolution in simultaneous text editing. However, the information and events necessary for OT are also modeled as part of the data with corresponding operations.
Feel free to have a look at the source code for more insights.
Final notes
In this notebook we covered the state reducer pattern, which provides a data-centric API to state transformation. Then, we discussed the problem of state synchronization along with some approaches we may take to takle it. Finally, we implemented synchronized state by serializing concurrent operations and using state reducer for applying the updates.