Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Example: Debounce & Throttle

debounce_and_throttle.livemd

Example: Debounce & Throttle

Mix.install([
  {:protean, path: "./"},
  :phoenix_pubsub
])
Resolving Hex dependencies...
Dependency resolution completed:
New:
  phoenix_pubsub 2.1.1
* Getting phoenix_pubsub (Hex package)
==> phoenix_pubsub
Compiling 11 files (.ex)
Generated phoenix_pubsub app
==> protean
Compiling 18 files (.ex)
Generated protean app
:ok

Introduction

Debounce and throttle are techniques used to control streams of events, often in the assigns of user interfaces. Both are used to limit the flow of events.

  • Debounce limits events by buffering incoming events and only emitting the latest one after a certain amount of time has passed with no new events. Consider a real-time search field, for instance: the user starts typing, and each character emits a change. Instead of starting the search, canceling, and starting over on every keypress, you could debounce the events, only emitting a change (and triggering the search) after the user has stopped typing for a certain amount of time.
  • Throttle emits the first event immediately, but then waits a certain amount of time before emitting another. This turns a potentially rapid stream of events into a constant interval (so long as events are coming faster than the timeout period).

Debounce

defmodule Examples.Debounce do
  use Protean

  @type assigns :: %{
          timeout: non_neg_integer()
        }

  @machine [
    initial: :waiting,
    assigns: [
      timeout: 1_000
    ],
    states: [
      atomic(:waiting,
        on: [
          match(_, target: :debouncing)
        ]
      ),
      atomic(:debouncing,
        after: [
          delay(:timeout, target: :waiting, actions: :reply_with_latest_event)
        ],
        on: [
          match(_, target: :debouncing, internal: false)
        ]
      )
    ]
  ]

  @impl true
  def delay(:timeout, %{assigns: %{timeout: t}}, _), do: t

  @impl true
  def handle_action(:reply_with_latest_event, context, event) do
    {:reply, event, context}
  end
end
{:module, Examples.Debounce, <<70, 79, 82, 49, 0, 0, 36, ...>>, {:handle_action, 3}}

Our debounce machine relies on delayed- and self-transitions.

We start in a waiting state, awaiting any event. When the first event comes in, we switch to a debouncing state. debouncing will define a delayed transition based on whatever timeout value is set in the machine assigns. If the delay goes by, we emit the latest event with a :reply tuple.

However, if any events come in while we’re debouncing, we do an external self-transition, resetting the timeout:

debouncing: [
  # After the delay specified by the `delay(:timeout, context, event)`
  # callback, reply with the last event and transition back to waiting.
  after: [
    delay: :timeout,
    actions: :reply_last,
    target: "waiting"
  ],
  on: [
    # Transitioning to yourself is an internal transition by default,
    # which means that entry and exit actions aren't re-executed. Setting
    # internal: false makes it an external transition, so it's as if we
    # are leaving the debouncing state, which cancels the timer, and then
    # re-entering it, starting it again.
    match(_, target: "debouncing", actions: :set_last, internal: false)
  ]
]

Let’s spin one up and give it a whirl.

{:ok, debounce, id} = Protean.start_machine(Examples.Debounce)
Protean.subscribe(id, filter: :replies)
:ok

Our debouncer is emitting debounced events as answers, so we’ve subscribed only to them (as opposed to every state transition). This will put messages in our mainbox.

Now, let’s send some events. We expect our debounce machine to send us one on our subscription 1000ms after the last one we send.

Enum.each(1..5, fn _ ->
  Protean.send(debounce, DateTime.utc_now())
  :timer.sleep(250)
end)

receive do
  {_id, _context, [event]} ->
    [event: event, now: DateTime.utc_now()]
after
  5_000 -> IO.inspect(:nothing)
end
[event: ~U[2022-08-16 18:13:06.768085Z], now: ~U[2022-08-16 18:13:07.769273Z]]

