BatchServing Hooks + LiveView
# The notebook lives one directory below the project root; resolve that root
# so the local :batch_serving checkout can be used as a path dependency.
project_root = __DIR__ |> Path.join("..") |> Path.expand()

# Endpoint configuration must be in the application env before the endpoint
# starts. It listens on 127.0.0.1:5055 and uses Bandit as the HTTP adapter.
endpoint_config = [
  url: [host: "localhost"],
  http: [ip: {127, 0, 0, 1}, port: 5055],
  server: true,
  # Placeholder secret (64 "a" characters) -- acceptable only for a local demo.
  secret_key_base: String.duplicate("a", 64),
  live_view: [signing_salt: "demo-salt"],
  adapter: Bandit.PhoenixAdapter
]

Application.put_env(:demo, DemoWeb.Endpoint, endpoint_config)

# Dependencies fetched and compiled by Mix.install/1 for this script/notebook.
deps = [
  {:bandit, "~> 1.5"},
  {:phoenix, "~> 1.8.4"},
  {:phoenix_live_view, "~> 1.1.0"},
  :jason,
  {:batch_serving, path: project_root}
]

Mix.install(deps)
Section
This example shows how to use hooks to track progress when processing many items.
Key idea:
- Keep `BatchServing` long-lived and shared.
- Emit one hook entry per input item
- Update the UI from hook events as batches complete
defmodule Demo.ProgressServing do
  @moduledoc """
  A `BatchServing` implementation that squares integers and reports progress.

  Each batch sleeps for one second to simulate expensive work, squares every
  value, and -- when a `:progress` hook is configured for the partition --
  calls that hook with one entry per input value.
  """

  @behaviour BatchServing

  @doc """
  Builds the serving in streaming mode with the `:progress` hook enabled.
  """
  def serving do
    __MODULE__
    |> BatchServing.new(:ok)
    |> BatchServing.streaming(hooks: [:progress])
  end

  @impl true
  def init(_inline_or_process, _, options) do
    # The options are kept as-is; handle_batch/3 indexes them by partition.
    state = %{options: options}
    {:ok, state}
  end

  @impl true
  def handle_batch(%BatchServing.Batch{values: values}, partition, state) do
    partition_options = Enum.at(state.options, partition)
    hooks = partition_options[:hooks]

    execute = fn ->
      # Simulate expensive work.
      Process.sleep(1000)

      squares = for value <- values, do: value * value

      # The hook is optional: only report when one was configured.
      notify = hooks[:progress]

      if notify do
        notify.(progress_entries(values, partition))
      end

      squares
    end

    {:execute, execute, state}
  end

  # Builds one progress entry per input value, carrying the partition and the
  # number of decimal digits of that value.
  defp progress_entries(values, partition) do
    for value <- values do
      %{partition: partition, digit_count: value |> Integer.digits() |> length()}
    end
  end
