Denver Elixir Meetup - Nx + LiveView
Introduction
- Paulo Valente
- R&D @ DockYard
- Maintainer @ elixir-nx and elixir-grpc
- GitHub/Twitter: polvalente
In this talk, we’ll walk step by step through adding an AI feature to an existing Phoenix LiveView app.
Along the way, we’ll discuss some architecture decisions and client-server communication optimizations.
As the example, we’ll use an Image Classification model with Bumblebee.
Setup
- Create the Phoenix project:
mix phx.new elixir_days
- Add the dependencies:
def deps do
  [
    # ...,
    {:stb_image, "~> 0.6.8"},
    {:bumblebee, "~> 0.5"},
    {:exla, "~> 0.7.2"} # requires Make and gcc
  ]
end
- Add the :nx configuration to runtime.exs:
config :nx, :default_backend, EXLA.Backend
config :nx, :default_defn_options, [compiler: EXLA]
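As a quick sanity check, you can inspect the default backend in iex after this change; a sketch (the exact return format may vary across Nx versions):
Nx.default_backend()
#=> {EXLA.Backend, []}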
Creating the LiveView
Let’s introduce a new webpage at /image_classification.
The page will capture images from the webcam, send them to the server, and display a ranked classification of the object being captured.
# elixir_days_web/live/image_classification_live.ex
defmodule ElixirDaysWeb.ImageClassificationLive do
  use ElixirDaysWeb, :live_view

  def mount(_params, _session, socket) do
    {:ok,
     assign(socket,
       frames: [],
       predictions: [],
       total_bytes: 0,
       total_seconds: 0,
       start_time: System.monotonic_time(:second)
     )}
  end

  def render(assigns) do
    ~H"""
    <%!-- the video element the WebcamHook (defined below) attaches to --%>
    <video id="webcam" phx-hook="WebcamHook" autoplay />
    <%= for prediction <- Enum.sort_by(@predictions, & &1.score, :desc) do %>
      <%= prediction.label %>
      <%= prediction.score %>
    <% end %>
    Total bytes: <%= "#{:erlang.float_to_binary(@total_bytes / 1_000_000, decimals: 2)} MB" %>
    Total seconds: <%= @total_seconds %>
    Average Data Rate: <%= if(@total_seconds == 0,
      do: "0",
      else: :erlang.float_to_binary(@total_bytes / (@total_seconds * 1_000), decimals: 2)
    ) <> " KB/s" %>
    """
  end

  def handle_event("frame", %{"frame" => frame_data}, socket) do
    # Decode the base64-encoded frame and run it through the serving.
    # ImageClassifierServing is started in the application supervision tree (see below).
    image = StbImage.read_binary!(Base.decode64!(frame_data))
    %{predictions: classification} = Nx.Serving.batched_run(ImageClassifierServing, image)

    classification =
      Enum.map(classification, fn %{score: score, label: label} ->
        if score > 0.1 do
          # score is now an integer in tenths of a percent, so 500 means 50%
          score = trunc(score * 10 ** 3)
          [label | _] = String.split(label, ",", parts: 2)
          %{confident?: score > 500, score: "#{div(score, 10)}.#{rem(score, 10)}%", label: label}
        else
          %{confident?: false, score: "-", label: "-"}
        end
      end)

    {:noreply,
     assign(socket,
       predictions: classification,
       total_bytes: socket.assigns.total_bytes + byte_size(frame_data),
       total_seconds: System.monotonic_time(:second) - socket.assigns.start_time
     )}
  end
end
# elixir_days_web/router.ex
# ...
scope "/", ElixirDaysWeb do
  pipe_through(:browser)
  # ...
  live("/image_classification", ImageClassificationLive)
end
# ...
// assets/js/webcam_hook.js
export const WebcamHookMount = (hook) => {
  const fps = 1;
  const interval = 1000 / fps;

  function captureFrame(video) {
    const canvas = document.createElement("canvas");
    canvas.width = video.videoWidth;
    canvas.height = video.videoHeight;

    const context = canvas.getContext("2d");
    context.drawImage(video, 0, 0, canvas.width, canvas.height);

    const dataUrl = canvas.toDataURL("image/png");
    const frameData = dataUrl.replace(/^data:image\/(png|jpeg);base64,/, "");

    // Push the frame to LiveView
    hook.pushEvent("frame", { frame: frameData });
  }

  const video = document.getElementById("webcam");

  if (navigator.mediaDevices.getUserMedia) {
    navigator.mediaDevices
      .getUserMedia({ video: true })
      .then(function (stream) {
        video.srcObject = stream;
        setInterval(captureFrame, interval, video);
      })
      .catch(function (error) {
        console.log("Something went wrong!", error);
      });
  }
};
// assets/js/app.js
...
import { WebcamHookMount } from "./webcam_hook";

const WebcamHook = {
  mounted() {
    WebcamHookMount(this);
  },
};
...
let liveSocket = new LiveSocket("/live", Socket, {
  longPollFallbackMs: 2500,
  params: { _csrf_token: csrfToken },
  hooks: { WebcamHook },
});
...
Optimizing data transfer
We can add more client-side processing, plus a small change on the server side, to reduce the overall data transferred by an order of magnitude.
Instead of:
const dataUrl = canvas.toDataURL("image/png");
const frameData = dataUrl.replace(/^data:image\/(png|jpeg);base64,/, "");
hook.pushEvent("frame", { frame: frameData });
We can use the following (this assumes pako has been added to the asset build, e.g. via npm install pako --prefix assets):
import pako from "pako";
...
// Create a second canvas for resizing, sized to the target dimensions
// (without setting width/height, the canvas would default to 300x150)
const resizedCanvas = document.createElement("canvas");
const resizedContext = resizedCanvas.getContext("2d");

const targetWidth = 224;
const targetHeight = 224 * (canvas.height / canvas.width);
resizedCanvas.width = targetWidth;
resizedCanvas.height = targetHeight;

// Draw the original canvas onto the resized canvas
resizedContext.drawImage(
  canvas,
  0,
  0,
  canvas.width,
  canvas.height,
  0,
  0,
  targetWidth,
  targetHeight
);

const dataUrl = resizedCanvas.toDataURL("image/png");
const frameData = dataUrl.replace(/^data:image\/(png|jpeg);base64,/, "");

// Convert the base64 string to a binary string
const binaryString = atob(frameData);

// Convert the binary string to a byte array
const byteArray = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
  byteArray[i] = binaryString.charCodeAt(i);
}

// Compress the byte array using pako
const compressedData = pako.deflate(byteArray);

// Convert the compressed byte array to a base64 string
const compressedBase64 = btoa(
  Array.from(compressedData)
    .map((byte) => String.fromCharCode(byte))
    .join("")
);

// Push the frame to LiveView
hook.pushEvent("frame", { frame: compressedBase64 });
This resizes the image down to the effective input size the ResNet model will use in the end; we then use the pako library to compress the data even further.
To decode on the server side, instead of:
frame_data = Base.decode64!(frame_data)
image = StbImage.read_binary!(frame_data)
We’ll use:
frame_data = Base.decode64!(frame_data)

# Stream-inflate the zlib-compressed payload produced by pako
z = :zlib.open()
:ok = :zlib.inflateInit(z)
decompressed_data = :zlib.inflate(z, frame_data)
:ok = :zlib.inflateEnd(z)
:zlib.close(z)

# :zlib.inflate/2 returns iodata, so flatten it into a binary
decompressed_data = IO.iodata_to_binary(decompressed_data)
image = StbImage.read_binary!(decompressed_data)
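Since pako's deflate emits standard zlib-wrapped data by default, the streaming calls above can also be collapsed into a single :zlib.uncompress/1 call; an equivalent sketch:
frame_data = Base.decode64!(frame_data)
decompressed_data = :zlib.uncompress(frame_data)
image = StbImage.read_binary!(decompressed_data)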
With that, bandwidth drops from roughly 500 KB/s to roughly 50 KB/s, with no performance loss.
On to distributed Nx.Serving
To make the code distributed, with one or more web nodes and a GPU node doing the heavy lifting of the data processing, we can take advantage of BEAM clustering.
We just need to change our supervision tree to accommodate optionally starting the Nx.Serving; Nx takes care of the rest once the two BEAM nodes are connected.
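For completeness, here is a minimal sketch of how the serving referenced below could be built, assuming the ResNet-50 checkpoint from Hugging Face (the model choice and options are illustrative, not prescriptive):
{:ok, model_info} = Bumblebee.load_model({:hf, "microsoft/resnet-50"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "microsoft/resnet-50"})

serving =
  Bumblebee.Vision.image_classification(model_info, featurizer,
    # batch_size here should match the Nx.Serving child spec below
    compile: [batch_size: 10],
    defn_options: [compiler: EXLA]
  )
With serving in hand, the supervision tree becomes: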
camera_serving =
  if Application.get_env(:elixir_days, :start_camera_serving) do
    [
      {Nx.Serving,
       name: ImageClassifierServing, serving: serving, batch_size: 10, batch_timeout: 100}
    ]
  else
    []
  end
children =
  [
    ElixirDaysWeb.Telemetry
  ] ++
    camera_serving ++
    [
      {DNSCluster, query: Application.get_env(:elixir_days, :dns_cluster_query) || :ignore},
      {Phoenix.PubSub, name: ElixirDays.PubSub},
      # Start the Finch HTTP client for sending emails
      {Finch, name: ElixirDays.Finch},
      # Start a worker by calling: ElixirDays.Worker.start_link(arg)
      # {ElixirDays.Worker, arg},
      # Start to serve requests, typically the last entry
      ElixirDaysWeb.Endpoint
    ]
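Once both nodes are running, connecting them is all that is left; a sketch, with hypothetical node names:
# On a web node (where :start_camera_serving is not set), connect to a
# hypothetical GPU node:
Node.connect(:"gpu@gpu-host")

# Nx.Serving.batched_run/2 will now locate ImageClassifierServing in the
# cluster and route requests to the GPU node:
Nx.Serving.batched_run(ImageClassifierServing, image)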