# ⛩️ Dojo - Shifu
Mix.install([
{:kino, "~> 0.12.3"},
{:kino_vega_lite, "~> 0.1.11"},
{:phoenix_pubsub, "~> 2.1"},
{:phoenix, "~> 1.7"},
{:jason, "~> 1.4"},
{:phoenix_live_view, "~> 0.20.14"},
{:plug_cowboy, "~> 2.7"}
])
## Conn
# Uncomment to inspect the names registered globally across connected nodes:
# :global.registered_names()

# Print this node's name, then set the cookie it uses when talking to other nodes.
IO.inspect(node())
Node.set_cookie(node(), :enterthedojo)

# Ask the Erlang Port Mapper Daemon (EPMD) for the node names, with their ports,
# registered on this machine.
:erl_epmd.names()
## Configure Dojo
# Host name the endpoint uses when generating URLs.
host = "localhost"

# Phoenix encodes and decodes JSON through Jason.
Application.put_env(:phoenix, :json_library, Jason)

# HTTP listener: bind the IPv6 wildcard address on port 4000.
http_opts = [
  ip: {0, 0, 0, 0, 0, 0, 0, 0},
  port: 4000,
  transport_options: [socket_opts: [:inet6]]
]

# Both secrets are regenerated from random bytes every time this runs.
signing_salt = Base.encode16(:crypto.strong_rand_bytes(8))
secret_key_base = Base.encode16(:crypto.strong_rand_bytes(32))

Application.put_env(:dojo, Dojo.Endpoint,
  url: [host: host],
  http: http_opts,
  server: true,
  live_view: [signing_salt: signing_salt],
  secret_key_base: secret_key_base,
  pubsub_server: Dojo.PubSub
)
defmodule Dojo.Layouts do
  # Layout module for the LiveViews in this notebook.
  #
  # `render("live.html", assigns)` emits the client-side bootstrap: the `ImageInput`
  # hook, the LiveSocket connection to "/live", and then the LiveView's own content
  # via `@inner_content`.
  #
  # The `ImageInput` hook, as written below:
  #   * opens the hidden file input on click and accepts drag-and-drop,
  #   * shows a preview of the chosen image,
  #   * scales the image to fit `data-width` x `data-height`, keeping the aspect ratio,
  #   * uploads a blob made of an 8-byte header (height, then width, each a
  #     big-endian uint32) followed by the raw RGB bytes, under the upload name "image".
  #
  # NOTE(review): the template text appears to have lost its HTML tags (there is no
  # `<script>` wrapper around the JavaScript, and no tags loading the Phoenix /
  # LiveView client libraries). Restore them from the original notebook.
  use Phoenix.Component

  def render("live.html", assigns) do
    ~H"""
    const ImageInput = {
    mounted(){
    const DROP_CLASSES = ["bg-blue-100", "border-blue-300"]
    this.boundHeight = parseInt(this.el.dataset.height)
    this.boundWidth = parseInt(this.el.dataset.width)
    this.inputEl = this.el.querySelector(`#${this.el.id}-input`)
    this.previewEl = this.el.querySelector(`#${this.el.id}-preview`)
    this.el.addEventListener("click", e => this.inputEl.click())
    this.inputEl.addEventListener("change", e => this.loadFile(e.target.files))
    this.el.addEventListener("dragover", e => {
    e.stopPropagation()
    e.preventDefault()
    e.dataTransfer.dropEffect = "copy"
    })
    this.el.addEventListener("drop", e => {
    e.stopPropagation()
    e.preventDefault()
    this.loadFile(e.dataTransfer.files)
    })
    this.el.addEventListener("dragenter", e => this.el.classList.add(...DROP_CLASSES))
    this.el.addEventListener("drop", e => this.el.classList.remove(...DROP_CLASSES))
    this.el.addEventListener("dragleave", e => {
    if(!this.el.contains(e.relatedTarget)){ this.el.classList.remove(...DROP_CLASSES) }
    })
    },
    loadFile(files){
    const file = files && files[0]
    if(!file){ return }
    const reader = new FileReader()
    reader.onload = (readerEvent) => {
    const imgEl = document.createElement("img")
    imgEl.addEventListener("load", (loadEvent) => {
    this.setPreview(imgEl)
    const blob = this.canvasToBlob(this.toCanvas(imgEl))
    this.upload("image", [blob])
    })
    imgEl.src = readerEvent.target.result
    }
    reader.readAsDataURL(file)
    },
    setPreview(imgEl){
    const previewImgEl = imgEl.cloneNode()
    previewImgEl.style.maxHeight = "100%"
    this.previewEl.replaceChildren(previewImgEl)
    },
    toCanvas(imgEl){
    // We resize the image, such that it fits in the configured height x width, but
    // keep the aspect ratio. We could also easily crop, pad or squash the image, if desired
    const canvas = document.createElement("canvas")
    const ctx = canvas.getContext("2d")
    const widthScale = this.boundWidth / imgEl.width
    const heightScale = this.boundHeight / imgEl.height
    const scale = Math.min(widthScale, heightScale)
    canvas.width = Math.round(imgEl.width * scale)
    canvas.height = Math.round(imgEl.height * scale)
    ctx.drawImage(imgEl, 0, 0, imgEl.width, imgEl.height, 0, 0, canvas.width, canvas.height)
    return canvas
    },
    canvasToBlob(canvas){
    const imageData = canvas.getContext("2d").getImageData(0, 0, canvas.width, canvas.height)
    const buffer = this.imageDataToRGBBuffer(imageData)
    const meta = new ArrayBuffer(8)
    const view = new DataView(meta)
    view.setUint32(0, canvas.height, false)
    view.setUint32(4, canvas.width, false)
    return new Blob([meta, buffer], {type: "application/octet-stream"})
    },
    imageDataToRGBBuffer(imageData){
    const pixelCount = imageData.width * imageData.height
    const bytes = new Uint8ClampedArray(pixelCount * 3)
    for(let i = 0; i < pixelCount; i++) {
    bytes[i * 3] = imageData.data[i * 4]
    bytes[i * 3 + 1] = imageData.data[i * 4 + 1]
    bytes[i * 3 + 2] = imageData.data[i * 4 + 2]
    }
    return bytes.buffer
    }
    }
    const liveSocket = new LiveView.LiveSocket("/live", Phoenix.Socket, {hooks: {ImageInput}})
    liveSocket.connect()
    <%= @inner_content %>
    """
  end
