BatchServing Hooks + LiveView
# The :batch_serving dependency is loaded by path from the directory above this file.
project_root = Path.expand("..", __DIR__)

# DemoWeb.Endpoint is declared with `otp_app: :demo`, so its settings are
# stored under that application's environment.
endpoint_config = [
  url: [host: "localhost"],
  http: [ip: {127, 0, 0, 1}, port: 5055],
  server: true,
  # Fixed placeholder secret; only suitable for a local demo.
  secret_key_base: String.duplicate("a", 64),
  live_view: [signing_salt: "demo-salt"],
  adapter: Bandit.PhoenixAdapter
]

Application.put_env(:demo, DemoWeb.Endpoint, endpoint_config)

dependencies = [
  {:bandit, "~> 1.5"},
  {:phoenix, "~> 1.8.4"},
  {:phoenix_live_view, "~> 1.1.0"},
  :jason,
  {:batch_serving, path: project_root}
]

Mix.install(dependencies)
Section
This example shows how to use hooks to track progress when processing many items.
Key idea:
- Keep `BatchServing` long-lived and shared.
- Emit one hook entry per input item.
- Update the UI from hook events as batches complete.
defmodule Demo.ProgressServing do
  @moduledoc """
  Example `BatchServing` implementation that squares each input value and
  reports one progress entry per value through the `:progress` hook.
  """

  @behaviour BatchServing

  @doc """
  Builds the serving for this module with streaming enabled and the
  `:progress` hook requested.
  """
  def serving do
    __MODULE__
    |> BatchServing.new(:ok)
    |> BatchServing.streaming(hooks: [:progress])
  end

  @impl true
  def init(_inline_or_process, _arg, options), do: {:ok, %{options: options}}

  @impl true
  def handle_batch(%BatchServing.Batch{values: values}, partition, state) do
    # The options stored by init/3 are indexed by partition; each entry may carry hooks.
    hooks = Enum.at(state.options, partition)[:hooks]

    run_batch = fn ->
      # Simulate expensive work.
      Process.sleep(1000)
      squares = Enum.map(values, fn value -> value * value end)

      # The hook is optional: report only when one was supplied for this partition.
      progress_hook = hooks[:progress]

      if progress_hook do
        progress_hook.(Enum.map(values, &progress_entry(&1, partition)))
      end

      squares
    end

    {:execute, run_batch, state}
  end

  # Describes one processed value: the partition that handled it and its digit count.
  defp progress_entry(value, partition) do
    %{partition: partition, digit_count: length(Integer.digits(value))}
  end
