Powered by AppSignal & Oban Pro

BatchServing Hooks + LiveView

docs/liveview_batch_progress_demo.livemd

BatchServing Hooks + LiveView

# Parent of the directory this notebook lives in; used below as the path dependency
# for :batch_serving.
project_root = Path.expand("..", __DIR__)

# Endpoint settings must be in the application env before DemoWeb.Endpoint is started,
# because the endpoint reads them under the :demo OTP app.
endpoint_config = [
  url: [host: "localhost"],
  http: [ip: {127, 0, 0, 1}, port: 5055],
  server: true,
  secret_key_base: String.duplicate("a", 64),
  live_view: [signing_salt: "demo-salt"],
  adapter: Bandit.PhoenixAdapter
]

Application.put_env(:demo, DemoWeb.Endpoint, endpoint_config)

# Notebook dependencies; :batch_serving is taken from the local checkout.
deps = [
  {:bandit, "~> 1.5"},
  {:phoenix, "~> 1.8.4"},
  {:phoenix_live_view, "~> 1.1.0"},
  :jason,
  {:batch_serving, path: project_root}
]

Mix.install(deps)

Section

This example shows how to use hooks to track progress when processing many items.

Key idea:

  • Keep BatchServing long-lived and shared.
  • Emit hook events from the serving while batches are processed.
  • Update the UI from those hook events.
defmodule Demo.ProgressServing do
  @moduledoc """
  Demo serving that squares every value in a batch and reports progress through an
  optional `:progress` hook.

  The state keeps the number of values batched so far (`:count`) and the sum of their
  squares (`:total`); both feed the running average passed to the hook.
  """

  @behaviour BatchServing

  @doc """
  Builds the serving for this module (init argument `:ok`) and switches it to streaming
  with the `:progress` hook enabled.
  """
  def serving do
    base = BatchServing.new(__MODULE__, :ok)
    BatchServing.streaming(base, hooks: [:progress])
  end

  @impl true
  def init(_type, :ok, [first_options | _rest]) do
    # Hooks come from the first options entry (defaulting to an empty map);
    # both counters start at zero.
    initial_state = %{hooks: Keyword.get(first_options, :hooks, %{}), count: 0, total: 0}
    {:ok, initial_state}
  end

  @impl true
  def handle_batch(%BatchServing.Batch{values: values}, partition, state) do
    size = Enum.count(values)
    sum_of_squares = Enum.reduce(values, 0, &(square(&1) + &2))

    seen = state.count + size
    running_total = state.total + sum_of_squares
    next_state = %{state | count: seen, total: running_total}

    work = fn ->
      # Simulate expensive work.
      Process.sleep(1000)
      squared = Enum.map(values, &square/1)

      # The average covers everything counted up to and including this batch;
      # max/2 guards against dividing by zero when nothing has been counted.
      notify_progress(state.hooks, %{
        running_average: running_total / max(seen, 1),
        number_processed: size,
        partition: partition
      })

      squared
    end

    {:execute, work, next_state}
  end

  # Squares a single value.
  defp square(value), do: value * value

  # Calls the :progress hook with `payload` when one is present; otherwise does nothing.
  defp notify_progress(hooks, payload) do
    hook = hooks[:progress]
    hook && hook.(payload)
  end