end
defmodule Dojo.ErrorView do
  @moduledoc """
  Minimal error view: every template renders the same plain string.
  """

  @doc """
  Returns `"error"` regardless of the requested template or assigns.
  """
  def render(_template, _assigns) do
    "error"
  end
end
defmodule Dojo.Router do
  # Router with a single route: "/" is served by the `Dojo.BookOneLive` LiveView
  # (live action `:index`) through the `:browser` pipeline.
  use Phoenix.Router
  import Phoenix.LiveView.Router

  # The browser pipeline only negotiates the "html" format.
  pipeline :browser do
    plug :accepts, ["html"]
  end

  scope "/", Dojo do
    pipe_through :browser

    live "/", BookOneLive, :index
  end
end
defmodule Dojo.Endpoint do
  # Phoenix endpoint for the `:dojo` application. It reads its settings from the
  # `:dojo` application environment under the `Dojo.Endpoint` key.
  use Phoenix.Endpoint, otp_app: :dojo

  # LiveView clients connect over this socket path.
  socket "/live", Phoenix.LiveView.Socket

  # All other requests are handed to the router.
  plug Dojo.Router
end
defmodule Dojo.Disciple do
  @moduledoc """
  A participant in the dojo, described by a `name`, the current `action`, and the
  `node` the participant is on. Every field has a default value.
  """

  defstruct [
    {:name, "bruce lee"},
    {:action, "building"},
    {:node, "@localhost"}
  ]