By default, our debouncer is waiting for 1,000ms since the last event before emitting it, but we can control that behavior by providing a different :timeout in the machine’s assigns.

{:ok, debounce_500ms, id_500ms} =
  Protean.start_machine(Examples.Debounce, assigns: %{timeout: 500})

Protean.subscribe(id_500ms, filter: :replies)
:ok
# Same block as before, except using pid_500ms and events every 250ms
Enum.each(1..5, fn _ ->
  Protean.send(debounce_500ms, DateTime.utc_now())
  :timer.sleep(250)
end)

receive do
  {_id, _context, [event]} ->
    [event: event, now: DateTime.utc_now()]
after
  5_000 -> IO.inspect(:nothing)
end
[event: ~U[2022-08-16 18:13:52.028072Z], now: ~U[2022-08-16 18:13:52.529290Z]]

Throttle (WIP)

Throttle works a bit differently, emitting a constant stream of events, but no more frequently than specified by :timeout. This means we should receive an event immediately, and then the latest event every :timeout milliseconds.

defmodule Examples.Throttle do
  use Protean
  alias Protean.Action

  @type assigns :: %{
          timeout: non_neg_integer(),
          last_event: term()
        }

  @machine [
    initial: :waiting,
    assigns: [
      timeout: 1_000,
      last_event: nil
    ],
    on: [
      match(_, actions: :set_last)
    ],
    states: [
      atomic(:waiting,
        always: transition(target: :throttling, guard: :has_event?)
      ),
      atomic(:throttling,
        entry: :reply_and_clear_last,
        after: [
          delay(:timeout,
            guard: :has_event?,
            target: :throttling,
            internal: false
          ),
          delay(:timeout,
            guard: {:not, :has_event?},
            target: :throttling,
            internal: true
          )
        ]
      )
    ]
  ]

  @impl true
  def delay(:timeout, %{assigns: %{timeout: t}}, _), do: t

  @impl true
  def guard(:has_event?, %{assigns: %{last_event: nil}}, _), do: false
  def guard(:has_event?, _, _), do: true

  @impl true
  def handle_action(:set_last, context, event) do
    Action.assign(context, :last_event, event)
  end

  def handle_action(:reply_last, context, _) do
    {:reply, context.assigns.last_event, context}
  end

  def handle_action(:reply_and_clear_last, context, _) do
    {:reply, context.assigns.last_event, Action.assign(context, :last_event, nil)}
  end
end
{:module, Examples.Throttle, <<70, 79, 82, 49, 0, 0, 49, ...>>, {:handle_action, 3}}
{:ok, pid} = Examples.Throttle.start_link()
ref = Protean.subscribe(pid, :replies)
#Reference<0.2994576954.2563768321.243585>
Task.async(fn ->
  Enum.each(1..5, fn _ ->
    Protean.send(pid, DateTime.utc_now())
    :timer.sleep(500)
  end)
end)

Enum.each(1..4, fn _ ->
  receive do
    {:state, _, {_context, message}} ->
      time = DateTime.utc_now()
      IO.inspect({message, at: time}, label: "received")
  after
    1_500 -> :nothing
  end
end)
received: {[~U[2022-08-12 23:10:55.869569Z]], [at: ~U[2022-08-12 23:10:55.869726Z]]}
received: {[~U[2022-08-12 23:10:56.369774Z]], [at: ~U[2022-08-12 23:10:56.871015Z]]}
received: {[~U[2022-08-12 23:10:57.371776Z]], [at: ~U[2022-08-12 23:10:57.871956Z]]}
received: {[~U[2022-08-12 23:10:57.872757Z]], [at: ~U[2022-08-12 23:10:58.872945Z]]}
:ok
Process.info(self(), :messages)
{:messages,
 [
   {#Reference<0.2994576954.2563833857.243590>, :ok},
   {:DOWN, #Reference<0.2994576954.2563833857.243590>, :process, #PID<0.621.0>, :normal}
 ]}