end
# LiveView page for the batch progress demo.
#
# Flow, as written below:
#   * mount/3 seeds the assigns through reset_state/2 with a default count of 30.
#   * The "start" event parses the "count" param (falling back to 1 when it is not a
#     positive integer), resets the assigns, marks the page as running, and spawns a
#     Task. The Task sends 1..count to DemoServing via BatchServing.dispatch_many!/2
#     and reduces over what comes back, forwarding to this process:
#       - {:progress, ...} carrying the running average and a cumulative processed count,
#       - {:results, batch_output} for every {:batch, batch_output} item,
#       - :done once the reduce has finished.
#   * handle_info/2 turns those messages into assigns (processed, percent,
#     running_average, results, running, done).
#
# NOTE(review): the reduce in handle_event/3 matches the :progress payload as a keyword
# list in a fixed key order (partition, running_average, number_processed), while
# Demo.ProgressServing calls the hook with a map. Confirm which shape BatchServing
# actually delivers; if it is the map, no clause matches and the Task raises
# FunctionClauseError.
# NOTE(review): the HEEx template and the root layout below appear to have lost their
# HTML markup (no form, input or script tags are visible), and most of the module is
# collapsed onto one line that contains an inline `#` comment — restore this cell from
# the original notebook before running it.
# NOTE(review): Task.start/1 is not linked to the LiveView, so if the task crashes
# :done is never sent and `running` stays true — TODO confirm this is acceptable here.
defmodule DemoWeb.Live do
  use Phoenix.LiveView, layout: {__MODULE__, :root}

  # Renders the page: start control, processed/total counter with percent, the running
  # average, a "Done." marker once finished, and the collected results.
  def render(assigns) do
    ~H"""
    
      

Batch Progress Demo

Enter how many items to process, then click Start.

<%= if @running, do: "Running...", else: "Start" %>

<%= @processed %> / <%= @total %> ({@percent}%)

Running average <%= @running_average %>

<%= if @done do %>

Done.

<% end %>
  1. {result}
"""
end def mount(_params, _session, socket) do {:ok, reset_state(socket, 30)} end def reset_state(socket, count_input) do assign(socket, count_input: count_input, running: false, done: false, total: count_input, processed: 0, percent: 0, running_average: 0, results: [] ) end def handle_event("start", %{"count" => count_text}, socket) do count = case Integer.parse(count_text) do {n, _} when n > 0 -> n _ -> 1 end parent = self() Task.start(fn -> inputs = Enum.to_list(1..count) DemoServing |> BatchServing.dispatch_many!(inputs) |> Enum.reduce(0, fn {:progress, [partition: _partition, running_average: running_average, number_processed: number_processed] # %{running_average: running_average, number_processed: number_processed} }, acc -> total = acc + number_processed send( parent, {:progress, %{running_average: running_average, number_processed: total}} ) total {:batch, batch_output}, acc -> send(parent, {:results, batch_output}) acc end) send(parent, :done) end) {:noreply, reset_state(socket, count) |> assign(running: true)} end def handle_info( {:progress, %{running_average: running_average, number_processed: number_processed}}, socket ) do percent = trunc(number_processed * 100 / max(socket.assigns.total, 1)) {:noreply, assign(socket, processed: number_processed, percent: percent, running_average: running_average )} end def handle_info({:results, results}, socket) do {:noreply, assign(socket, results: socket.assigns.results ++ results)} end def handle_info(:done, socket) do {:noreply, assign(socket, running: false, done: true, percent: 100)} end def root(assigns) do ~H""" {@inner_content} (() => { const liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket) liveSocket.connect() })(); """ end end

defmodule DemoWeb.Router do
  @moduledoc """
  Router for the demo: a single LiveView route at `/`, served through the `:browser`
  pipeline.
  """

  use Phoenix.Router
  import Phoenix.LiveView.Router

  # The pipeline only restricts requests to the "html" format; it runs no other plugs.
  pipeline :browser do
    plug(:accepts, ~w(html))
  end

  scope "/" do
    pipe_through([:browser])

    live("/", DemoWeb.Live)
  end
end

defmodule DemoWeb.Endpoint do
  @moduledoc """
  Endpoint for the demo, configured under the `:demo` OTP application.

  It mounts the LiveView socket at `/live` and hands every request to
  `DemoWeb.Router`.
  """

  use Phoenix.Endpoint, otp_app: :demo

  socket("/live", Phoenix.LiveView.Socket)

  plug(DemoWeb.Router)
end
# Starts a one-child, :one_for_one supervisor for the child returned by `build_child`,
# but only when no process is registered under `name`. The child spec is built lazily,
# so nothing is constructed when the process is already running.
ensure_started = fn name, build_child ->
  if is_nil(Process.whereis(name)) do
    {:ok, _supervisor} = Supervisor.start_link([build_child.()], strategy: :one_for_one)
  end
end

# Process group used by BatchServing.
ensure_started.(BatchServing.PG, fn ->
  BatchServing.create_serving_process_group_spec()
end)

# The shared serving process, registered as DemoServing.
ensure_started.(DemoServing, fn ->
  {BatchServing,
   serving: Demo.ProgressServing.serving(),
   name: DemoServing,
   batch_size: 4,
   batch_timeout: 120,
   partitions: 2}
end)

# The web endpoint serving the LiveView page.
ensure_started.(DemoWeb.Endpoint, fn -> DemoWeb.Endpoint end)

"Open http://localhost:5055"