end
defmodule Dojo.PubSub do
  @moduledoc """
  Publish/subscribe helpers built on `Phoenix.PubSub`.

  Every function talks to the pubsub server registered under the name
  `Dojo.PubSub`. Published messages reach subscribers as
  `{Dojo.PubSub, event, message}` tuples.
  """
  alias Phoenix.PubSub

  @doc """
  Subscribes the calling process to `topic`. `opts` is passed through to
  `Phoenix.PubSub.subscribe/3`.
  """
  def subscribe(topic, opts \\ []) do
    PubSub.subscribe(Dojo.PubSub, topic, opts)
  end

  @doc """
  Unsubscribes the calling process from `topic`.
  """
  def unsubscribe(topic) do
    PubSub.unsubscribe(Dojo.PubSub, topic)
  end

  @doc """
  Broadcasts `message` for `event` on one topic or on each topic of a list, then
  returns the first argument unchanged so the call can sit at the end of a pipeline.

  When the first argument is `{:ok, message}`, only the inner `message` is
  broadcast and `{:ok, message}` is returned. The result of the underlying
  broadcast is ignored.
  """
  def publish({:ok, message}, event, topics) when is_list(topics) do
    # Each is used instead of map: the calls are made purely for their side effect.
    Enum.each(topics, fn topic -> publish(message, event, topic) end)
    {:ok, message}
  end

  def publish({:ok, message}, event, topic) do
    PubSub.broadcast(Dojo.PubSub, topic, {__MODULE__, event, message})
    {:ok, message}
  end

  def publish(message, event, topics) when is_list(topics) do
    Enum.each(topics, fn topic -> publish(message, event, topic) end)
    message
  end

  def publish(message, event, topic) when not is_nil(topic) do
    PubSub.broadcast(Dojo.PubSub, topic, {__MODULE__, event, message})
    message
  end
end
defmodule Dojo.Gate do
  @moduledoc """
  Disciple tracker built on `Phoenix.Tracker` ("quis custodiet ipsos custodes").

  Disciples are tracked per topic under their name. Every join or leave reported
  by the tracker is re-broadcast on that topic, on the local node only, as
  `{:join, topic, meta}` or `{:leave, topic, meta}` with the topic added to `meta`.
  """
  use Phoenix.Tracker

  @doc """
  Starts the tracker. Registers it as `Dojo.Gate` unless `opts` carries a `:name`.
  """
  def start_link(opts) do
    tracker_opts = Keyword.put_new(opts, :name, __MODULE__)
    Phoenix.Tracker.start_link(__MODULE__, tracker_opts, tracker_opts)
  end

  @impl true
  def init(opts) do
    pubsub = Keyword.fetch!(opts, :pubsub_server)
    local_node = Phoenix.PubSub.node_name(pubsub)

    {:ok, %{pubsub_server: pubsub, node_name: local_node}}
  end

  @impl true
  def handle_diff(diff, state) do
    for {topic, {joins, leaves}} <- diff do
      announce(joins, :join, topic, state)
      announce(leaves, :leave, topic, state)
    end

    {:ok, state}
  end

  @doc """
  Tracks `pid` on `topic` under the disciple's name. If that presence is already
  tracked, its metadata is updated instead.
  """
  def track(pid, topic, %Dojo.Disciple{name: username} = disciple) do
    case Phoenix.Tracker.track(__MODULE__, pid, topic, username, presence_meta(disciple)) do
      {:ok, _ref} = tracked ->
        tracked

      {:error, {:already_tracked, _, _, _}} ->
        Phoenix.Tracker.update(__MODULE__, pid, topic, username, presence_meta(disciple))
    end
  end

  @doc """
  Looks up the presences stored under `key` on `topic`.
  """
  def get_by_key(topic, key) do
    Phoenix.Tracker.get_by_key(__MODULE__, topic, key)
  end

  @doc """
  Lists the presences on `topic` by asking the shard that owns the topic, waiting
  at most `timeout` milliseconds for its reply.
  """
  def list(topic, timeout \\ 5000) do
    shard = Phoenix.Tracker.Shard.name_for_topic(__MODULE__, topic, pool_size())
    tracker_state = GenServer.call(shard, {:list, topic}, timeout)

    Phoenix.Tracker.State.get_by_topic(tracker_state, topic)
  end

  @doc """
  Returns the metadata of every presence on `topic`, each with `:topic` added.
  """
  def list_users(topic) do
    topic
    |> list()
    |> Enum.map(fn {_key, meta} -> Map.put(meta, :topic, topic) end)
  end

  # Broadcasts one `{event, topic, meta}` message per presence, each from its own
  # task. Each tracker takes care of its own node, hence the direct broadcast.
  defp announce(presences, event, topic, state) do
    for {_key, meta} <- presences do
      Task.start(fn ->
        payload = {event, topic, Map.put(meta, :topic, topic)}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, payload)
      end)
    end
  end

  # Metadata stored with a presence; `online_at` is the OS time in seconds at the
  # moment this function is called.
  defp presence_meta(%Dojo.Disciple{name: username, action: action, node: node}) do
    %{
      action: action,
      name: username,
      node: node,
      online_at: System.os_time(:second)
    }
  end

  # Reads the shard pool size from the ETS table named after this module.
  defp pool_size() do
    [{:pool_size, shard_count}] = :ets.lookup(__MODULE__, :pool_size)
    shard_count
  end