end
defmodule DemoWeb.Live do
  @moduledoc """
  LiveView that runs a batch job through `DemoServing` and shows its progress.

  Submitting the form starts a linked task that streams events from
  `BatchServing.dispatch_many!/2`. The task forwards them to this process as
  messages:

    * `{:progress, %{processed: n, total_digits: d}}` -- running totals
    * `{:results, batch_output}` -- the output of one finished batch
    * `:done` -- the stream is exhausted
  """

  use Phoenix.LiveView, layout: {__MODULE__, :root}

  @impl true
  def render(assigns) do
    ~H"""
    <div style="font-family: system-ui, sans-serif; max-width: 560px; margin: 2rem auto; line-height: 1.4;">
      <h2>Batch Progress Demo</h2>
      <p>Enter how many items to process, then click Start.</p>
      <form phx-submit="start" style="display: flex; gap: 0.5rem; align-items: center; margin: 1rem 0;">
        <input
          type="number"
          name="count"
          min="1"
          value={@count_input}
          disabled={@running}
          style="width: 140px; padding: 0.4rem;"
        />
        <button type="submit" disabled={@running} style="padding: 0.45rem 0.8rem;">
          {if @running, do: "Running...", else: "Start"}
        </button>
      </form>
      <progress value={@processed} max={max(@total, 1)} style="width: 100%; height: 18px;"></progress>
      <p style="margin-top: 0.5rem;">
        <strong>{@processed}</strong> / <strong>{@total}</strong>
        ({@percent}%)
      </p>
      <p style="margin-top: 0.5rem;">
        Total Digits Processed <strong>{@total_digits}</strong>
      </p>
      <p :if={@done} style="color: #0b7a39; font-weight: 600;">Done.</p>
      <ol>
        <li :for={result <- @results}>{result}</li>
      </ol>
    </div>
    """
  end

  @impl true
  def mount(_params, _session, socket) do
    {:ok, reset_state(socket, 30)}
  end

  @doc """
  Resets every progress-related assign for a run of `count_input` items.

  `count_input` becomes both the value shown in the form field and the
  `total` that the progress bar is measured against.
  """
  def reset_state(socket, count_input) do
    assign(socket,
      count_input: count_input,
      running: false,
      done: false,
      total: count_input,
      processed: 0,
      percent: 0,
      total_digits: 0,
      results: []
    )
  end

  @impl true
  # The `disabled` attribute on the form controls is only enforced by the
  # browser. Ignore a "start" that arrives while a run is in flight so two
  # worker tasks never interleave their progress messages.
  def handle_event("start", _params, %{assigns: %{running: true}} = socket) do
    {:noreply, socket}
  end

  def handle_event("start", %{"count" => count_text}, socket) do
    count = parse_count(count_text)
    parent = self()

    # Linked on purpose: if the worker crashes, this LiveView crashes with it
    # instead of staying stuck on "Running...", and the worker is torn down
    # when this LiveView process exits abnormally.
    {:ok, _pid} = Task.start_link(fn -> stream_progress(parent, count) end)

    {:noreply,
     socket
     |> reset_state(count)
     |> assign(running: true)}
  end

  @impl true
  def handle_info({:progress, %{processed: processed, total_digits: total_digits}}, socket) do
    percent = trunc(processed * 100 / max(socket.assigns.total, 1))

    {:noreply,
     assign(socket,
       processed: processed,
       percent: percent,
       total_digits: total_digits
     )}
  end

  def handle_info({:results, results}, socket) do
    {:noreply, assign(socket, results: socket.assigns.results ++ results)}
  end

  def handle_info(:done, socket) do
    {:noreply, assign(socket, running: false, done: true, percent: 100)}
  end

  @doc """
  Root layout: loads the Phoenix and LiveView JavaScript from a CDN and
  connects the LiveSocket to `/live`.
  """
  def root(assigns) do
    ~H"""
    <!doctype html>
    <html>
      <head>
        <meta charset="utf-8" />
        <meta name="viewport" content="width=device-width, initial-scale=1" />
        <script src="https://cdn.jsdelivr.net/npm/phoenix@1.8.4/priv/static/phoenix.min.js"></script>
        <script src="https://cdn.jsdelivr.net/npm/phoenix_live_view@1.1.26/priv/static/phoenix_live_view.min.js"></script>
      </head>
      <body>
        {@inner_content}
        <script>
          (() => {
            const liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
            liveSocket.connect()
          })();
        </script>
      </body>
    </html>
    """
  end

  # Parses the submitted count; anything that is not a positive integer
  # falls back to 1.
  defp parse_count(count_text) do
    case Integer.parse(count_text) do
      {n, _rest} when n > 0 -> n
      _ -> 1
    end
  end

  # Runs inside the worker task: dispatches 1..count to DemoServing, folds the
  # streamed events into running totals and forwards them to `parent`.
  defp stream_progress(parent, count) do
    inputs = Enum.to_list(1..count)

    DemoServing
    |> BatchServing.dispatch_many!(inputs)
    |> Enum.reduce(%{processed: 0, total_digits: 0}, fn
      {:progress, items}, acc ->
        acc = %{
          processed: acc.processed + Enum.count(items),
          total_digits: acc.total_digits + Enum.sum_by(items, & &1.digit_count)
        }

        send(parent, {:progress, acc})
        acc

      {:batch, batch_output}, acc ->
        send(parent, {:results, batch_output})
        acc
    end)

    send(parent, :done)
  end
end
defmodule DemoWeb.Router do
  @moduledoc """
  Minimal router: a single HTML pipeline serving `DemoWeb.Live` at `/`.
  """

  use Phoenix.Router
  import Phoenix.LiveView.Router

  # Only HTML requests are accepted by this pipeline.
  pipeline(:browser) do
    plug(:accepts, ["html"])
  end

  scope("/") do
    pipe_through(:browser)

    live("/", DemoWeb.Live)
  end
end
defmodule DemoWeb.Endpoint do
  @moduledoc """
  Endpoint for the demo: exposes the LiveView socket at `/live` and hands
  every other request to `DemoWeb.Router`. Its configuration is read from
  the `:demo` application environment.
  """

  use Phoenix.Endpoint, otp_app: :demo

  socket("/live", Phoenix.LiveView.Socket)

  plug(DemoWeb.Router)
end
# Each entry pairs a registered process name with a function that builds the
# children to supervise. The children are built lazily, only when no process
# is registered under that name, so re-evaluating this code does not start
# duplicates. Order matters: process group, then serving, then web endpoint.
startup_steps = [
  {BatchServing.PG,
   fn ->
     [BatchServing.create_serving_process_group_spec()]
   end},
  {DemoServing,
   fn ->
     [
       {BatchServing,
        serving: Demo.ProgressServing.serving(),
        name: DemoServing,
        batch_size: 4,
        batch_timeout: 120,
        partitions: 2}
     ]
   end},
  {DemoWeb.Endpoint,
   fn ->
     [DemoWeb.Endpoint]
   end}
]

Enum.each(startup_steps, fn {registered_name, build_children} ->
  if is_nil(Process.whereis(registered_name)) do
    {:ok, _supervisor} = Supervisor.start_link(build_children.(), strategy: :one_for_one)
  end
end)

"Open http://localhost:5055"