Powered by AppSignal & Oban Pro

BatchServing Hooks + LiveView

docs/liveview_batch_progress_demo.livemd

BatchServing Hooks + LiveView

# Parent of the directory that contains this notebook; used below as the
# local path for the :batch_serving dependency.
project_root = Path.expand("..", __DIR__)

# Endpoint settings for the demo web server. The secret key base and signing
# salt are fixed placeholder values, and the server listens on 127.0.0.1:5055.
endpoint_config = [
  url: [host: "localhost"],
  http: [ip: {127, 0, 0, 1}, port: 5055],
  server: true,
  secret_key_base: String.duplicate("a", 64),
  live_view: [signing_salt: "demo-salt"],
  adapter: Bandit.PhoenixAdapter
]

Application.put_env(:demo, DemoWeb.Endpoint, endpoint_config)

# Packages required by the notebook; :batch_serving is loaded from the
# local checkout rather than from a package registry.
dependencies = [
  {:bandit, "~> 1.5"},
  {:phoenix, "~> 1.8.4"},
  {:phoenix_live_view, "~> 1.1.0"},
  :jason,
  {:batch_serving, path: project_root}
]

Mix.install(dependencies)

Section

This example shows how to use hooks to track progress when processing many items.

Key idea:

  • Keep BatchServing long-lived and shared.
  • Emit one hook entry per input item.
  • Update the UI from hook events as batches complete.
defmodule Demo.ProgressServing do
  @moduledoc """
  Demo serving that squares every value in a batch.

  When a `:progress` hook is present in the partition's options, the hook is
  called once per batch with a list holding one entry per input value.
  """

  @behaviour BatchServing

  @doc """
  Builds the serving for this module with streaming enabled for the
  `:progress` hook.
  """
  def serving do
    serving = BatchServing.new(__MODULE__, :ok)
    BatchServing.streaming(serving, hooks: [:progress])
  end

  @impl true
  def init(_inline_or_process, _, options), do: {:ok, %{options: options}}

  @impl true
  def handle_batch(%BatchServing.Batch{values: values}, partition, state) do
    # Options are stored as a list; the entry at index `partition` carries the
    # hooks used for this batch.
    partition_options = Enum.at(state.options, partition)
    hooks = partition_options[:hooks]

    run = fn ->
      # Simulate expensive work.
      Process.sleep(1000)
      squares = for value <- values, do: value * value

      # Entries are only built when a hook is actually configured.
      if notify = hooks[:progress] do
        notify.(progress_entries(values, partition))
      end

      squares
    end

    {:execute, run, state}
  end

  # One map per input value: the partition it ran on and how many digits it has.
  defp progress_entries(values, partition) do
    for value <- values do
      %{partition: partition, digit_count: value |> Integer.digits() |> length()}
    end
  end