end
defmodule Dojo.Class do
  @moduledoc """
  Per-book "classes": each book maps to the topic `"class:" <> book`, which is
  used both for tracking disciples and for subscribing to updates.
  """

  @doc """
  Tracks `pid` as `disciple` in the class for `book`.
  """
  def join(pid, book, disciple) do
    book
    |> topic_for()
    |> Dojo.Gate.track(pid, disciple)
    |> then(& &1)
  end

  @doc """
  Looks up `username` in the class for `book`.
  """
  def whereis(username, book) do
    book
    |> topic_for()
    |> Dojo.Gate.get_by_key(username)
  end

  @doc """
  Subscribes the calling process to the class topic for `book`.
  """
  def listen(book) do
    book
    |> topic_for()
    |> Dojo.PubSub.subscribe()
  end

  # Builds the topic name shared by all functions in this module.
  defp topic_for(book), do: "class:" <> book
end
defmodule Dojo.BookOneLive do
use Phoenix.LiveView, layout: {Dojo.Layouts, :live}
# Mounts the LiveView: subscribes to the "book1" class topic, loads the disciples
# currently tracked on it (keyed by name), and configures the `:image` upload.
def mount(_params, _session, socket) do
  # mount/3 runs for the static render and again once the socket connects. Only
  # the connected process lives long enough to receive join/leave messages, so
  # only that one subscribes.
  if connected?(socket) do
    Dojo.Class.listen("book1")
  end

  disciples =
    Map.new(Dojo.Gate.list_users("class:book1"), fn %{name: name} = disciple ->
      {name, disciple}
    end)

  socket =
    socket
    |> assign(label: nil, running: false, task_ref: nil, disciples: disciples)
    |> allow_upload(:image,
      accept: :any,
      max_entries: 1,
      max_file_size: 300_000,
      progress: &handle_progress/3,
      # Start uploading as soon as a file is selected; there is no submit step.
      auto_upload: true
    )

  {:ok, socket}
end
# Renders the page: a heading, the disciples from `@disciples` (name and node),
# the image input bound to `@uploads.image` with a 224x224 bound, and either a
# spinner while `@running` is true or the current `@label` ("Not running" if nil).
#
# NOTE(review): this template appears to have lost its HTML tags during
# extraction. The line beginning `Enum.sort_by(` is the tail of a `:for`-style
# comprehension over the disciples whose opening tag is missing, so the template
# will not compile as shown. Restore the markup from the original notebook.
def render(assigns) do
~H"""
Dojo Book 1
Run your friends model!
Enum.sort_by(&(elem(&1, 1).online_at), :desc)} class="w-64 h-64 text-white bg-sky-700 border-2 border-custom shadow cursor-pointer hover:border-red-500 transition-colors duration-200 ease-in-out inline-block mb-4 flex items-center justify-center">
<%= name %>
<%= dis.node %>
<.image_input id="image" upload={@uploads.image} height={224} width={224} />
<%= if @running do %>
<.spinner />
<% else %>
Output:
<%= @label || "Not running" %>
<% end %>
"""
end
# Function component for the image drop zone: a hidden live file input for
# `@upload` plus the prompt text.
#
# NOTE(review): the surrounding markup appears to be missing. `render/1` passes
# `id`, `height` and `width`, and the `ImageInput` JS hook reads `data-height`,
# `data-width` and looks up `#<id>-input` / `#<id>-preview`, none of which are
# present here. Restore the wrapper elements from the original notebook.
defp image_input(assigns) do
~H"""
<.live_file_input upload={@upload} class="hidden" />
Drag an image file here or click to open file browser
"""
end
# Function component shown by `render/1` while a prediction is running.
#
# NOTE(review): the template body is empty, so nothing is rendered. The spinner
# markup was presumably lost with the other HTML tags; restore it from the
# original notebook.
defp spinner(assigns) do
~H"""
"""
end
# Progress callback for the `:image` upload. Nothing happens until the entry is
# done; then the uploaded file is read, decoded into a tensor and handed to the
# serving in a background task whose ref is kept in `:task_ref`.
def handle_progress(:image, entry, socket) do
  if entry.done? do
    # Consuming the entries returns whatever the callback wraps in `{:ok, _}`,
    # i.e. the raw bytes of each uploaded file.
    uploaded =
      consume_uploaded_entries(socket, :image, fn meta, _entry ->
        {:ok, File.read!(meta.path)}
      end)

    case uploaded do
      [] ->
        {:noreply, socket}

      [binary] ->
        tensor = decode_as_tensor(binary)

        # Run the image through the student kernels without blocking the LiveView.
        inference =
          Task.async(fn ->
            Nx.Serving.batched_run(PhoenixDemo.Serving, tensor)
          end)

        {:noreply, assign(socket, running: true, task_ref: inference.ref)}
    end
  else
    {:noreply, socket}
  end
