Example: Debounce & Throttle
Mix.install([
{:protean, path: "./"},
:phoenix_pubsub
])
Resolving Hex dependencies...
Dependency resolution completed:
New:
phoenix_pubsub 2.1.1
* Getting phoenix_pubsub (Hex package)
==> phoenix_pubsub
Compiling 11 files (.ex)
Generated phoenix_pubsub app
==> protean
Compiling 18 files (.ex)
Generated protean app
:ok
Introduction
Debounce and throttle are techniques used to control streams of events, often in the assigns of user interfaces. Both are used to limit the flow of events.
-
Debounce limits events by buffering incoming events and only emitting the latest one after a certain amount of time has passed with no new events. Consider a real-time search field, for instance: the user starts typing, and each character emits a
change
. Instead of starting the search, canceling, and starting over on every keypress, you could debounce the events, only emitting a change (and triggering the search) after the user has stopped typing for a certain amount of time. - Throttle emits the first event immediately, but then waits a certain amount of time before emitting another. This turns a potentially rapid stream of events into a constant interval (so long as events are coming faster than the timeout period).
Debounce
defmodule Examples.Debounce do
use Protean
@type assigns :: %{
timeout: non_neg_integer()
}
@machine [
initial: :waiting,
assigns: [
timeout: 1_000
],
states: [
atomic(:waiting,
on: [
match(_, target: :debouncing)
]
),
atomic(:debouncing,
after: [
delay(:timeout, target: :waiting, actions: :reply_with_latest_event)
],
on: [
match(_, target: :debouncing, internal: false)
]
)
]
]
@impl true
def delay(:timeout, %{assigns: %{timeout: t}}, _), do: t
@impl true
def handle_action(:reply_with_latest_event, context, event) do
{:reply, event, context}
end
end
{:module, Examples.Debounce, <<70, 79, 82, 49, 0, 0, 36, ...>>, {:handle_action, 3}}
Our debounce machine relies on delayed- and self-transitions.
We start in a waiting
state, awaiting any event. When the first event comes in, we switch to a debouncing
state. debouncing
will define a delayed transition based on whatever timeout value is set in the machine assigns. If the delay goes by, we emit the latest event with a :reply
tuple.
However, if any events come in while we’re debouncing
, we do an external self-transition, resetting the timeout:
debouncing: [
# After the delay specified by the `delay(:timeout, context, event)`
# callback, reply with the last event and transition back to waiting.
after: [
delay: :timeout,
actions: :reply_last,
target: "waiting"
],
on: [
# Transitioning to yourself is an internal transition by default,
# which means that entry and exit actions aren't re-executed. Setting
# internal: false makes it an external transition, so it's as if we
# are leaving the debouncing state, which cancels the timer, and then
# re-entering it, starting it again.
match(_, target: "debouncing", actions: :set_last, internal: false)
]
]
Let’s spin one up and give it a whirl.
{:ok, debounce, id} = Protean.start_machine(Examples.Debounce)
Protean.subscribe(id, filter: :replies)
:ok
Our debouncer is emitting debounced events as answers, so we’ve subscribed only to them (as opposed to every state transition). This will put messages in our mainbox.
Now, let’s send some events. We expect our debounce machine to send us one on our subscription 1000ms after the last one we send.
Enum.each(1..5, fn _ ->
Protean.send(debounce, DateTime.utc_now())
:timer.sleep(250)
end)
receive do
{_id, _context, [event]} ->
[event: event, now: DateTime.utc_now()]
after
5_000 -> IO.inspect(:nothing)
end
[event: ~U[2022-08-16 18:13:06.768085Z], now: ~U[2022-08-16 18:13:07.769273Z]]
By default, our debouncer is waiting for 1,000ms since the last event before emitting it, but we can control that behavior by providing a different :timeout
in the machine’s assigns.
{:ok, debounce_500ms, id_500ms} =
Protean.start_machine(Examples.Debounce, assigns: %{timeout: 500})
Protean.subscribe(id_500ms, filter: :replies)
:ok
# Same block as before, except using pid_500ms and events every 250ms
Enum.each(1..5, fn _ ->
Protean.send(debounce_500ms, DateTime.utc_now())
:timer.sleep(250)
end)
receive do
{_id, _context, [event]} ->
[event: event, now: DateTime.utc_now()]
after
5_000 -> IO.inspect(:nothing)
end
[event: ~U[2022-08-16 18:13:52.028072Z], now: ~U[2022-08-16 18:13:52.529290Z]]
Throttle (WIP)
Throttle works a bit differently, emitting a constant stream of events, but no more frequently than specified by :timeout
. This means we should receive an event immediately, and then the latest event every :timeout
milliseconds.
defmodule Examples.Throttle do
use Protean
alias Protean.Action
@type assigns :: %{
timeout: non_neg_integer(),
last_event: term()
}
@machine [
initial: :waiting,
assigns: [
timeout: 1_000,
last_event: nil
],
on: [
match(_, actions: :set_last)
],
states: [
atomic(:waiting,
always: transition(target: :throttling, guard: :has_event?)
),
atomic(:throttling,
entry: :reply_and_clear_last,
after: [
delay(:timeout,
guard: :has_event?,
target: :throttling,
internal: false
),
delay(:timeout,
guard: {:not, :has_event?},
target: :throttling,
internal: true
)
]
)
]
]
@impl true
def delay(:timeout, %{assigns: %{timeout: t}}, _), do: t
@impl true
def guard(:has_event?, %{assigns: %{last_event: nil}}, _), do: false
def guard(:has_event?, _, _), do: true
@impl true
def handle_action(:set_last, context, event) do
Action.assign(context, :last_event, event)
end
def handle_action(:reply_last, context, _) do
{:reply, context.assigns.last_event, context}
end
def handle_action(:reply_and_clear_last, context, _) do
{:reply, context.assigns.last_event, Action.assign(context, :last_event, nil)}
end
end
{:module, Examples.Throttle, <<70, 79, 82, 49, 0, 0, 49, ...>>, {:handle_action, 3}}
{:ok, pid} = Examples.Throttle.start_link()
ref = Protean.subscribe(pid, :replies)
#Reference<0.2994576954.2563768321.243585>
Task.async(fn ->
Enum.each(1..5, fn _ ->
Protean.send(pid, DateTime.utc_now())
:timer.sleep(500)
end)
end)
Enum.each(1..4, fn _ ->
receive do
{:state, _, {_context, message}} ->
time = DateTime.utc_now()
IO.inspect({message, at: time}, label: "received")
after
1_500 -> :nothing
end
end)
received: {[~U[2022-08-12 23:10:55.869569Z]], [at: ~U[2022-08-12 23:10:55.869726Z]]}
received: {[~U[2022-08-12 23:10:56.369774Z]], [at: ~U[2022-08-12 23:10:56.871015Z]]}
received: {[~U[2022-08-12 23:10:57.371776Z]], [at: ~U[2022-08-12 23:10:57.871956Z]]}
received: {[~U[2022-08-12 23:10:57.872757Z]], [at: ~U[2022-08-12 23:10:58.872945Z]]}
:ok
Process.info(self(), :messages)
{:messages,
[
{#Reference<0.2994576954.2563833857.243590>, :ok},
{:DOWN, #Reference<0.2994576954.2563833857.243590>, :process, #PID<0.621.0>, :normal}
]}