end
defmodule DemoWeb.Live do
  @moduledoc """
  LiveView that starts a batch run on `DemoServing` and shows its progress.

  A background task consumes the stream returned by
  `BatchServing.dispatch_many!/2` and sends `{:progress, acc}`,
  `{:results, batch_output}` and `:done` messages to this process, which
  folds them into the assigns read by `render/1`.
  """

  use Phoenix.LiveView, layout: {__MODULE__, :root}

  # Renders the run state, the processed/total counter with its percentage,
  # the running digit total, a completion marker and the result entries.
  @impl true
  def render(assigns) do
    ~H"""
    Batch Progress Demo
    Enter how many items to process, then click Start.
    <%= if @running, do: "Running...", else: "Start" %>
    <%= @processed %> / <%= @total %>
    ({@percent}%)
    Total Digits Processed <%= @total_digits %>
    <%= if @done do %>
    Done.
    <% end %>
    - {result}
    """
  end

  # Starts with an idle view preconfigured for 30 items.
  @impl true
  def mount(_params, _session, socket) do
    {:ok, reset_state(socket, 30)}
  end

  @doc """
  Resets every progress-related assign to its idle value.

  `count_input` is stored both as the form value (`:count_input`) and as the
  expected number of items (`:total`).
  """
  def reset_state(socket, count_input) do
    assign(socket,
      count_input: count_input,
      running: false,
      done: false,
      total: count_input,
      processed: 0,
      percent: 0,
      total_digits: 0,
      results: []
    )
  end

  # Handles the "start" event: parses the requested item count (anything that
  # does not parse to a positive integer falls back to 1) and launches the run.
  @impl true
  def handle_event("start", %{"count" => count_text}, socket) do
    count =
      case Integer.parse(count_text) do
        {n, _rest} when n > 0 -> n
        _ -> 1
      end

    parent = self()

    # The stream is consumed in a separate task so this callback returns
    # immediately; the task reports back to the LiveView via messages.
    Task.start(fn ->
      inputs = Enum.to_list(1..count)

      DemoServing
      |> BatchServing.dispatch_many!(inputs)
      |> Enum.reduce(%{processed: 0, total_digits: 0}, fn
        {:progress, items}, acc ->
          acc = %{
            processed: acc.processed + Enum.count(items),
            total_digits: acc.total_digits + Enum.sum_by(items, & &1.digit_count)
          }

          send(parent, {:progress, acc})
          acc

        {:batch, batch_output}, acc ->
          send(parent, {:results, batch_output})
          acc
      end)

      send(parent, :done)
    end)

    {:noreply, socket |> reset_state(count) |> assign(running: true)}
  end

  # Progress update: stores the cumulative counters and derives the percentage.
  # `max(total, 1)` guards the division against a zero total.
  @impl true
  def handle_info({:progress, %{processed: processed, total_digits: total_digits}}, socket) do
    percent = trunc(processed * 100 / max(socket.assigns.total, 1))

    {:noreply,
     assign(socket,
       processed: processed,
       percent: percent,
       total_digits: total_digits
     )}
  end

  # Batch output: appended so results keep their arrival order.
  def handle_info({:results, results}, socket) do
    {:noreply, assign(socket, results: socket.assigns.results ++ results)}
  end

  # The stream is exhausted: mark the run as finished.
  def handle_info(:done, socket) do
    {:noreply, assign(socket, running: false, done: true, percent: 100)}
  end

  @doc """
  Root layout: renders the inner content followed by the script that
  connects the LiveView socket at `/live`.
  """
  def root(assigns) do
    ~H"""
    {@inner_content}
    (() => {
    const liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
    liveSocket.connect()
    })();
    """
  end
end
defmodule DemoWeb.Router do
  @moduledoc """
  Router for the demo: serves `DemoWeb.Live` at `/` through an HTML-only
  `:browser` pipeline.
  """

  use Phoenix.Router
  import Phoenix.LiveView.Router

  pipeline :browser do
    plug(:accepts, ["html"])
  end

  scope "/" do
    pipe_through(:browser)

    live("/", DemoWeb.Live)
  end
end
defmodule DemoWeb.Endpoint do
  @moduledoc """
  Endpoint for the demo, configured through the `:demo` application.

  Mounts the LiveView socket at `/live` and hands all other requests to
  `DemoWeb.Router`.
  """

  use Phoenix.Endpoint, otp_app: :demo

  socket("/live", Phoenix.LiveView.Socket)

  plug(DemoWeb.Router)
end
# Starts a one-child supervisor unless a process is already registered under
# `name`, so this section can be evaluated again without failing. The child
# spec is built lazily, only when a start is actually needed.
ensure_started = fn name, build_child ->
  if is_nil(Process.whereis(name)) do
    {:ok, _pid} = Supervisor.start_link([build_child.()], strategy: :one_for_one)
  end
end

# Start order: process group, then the serving, then the web endpoint.
ensure_started.(BatchServing.PG, fn ->
  BatchServing.create_serving_process_group_spec()
end)

ensure_started.(DemoServing, fn ->
  {BatchServing,
   serving: Demo.ProgressServing.serving(),
   name: DemoServing,
   batch_size: 4,
   batch_timeout: 120,
   partitions: 2}
end)

ensure_started.(DemoWeb.Endpoint, fn -> DemoWeb.Endpoint end)

"Open http://localhost:5055"