BatchServing Hooks + LiveView
# The notebook is expected to live one directory below the batch_serving
# project root, which is installed as a path dependency further down.
project_root = Path.expand("..", __DIR__)

# Endpoint settings are registered before Mix.install/1 so they are already in
# the application environment when DemoWeb.Endpoint is started later on.
endpoint_config = [
  url: [host: "localhost"],
  http: [ip: {127, 0, 0, 1}, port: 5055],
  server: true,
  secret_key_base: String.duplicate("a", 64),
  live_view: [signing_salt: "demo-salt"],
  adapter: Bandit.PhoenixAdapter
]

Application.put_env(:demo, DemoWeb.Endpoint, endpoint_config)

# Dependencies needed by the demo; batch_serving comes from the local checkout.
deps = [
  {:bandit, "~> 1.5"},
  {:phoenix, "~> 1.8.4"},
  {:phoenix_live_view, "~> 1.1.0"},
  :jason,
  {:batch_serving, path: project_root}
]

Mix.install(deps)
Section
This example shows how to use hooks to track progress when processing many items.
Key ideas:
- Keep `BatchServing` long-lived and shared.
- Emit hook events.
- Update the UI using those hooks.
defmodule Demo.ProgressServing do
  @moduledoc """
  Demo serving that squares every value of a batch and, when a `:progress`
  hook is present, reports a running average together with the size of the
  batch that was just processed.
  """

  @behaviour BatchServing

  @doc """
  Builds the serving for this module with streaming enabled and the
  `:progress` hook declared.
  """
  def serving do
    base = BatchServing.new(__MODULE__, :ok)
    BatchServing.streaming(base, hooks: [:progress])
  end

  # Builds the initial state from the first entry of the options list: the
  # hooks found there (an empty map when none are given) plus zeroed counters.
  @impl true
  def init(_type, :ok, [opts | _]) do
    initial = %{hooks: Keyword.get(opts, :hooks, %{}), count: 0, total: 0}
    {:ok, initial}
  end

  # Updates the counters eagerly and returns the actual work as a closure.
  # `count` accumulates the number of values seen and `total` accumulates the
  # sum of their squares, so `total / count` is the running average reported
  # to the hook.
  @impl true
  def handle_batch(%BatchServing.Batch{values: values}, partition, state) do
    seen = Enum.count(values)

    squares_sum =
      for value <- values, reduce: 0 do
        sum -> sum + value * value
      end

    updated = %{state | count: state.count + seen, total: state.total + squares_sum}

    work = fn ->
      # Simulate expensive work.
      Process.sleep(1000)
      squared = for value <- values, do: value * value

      # The hook is optional; the payload is only built when one is present.
      hook = state.hooks[:progress]

      hook &&
        hook.(%{
          running_average: updated.total / max(updated.count, 1),
          number_processed: seen,
          partition: partition
        })

      squared
    end

    {:execute, work, updated}
  end
