KvQueue - A distributed durable queue with erlfdb
Mix.install([
{:erlfdb, "~> 0.2.2"}
])
Introduction
The FoundationDB Layer Concept means that our query execution is detached from our storage servers, allowing a database operator to dynamically scale compute nodes according to current query workloads. However, there is also an advantage for application developers: when combined with transactions, a Layer allows us to create powerful data structures as building blocks for an application.
In this tutorial, we’ll create a Layer that is a performant durable queue. Then, we’ll consume items from this queue using a gen_server.
Our queue will prioritize throughput of the producer. This is a common design choice for a durable queue because it ensures our producer can quickly move on to other operations in our application, without costly locks or conflicts to slow it down.
With transactional isolation guarantees from FDB, we can create as many parallel and distributed producers and consumers as we like; a given message will always be consumed exactly once.
Start a sandbox
First, we’ll create a sandbox database that we can freely write to and delete from. erlfdb_sandbox starts a single fdbserver process and writes data to a temporary directory. It may take a few seconds the first time you execute it.
Db = erlfdb_sandbox:open().
Defining the Queue
We’ll define a standard FIFO queue with push and pop operations. Our main challenge is laying out the queue onto FDB key-value pairs. It’s up to us to design a keyspace that will fit our design specifications. So, let’s discuss it now.
Key-value binary format
Our keyspace will use the FDB Tuple Layer for constructing the key format. We’ll document our keys and values with this simple notation:
{key_tuple} => value
Queue name
We’re ready to talk keys! First, we’ll hard-code the name of our queue to <<"q">>. This makes our queue a singleton in the database, which keeps our code concise for this tutorial. The data structures you create should allow a developer to specify certain details, like the name, to avoid key conflicts.
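For instance, a hedged sketch of a name-parameterized key builder (this key/2 helper is hypothetical and not part of the module we build below):
%% key(<<"jobs">>, [<<"npush">>]) packs the tuple {<<"jobs">>, <<"npush">>}
key(QueueName, Suffix) ->
    erlfdb_tuple:pack(list_to_tuple([QueueName | Suffix])).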
All our key-values will be of the form:
{q, ...} => value
Queue length
Next, in order to keep track of the length of the queue, we’ll keep track of counter values called <<"npush">> and <<"npop">>, which are the total number of pushes and pops executed, respectively. Splitting the counters is an important part of how the consumer works, which we’ll get to later.
{q, npush} => counter
{q, npop} => counter
At any given time, the length of the queue is equal to npush - npop. We’ll have to make sure these counters are safely managed so that there are no race conditions. (We don’t want negative values!)
Queue items
Finally, keys for items in the queue include <<"val">> in the key:
{q, val, vs} => item
where vs is a placeholder for a versionstamp, and item is the content of the item stored in the queue.
A Versionstamp is a unique, monotonically increasing integer generated by FDB at commit-time. We have to use special client-side functions that operate on “incomplete versionstamps” because the client cannot know what the value of the versionstamp is before the commit.
Using a versionstamp for pushing items onto the queue ensures that there is no key contention for parallel pushes, which greatly increases producer throughput.
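For illustration, here is roughly what a versionstamped write looks like from the client side. This sketch uses a throwaway <<"vs_demo">> key prefix so it doesn’t disturb the queue’s keyspace; the real push operation appears in the module below.
erlfdb:transactional(Db, fun(Tx) ->
    %% All transaction-version bits set to 1 mark the versionstamp "incomplete"
    VS = {versionstamp, 16#ffffffffffffffff, 16#ffff, erlfdb:get_next_tx_id(Tx)},
    Key = erlfdb_tuple:pack_vs({<<"vs_demo">>, VS}),
    %% FDB substitutes the real commit version into Key at commit time
    erlfdb:set_versionstamped_key(Tx, Key, <<"demo">>)
end).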
Code time!
Once you have a good grasp of the keyspace, writing the code becomes pretty straightforward!
-module(kv_queue).
-export([
delete/1,
push/2,
consume_k/2,
pop/1,
pop_k/2,
len/1,
queue/1,
watch/1
]).
-define(NAME, <<"q">>).
-define(VAL, <<"val">>).
-define(NPOP, <<"npop">>).
-define(NPUSH, <<"npush">>).
% This is known as an "incomplete versionstamp"
-define(VS(Tx), {
versionstamp,
16#ffffffffffffffff,
16#ffff,
erlfdb:get_next_tx_id(Tx)
}).
delete(Tx) ->
{S, E} = erlfdb_tuple:range({?NAME}),
erlfdb:clear_range(Tx, S, E).
push(Tx, Val) ->
Key = erlfdb_tuple:pack_vs({?NAME, ?VAL, ?VS(Tx)}),
erlfdb:set_versionstamped_key(Tx, Key, Val),
erlfdb:add(Tx, erlfdb_tuple:pack({?NAME, ?NPUSH}), 1).
pop(Tx) ->
case pop_k(Tx, 1) of
{_, [Val]} -> {ok, Val};
{_, []} -> error
end.
pop_k(Tx, K) ->
{QS, QE} = erlfdb_tuple:range({?NAME, ?VAL}),
case erlfdb:get_range(Tx, QS, QE, [{limit, K}, {wait, true}]) of
[] ->
{{error, empty}, []};
KVs=[{S, _}|_] ->
N = length(KVs),
{E, _} = lists:last(KVs),
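%% clear_range's end key is exclusive; strinc/1 bumps E so the last popped key is cleared too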
erlfdb:clear_range(Tx, S, erlfdb_key:strinc(E)),
erlfdb:add(Tx, erlfdb_tuple:pack({?NAME, ?NPOP}), N),
Status = if N == K -> ok; true -> {error, empty} end,
{Status, [ V || {_, V} <- KVs ]}
end.
len(Tx) ->
F = [
erlfdb:get(Tx, erlfdb_tuple:pack({?NAME, ?NPUSH})),
erlfdb:get(Tx, erlfdb_tuple:pack({?NAME, ?NPOP}))
],
[Npush, Npop] = [ decode_as_int(X, 0) || X <- erlfdb:wait_for_all(F)],
Npush - Npop.
queue(Tx) ->
{QS, QE} = erlfdb_tuple:range({?NAME, ?VAL}),
KVs = erlfdb:get_range(Tx, QS, QE, [{wait, true}]),
[ V || {_, V} <- KVs ].
decode_as_int(not_found, Default) -> Default;
decode_as_int(Val, _Default) -> binary:decode_unsigned(Val, little).
consume_k(Tx, K) ->
case pop_k(Tx, K) of
{ok, Vals} ->
{Vals, undefined};
{{error, empty}, Vals} ->
{Vals, watch(Tx)}
end.
watch(Tx) ->
erlfdb:watch(Tx, erlfdb_tuple:pack({?NAME, ?NPUSH})).
Functions push/2 and pop_k/2 are the two most important functions to focus on for now. We’ll discuss consume_k/2 and watch/1 later, when we define our consumer.
Queue push
When we push an item onto the queue, we want to make sure that there is no key contention. So:
- We insert the item into the val keyspace with an incomplete versionstamp in the key. This ensures that n parallel transactions will get n unique versionstamps and insert into the queue in the order of the serialized commits, with no key conflicts.
- We do an atomic add on the npush key. FDB atomic operations are CRDTs. That is, FDB can resolve parallel atomic operations without key conflicts. An interesting consequence of this is that our client does not know the value of the counter when incrementing it (see the sketch below).
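For instance, a minimal sketch of conflict-free increments, using a throwaway <<"demo_counter">> key (any number of these transactions can commit in parallel without conflicting):
CounterKey = erlfdb_tuple:pack({<<"demo_counter">>}),
erlfdb:transactional(Db, fun(Tx) -> erlfdb:add(Tx, CounterKey, 1) end),
erlfdb:transactional(Db, fun(Tx) -> erlfdb:add(Tx, CounterKey, 1) end),
%% add/3 stores counters as little-endian unsigned integers
erlfdb:transactional(Db, fun(Tx) ->
    binary:decode_unsigned(erlfdb:wait(erlfdb:get(Tx, CounterKey)), little)
end).
%% => 2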
Queue pop
When we pop k items from the front of the queue, we want to make sure we safely delete them in the same transaction.
- A get_range with limit will read up to k items in the range and then stop.
- When values are found, we use clear_range carefully to only touch those we’ve retrieved, and increment the npop counter accordingly.
- Smart return values will help our consumer know when to keep consuming and when to wait (see the sketch below).
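To see both result shapes, consider popping two items at once (a sketch; the exact result depends on what’s in your queue):
%% {ok, Vals}             - exactly K items were popped; keep consuming
%% {{error, empty}, Vals} - fewer than K items were available (possibly none)
erlfdb:transactional(Db, fun(Tx) -> kv_queue:pop_k(Tx, 2) end).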
Try it out!
We can push any item represented by a binary() onto the queue. If you execute this multiple times, a <<"hello">> item will be added each time.
erlfdb:transactional(Db, fun(Tx) -> kv_queue:push(Tx, <<"hello">>) end).
We can view the entire queue at once. This is not how we will be consuming items, but it’s nice to see them all at the same time.
erlfdb:transactional(Db, fun kv_queue:queue/1).
We can pop a single item from the front of the queue. If you execute this until the queue is empty, you’ll receive error as the response. That just means that the queue is empty.
erlfdb:transactional(Db, fun kv_queue:pop/1).
If you’ve emptied the queue, the length will be 0. Feel free to go back and add some more items.
erlfdb:transactional(Db, fun kv_queue:len/1).
Defining the Consumer
Our consumer is a gen_server. We’ll start any number of them, and each one will monitor the queue for items. There is no coordination between consumers. They will each try to empty the queue as quickly as possible, and then wait for a signal to start once again.
The consumer will be configurable to consume K items in a single transaction. Increasing K increases consumer throughput quite dramatically.
Note: consumers will have lots of key conflicts with each other because they are all vying for the items at the front of the queue. Other clever optimizations can be done to reduce conflicts, but we’re skipping that for now.
-module(kv_consumer).
-export([
start_link/1,
init/1,
handle_cast/2,
handle_call/3,
handle_info/2
]).
-behaviour(gen_server).
start_link(InitArgs) ->
gen_server:start_link(?MODULE, InitArgs, []).
init([Db, K]) ->
gen_server:cast(self(), consume),
{ok, #{db => Db, k => K, watch => undefined}}.
handle_cast(consume, S) ->
#{db := Db, k := K} = S,
{Items, Watch} =
erlfdb:transactional(Db,
fun(Tx) -> kv_queue:consume_k(Tx, K) end
),
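%% The comprehension logs only when we actually consumed something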
[ log("Consumed items ~p~n", [Items]) || Items =/= [] ],
case Watch of
undefined ->
gen_server:cast(self(), consume);
_ ->
log("Waiting for push~n", [])
end,
{noreply, S#{watch => Watch}}.
handle_call(_Call, _From, _S) ->
erlang:error(function_clause).
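%% The watch future created in consume_k/2 delivers {Ref, ready} when the npush key changes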
handle_info({Ref, ready}, S=#{watch := {erlfdb_future, Ref, _}}) ->
gen_server:cast(self(), consume),
{noreply, S#{watch => undefined}};
handle_info(_Info, S) ->
{noreply, S}.
log(F, A) ->
io:format("~p: " ++ F, [self()|A]).
Watch for pushes
Up above we skipped over the watch/1 function. Here it shines. Via consume_k/2, when the queue is empty, the consumer receives a “watch” on the npush counter. Whenever the npush key’s value is changed, FDB resolves all watches on that key: each process that created a watch receives a {reference(), ready} message on its message queue.
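A quick shell sketch of the mechanism (note this pushes a real <<"ping">> item onto the queue, which a consumer will eventually pick up):
Watch = erlfdb:transactional(Db, fun kv_queue:watch/1),
erlfdb:transactional(Db, fun(Tx) -> kv_queue:push(Tx, <<"ping">>) end),
receive {_Ref, ready} -> signaled after 5000 -> timeout end.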
Remember earlier we decided to track the queue length with two counters instead of one? This is why: we don’t want our consumers to be signaled by other consumers; we only want to know when new items are available. By splitting the queue length into npush and npop, we allow watch creation only on npush.
The consumer uses the watch to enter an idle state while the queue is empty, so we don’t have to continuously poll for the existence of queue items.
In summary, consume_k/2 will attempt to consume K items from the front of the queue. If any number less than K is found, then a watch is created, and the consumer goes into an idle state.
Processing queue items
It’s critically important to do all processing of queue items outside of the FDB transaction. For our simple case, we’re just logging the items, but notice that our log is not within the erlfdb:transactional/2 function. Because a transaction function can execute multiple times for a single commit (e.g. when there are key conflicts), side-effects inside transactions are quite dangerous.
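In other words, the safe pattern is to return data from the transaction function and perform the side-effects afterwards. A minimal sketch (this drains up to 10 real items from the queue):
{_Status, Vals} = erlfdb:transactional(Db, fun(Tx) ->
    kv_queue:pop_k(Tx, 10) % no io:format/2 in here!
end),
io:format("Processing ~p~n", [Vals]).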
Supervisor
This is a standard simple_one_for_one
supervisor. Nothing special about it. We’re defining it here to help us keep track of all the consumers we’re about to start.
-module(kv_consumer_sup).
-behaviour(supervisor).
-export([start_link/0, start_child/2, terminate_all_children/0, init/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(Db, K) ->
supervisor:start_child(?MODULE, [[Db, K]]).
terminate_all_children() ->
[ supervisor:terminate_child(?MODULE, X)
|| {_, X, _, _} <- supervisor:which_children(?MODULE) ].
init([]) ->
SupFlags = #{strategy => simple_one_for_one},
ChildSpec = #{
id => kv_consumer,
start => {kv_consumer, start_link, []}
},
{ok, {SupFlags, [ChildSpec]}}.
Let’s get our supervisor running.
kv_consumer_sup:start_link().
Now, we can start any number of consumers we want. You can even create them with different K values if you’d like. If your queue has any items in it, the consumer will process them immediately once it’s started.
kv_consumer_sup:start_child(Db, 1).
Defining the Producer
Finally, we need a bit of code that will throw items onto the queue with reckless abandon. Good thing we have BEAM.
When you execute this, you should expect the producer to finish before the consumers, assuming you’ve kept the consumers’ K value in the single digits. Either way, the producer should be very quick because there are no key conflicts.
You’ll see the log output from each consumer in the meantime, and a “Waiting for push” log when each goes idle.
PushFun = fun(X) ->
erlfdb:transactional(Db, fun(Tx) -> kv_queue:push(Tx, integer_to_binary(X)) end)
end,
Seq = lists:seq(1, 1000),
timer:tc(fun() ->
Self = self(),
[ spawn(fun() -> PushFun(X), Self ! ok end) || X <- Seq ],
(fun
Recv(0) -> ok;
Recv(N) -> receive ok -> Recv(N-1) end
end)(length(Seq))
end).
Cleanup (optional)
To clean up, you can stop the supervisor and delete the queue.
catch(gen_server:stop(whereis(kv_consumer_sup))),
erlfdb:transactional(Db, fun kv_queue:delete/1).