end
defmodule DemoWeb.Live do
  @moduledoc """
  LiveView that runs a batch job and shows its progress.

  Submitting the form starts a background task that dispatches `1..count` to
  the `DemoServing` process and consumes the resulting stream. The task
  forwards three kinds of messages to this LiveView:

    * `{:progress, %{processed: n, total_digits: d}}` - running totals built
      from the `{:progress, items}` stream events
    * `{:results, batch_output}` - the output carried by a `{:batch, _}` event
    * `:done` - the stream has been fully consumed
  """

  use Phoenix.LiveView, layout: {__MODULE__, :root}

  @impl true
  def render(assigns) do
    ~H"""
    <div style="font-family: system-ui, sans-serif; max-width: 560px; margin: 2rem auto; line-height: 1.4;">
      <h2>Batch Progress Demo</h2>
      <p>Enter how many items to process, then click Start.</p>

      <form phx-submit="start" style="display: flex; gap: 0.5rem; align-items: center; margin: 1rem 0;">
        <input
          type="number"
          name="count"
          min="1"
          value={@count_input}
          disabled={@running}
          style="width: 140px; padding: 0.4rem;"
        />
        <button type="submit" disabled={@running} style="padding: 0.45rem 0.8rem;">
          <%= if @running, do: "Running...", else: "Start" %>
        </button>
      </form>

      <progress value={@processed} max={max(@total, 1)} style="width: 100%; height: 18px;"></progress>
      <p style="margin-top: 0.5rem;">
        <strong><%= @processed %></strong> / <strong><%= @total %></strong>
        ({@percent}%)
      </p>
      <p style="margin-top: 0.5rem;">
        Total Digits Processed <strong><%= @total_digits %></strong>
      </p>

      <%= if @done do %>
        <p style="color: #0b7a39; font-weight: 600;">Done.</p>
      <% end %>

      <ol>
        <li :for={result <- @results}>{result}</li>
      </ol>
    </div>
    """
  end

  @impl true
  def mount(_params, _session, socket) do
    {:ok, reset_state(socket, 30)}
  end

  @doc """
  Resets every assign used by the page to its idle state.

  `count_input` becomes both the value shown in the number input and the
  `total` used as the progress bar maximum.
  """
  def reset_state(socket, count_input) do
    assign(socket,
      count_input: count_input,
      running: false,
      done: false,
      total: count_input,
      processed: 0,
      percent: 0,
      total_digits: 0,
      results: []
    )
  end

  @impl true
  def handle_event("start", _params, %{assigns: %{running: true}} = socket) do
    # The form controls are disabled while a run is active, but that is only
    # enforced in the browser. Ignoring the event here keeps a second task from
    # being started and interleaving its progress messages with the first.
    {:noreply, socket}
  end

  def handle_event("start", %{"count" => count_text}, socket) do
    count = parse_count(count_text)

    # Captured outside the task: inside the task, self() is the task process.
    parent = self()

    Task.start(fn ->
      inputs = Enum.to_list(1..count)

      DemoServing
      |> BatchServing.dispatch_many!(inputs)
      |> Enum.reduce(%{processed: 0, total_digits: 0}, fn
        {:progress, items}, acc ->
          acc = %{
            processed: acc.processed + Enum.count(items),
            total_digits: acc.total_digits + Enum.sum_by(items, & &1.digit_count)
          }

          send(parent, {:progress, acc})
          acc

        {:batch, batch_output}, acc ->
          send(parent, {:results, batch_output})
          acc
      end)

      send(parent, :done)
    end)

    {:noreply,
     socket
     |> reset_state(count)
     |> assign(running: true)}
  end

  @impl true
  def handle_info({:progress, %{processed: processed, total_digits: total_digits}}, socket) do
    # max/2 guards against division by zero should `total` ever be 0.
    percent = trunc(processed * 100 / max(socket.assigns.total, 1))

    {:noreply,
     assign(socket,
       processed: processed,
       percent: percent,
       total_digits: total_digits
     )}
  end

  def handle_info({:results, results}, socket) do
    # Appended (not prepended) so the list renders in arrival order.
    {:noreply, assign(socket, results: socket.assigns.results ++ results)}
  end

  def handle_info(:done, socket) do
    {:noreply, assign(socket, running: false, done: true, percent: 100)}
  end

  @doc """
  Layout that wraps the LiveView: loads the Phoenix and LiveView JavaScript
  from a CDN and connects a `LiveSocket` to `/live`.
  """
  def root(assigns) do
    ~H"""
    <!doctype html>
    <html>
      <head>
        <meta charset="utf-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1" />
        <script src="https://cdn.jsdelivr.net/npm/phoenix@1.8.4/priv/static/phoenix.min.js"></script>
        <script src="https://cdn.jsdelivr.net/npm/phoenix_live_view@1.1.26/priv/static/phoenix_live_view.min.js"></script>
      </head>
      <body>
        {@inner_content}
        <script>
          (() => {
            const liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
            liveSocket.connect()
          })();
        </script>
      </body>
    </html>
    """
  end

  # Parses the submitted count; anything that is not a positive integer
  # (including trailing garbage after a non-positive number) falls back to 1.
  defp parse_count(count_text) do
    case Integer.parse(count_text) do
      {n, _} when n > 0 -> n
      _ -> 1
    end
  end
end

defmodule DemoWeb.Router do
  # Router for the demo: one LiveView route at "/" behind a pipeline that
  # only accepts the "html" format.
  use Phoenix.Router
  import Phoenix.LiveView.Router

  pipeline :browser do
    plug :accepts, ["html"]
  end

  # The scope alias prefixes `Live`, so the route resolves to DemoWeb.Live.
  scope "/", DemoWeb do
    pipe_through [:browser]
    live "/", Live
  end
end

# HTTP entry point for the demo. It is declared under the :demo OTP app, the
# same app/key pair the notebook's setup cell writes endpoint settings to.
defmodule DemoWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :demo
  # LiveView socket; the path matches the "/live" used by the LiveSocket
  # created in the page's inline script.
  socket "/live", Phoenix.LiveView.Socket
  # Everything else is handled by the router.
  plug DemoWeb.Router
end
# Each step below is skipped when its registered name is already alive,
# presumably so this cell can be evaluated more than once without starting
# duplicates.

# 1. Process group used by BatchServing.
case Process.whereis(BatchServing.PG) do
  nil ->
    pg_children = [BatchServing.create_serving_process_group_spec()]
    {:ok, _pg} = Supervisor.start_link(pg_children, strategy: :one_for_one)

  _running ->
    :ok
end

# 2. The shared serving process, registered as DemoServing.
case Process.whereis(DemoServing) do
  nil ->
    serving_child =
      {BatchServing,
       serving: Demo.ProgressServing.serving(),
       name: DemoServing,
       batch_size: 4,
       batch_timeout: 120,
       partitions: 2}

    {:ok, _serving} = Supervisor.start_link([serving_child], strategy: :one_for_one)

  _running ->
    :ok
end

# 3. The web endpoint serving the LiveView page.
case Process.whereis(DemoWeb.Endpoint) do
  nil ->
    {:ok, _web} = Supervisor.start_link([DemoWeb.Endpoint], strategy: :one_for_one)

  _running ->
    :ok
end

"Open http://localhost:5055"