end
defmodule DemoWeb.Live do
  @moduledoc """
  LiveView that starts a batch run against `DemoServing` and renders its
  progress.

  A background task consumes the stream returned by
  `BatchServing.dispatch_many!/2` and forwards what it sees to this process:

    * `{:progress, %{running_average: _, number_processed: _}}` - cumulative
      number of processed items plus the latest running average
    * `{:results, batch_output}` - the outputs of one finished batch
    * `:done` - the stream has been fully consumed
  """

  use Phoenix.LiveView, layout: {__MODULE__, :root}

  # NOTE(review): the markup of this template looks like it lost its HTML tags
  # (form, button, list comprehension) at some point; `result` is not bound
  # anywhere in the visible template. Confirm against the original notebook.
  @impl true
  def render(assigns) do
    ~H"""
    Batch Progress Demo
    Enter how many items to process, then click Start.
    <%= if @running, do: "Running...", else: "Start" %>
    <%= @processed %> / <%= @total %>
    ({@percent}%)
    Running average <%= @running_average %>
    <%= if @done do %>
    Done.
    <% end %>
    - {result}
    """
  end

  @impl true
  def mount(_params, _session, socket) do
    {:ok, reset_state(socket, 30)}
  end

  @doc """
  Resets every assign used by the view for a run over `count_input` items.

  `count_input` is used both as the value echoed back to the input and as the
  total against which the percentage is computed.
  """
  def reset_state(socket, count_input) do
    assign(socket,
      count_input: count_input,
      running: false,
      done: false,
      total: count_input,
      processed: 0,
      percent: 0,
      running_average: 0,
      results: []
    )
  end

  # Starts a run: parses the requested item count, spawns an unlinked task
  # that consumes the serving stream, and flips the view into "running" state.
  @impl true
  def handle_event("start", %{"count" => count_text}, socket) do
    count = parse_count(count_text)
    parent = self()

    Task.start(fn ->
      inputs = Enum.to_list(1..count)

      DemoServing
      |> BatchServing.dispatch_many!(inputs)
      |> Enum.reduce(0, fn
        {:progress, payload}, acc ->
          # The payload is normalised to a map so that it matches whether it
          # arrives as a map or as a keyword list, and regardless of key
          # order. Matching a keyword list literally depends on element order,
          # and a failed match would kill this unlinked task and leave the UI
          # stuck on "Running...".
          %{running_average: running_average, number_processed: number_processed} =
            Map.new(payload)

          # Each event carries the size of a single batch; the accumulator
          # turns that into a cumulative count for the UI.
          total = acc + number_processed

          send(
            parent,
            {:progress, %{running_average: running_average, number_processed: total}}
          )

          total

        {:batch, batch_output}, acc ->
          send(parent, {:results, batch_output})
          acc
      end)

      send(parent, :done)
    end)

    {:noreply, socket |> reset_state(count) |> assign(running: true)}
  end

  # Progress update: stores the cumulative count and derives a percentage,
  # guarding against a zero total.
  @impl true
  def handle_info(
        {:progress, %{running_average: running_average, number_processed: number_processed}},
        socket
      ) do
    percent = trunc(number_processed * 100 / max(socket.assigns.total, 1))

    {:noreply,
     assign(socket,
       processed: number_processed,
       percent: percent,
       running_average: running_average
     )}
  end

  # Appends the outputs of a finished batch, keeping arrival order.
  def handle_info({:results, results}, socket) do
    {:noreply, assign(socket, results: socket.assigns.results ++ results)}
  end

  # The stream is exhausted: mark the run as finished.
  def handle_info(:done, socket) do
    {:noreply, assign(socket, running: false, done: true, percent: 100)}
  end

  @doc """
  Root layout: renders the inner content and the client-side snippet that
  connects a `LiveSocket` to `"/live"`.
  """
  # NOTE(review): the surrounding HTML/script tags appear to be missing here as
  # well; confirm against the original notebook.
  def root(assigns) do
    ~H"""
    {@inner_content}
    (() => {
    const liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
    liveSocket.connect()
    })();
    """
  end

  # Parses the submitted count; anything that is not a positive integer
  # (including trailing garbage after a non-positive number) falls back to 1.
  defp parse_count(count_text) do
    case Integer.parse(count_text) do
      {n, _rest} when n > 0 -> n
      _ -> 1
    end
  end
end
# Router for the demo: one browser pipeline and a single LiveView route.
defmodule DemoWeb.Router do
use Phoenix.Router
# Brings the `live` route macro used below into scope.
import Phoenix.LiveView.Router
pipeline :browser do
# The pipeline only negotiates the "html" format.
plug :accepts, ["html"]
end
scope "/" do
pipe_through :browser
# The progress demo LiveView is served at the root path.
live "/", DemoWeb.Live
end
end
# HTTP endpoint of the demo. Its settings are read from the :demo application
# environment under the DemoWeb.Endpoint key.
defmodule DemoWeb.Endpoint do
use Phoenix.Endpoint, otp_app: :demo
# LiveView socket mount point; the client script rendered by
# DemoWeb.Live.root/1 connects to this same "/live" path.
socket "/live", Phoenix.LiveView.Socket
# Every request is handed to the router.
plug DemoWeb.Router
end
# Each step below is skipped when a process is already registered under the
# corresponding name, so evaluating this section again does not try to start
# the same named processes twice.

# 1. Process group used by BatchServing.
if is_nil(Process.whereis(BatchServing.PG)) do
  pg_children = [BatchServing.create_serving_process_group_spec()]
  {:ok, _pg} = Supervisor.start_link(pg_children, strategy: :one_for_one)
end

# 2. The shared, long-lived serving registered as DemoServing.
if is_nil(Process.whereis(DemoServing)) do
  serving_opts = [
    serving: Demo.ProgressServing.serving(),
    name: DemoServing,
    batch_size: 4,
    batch_timeout: 120,
    partitions: 2
  ]

  {:ok, _serving} =
    Supervisor.start_link([{BatchServing, serving_opts}], strategy: :one_for_one)
end

# 3. The web endpoint.
if is_nil(Process.whereis(DemoWeb.Endpoint)) do
  web_children = [DemoWeb.Endpoint]
  {:ok, _web} = Supervisor.start_link(web_children, strategy: :one_for_one)
end

"Open http://localhost:5055"