end
# Decodes the blob produced by the `ImageInput` hook into a `{height, width, 3}`
# u8 tensor. The hook writes an 8-byte header (height, then width, each a
# big-endian unsigned 32-bit integer) followed by the raw RGB bytes.
defp decode_as_tensor(<<height::32-integer, width::32-integer, data::binary>>) do
  data |> Nx.from_binary(:u8) |> Nx.reshape({height, width, 3})
end
# Live uploads require phx-change and phx-submit on the form, so this handler
# exists only to accept the "noop" event and leave the socket untouched.
def handle_event("noop", %{}, socket), do: {:noreply, socket}
# A disciple joined the "class:book1" topic: store it under its name.
def handle_info(
      {:join, "class:book1", %{name: name} = disciple},
      %{assigns: %{disciples: disciples}} = socket
    ) do
  {:noreply, assign(socket, :disciples, Map.put(disciples, name, disciple))}
end

# A disciple left the "class:book1" topic: forget it.
def handle_info(
      {:leave, "class:book1", %{name: name}},
      %{assigns: %{disciples: disciples}} = socket
    ) do
  {:noreply, assign(socket, :disciples, Map.delete(disciples, name))}
end

# Reply from the prediction task currently tracked in `:task_ref`.
def handle_info({ref, result}, %{assigns: %{task_ref: ref}} = socket)
    when is_reference(ref) do
  # The reply has arrived, so the monitor's :DOWN message is no longer needed.
  Process.demonitor(ref, [:flush])
  %{predictions: [%{label: label}]} = result
  {:noreply, assign(socket, label: label, running: false, task_ref: nil)}
end

# Reply from a task that has been superseded (a new image was uploaded before the
# previous prediction finished). Without this clause the message would match no
# clause and crash the LiveView; drop it and keep waiting for the current task.
def handle_info({ref, _stale_result}, socket) when is_reference(ref) do
  Process.demonitor(ref, [:flush])
  {:noreply, socket}
end
end
# Options for the disciple tracker: one shard per online scheduler.
tracker_opts = [
  name: Dojo.Gate,
  pubsub_server: Dojo.PubSub,
  pool_size: System.schedulers_online()
]

# Children are started in this order: pubsub, tracker, endpoint.
children = [
  {Phoenix.PubSub, name: Dojo.PubSub},
  {Dojo.Gate, tracker_opts},
  Dojo.Endpoint
]

{:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)
defmodule Dojo.Module do
  @moduledoc """
  Macro helper for generating modules under the `Dojo.Mat` namespace.
  """

  @doc """
  Defines the module `Dojo.Mat.<named>` with a single function, `hello/0`, whose
  body is the expression `func`.
  """
  defmacro generate(named, func) do
    quote do
      # The name is resolved where the macro is used, so `named` may be any
      # expression that evaluates to an alias or atom.
      generated = Module.concat(Dojo.Mat, unquote(named))

      defmodule generated do
        def hello, do: unquote(func)
      end
    end
  end
end