Serial Processing
Mix.install([
{:circuits_uart, "~> 1.3"},
{:vega_lite, "~> 0.1.5"},
{:kino_vega_lite, "~> 0.1.7"},
{:kino, "~> 0.8.0"}
])
Introduction
This notebook connects to a serial port and does live processing of the lines received over it (assuming that numbers are transmitted). The following kinds of processing are supported:
- Live plotting of running average.
- Live plotting of a heatmap of time and value axes.
- Live density plot over a window of values.
- Timestamping and logging of each line to a file.
Note: This notebook is designed for ESP32-DEVKIT-C devices, and assumes a bitrate of 115200 baud. This can be changed in the Receiver
and/or Discovery
modules.
Discovery
uarts = Circuits.UART.enumerate()
Reception
Define GenServer to handle connection to a single serial port.
defmodule Receiver do
use GenServer
# interface
def start_link(device, speed) do
state = %{device: device, speed: speed, incoming: []}
GenServer.start(Receiver, state, name: via_tuple(device))
end
def fetch(device) do
{:incoming, entries} = GenServer.call(via_tuple(device), {:fetch})
entries
end
def started?(device) do
case Registry.lookup(:receiver_registry, {__MODULE__, device}) do
[] -> false
_ -> true
end
end
def child_spec(device, speed) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [device, speed]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
# helpers
defp via_tuple(device) do
{:via, Registry, {:receiver_registry, {__MODULE__, device}}}
end
# callbacks
@impl true
def init(state) do
%{device: device, speed: speed} = state
{:ok, uart_pid} = Circuits.UART.start_link()
:ok =
Circuits.UART.open(uart_pid, device,
speed: speed,
active: true,
framing: Circuits.UART.Framing.Line
)
{:ok, Map.put(state, :pid, uart_pid)}
end
# receive line
@impl true
def handle_info({:circuits_uart, _, payload}, state) do
%{incoming: incoming} = state
# IO.puts("Message: " <> payload)
entry = %{
t: :os.system_time(:milli_seconds),
value: String.to_integer(String.trim_trailing(payload))
}
{:noreply, %{state | incoming: [entry | incoming]}}
end
# pop currently stored lines
@impl true
def handle_call({:fetch}, _from, state) do
%{incoming: lines} = state
{:reply, {:incoming, Enum.reverse(lines)}, %{state | incoming: []}}
end
end
Choose serial port:
kino_device = Kino.Input.select("Device name", uarts |> Enum.map(fn {k, _} -> {k, k} end))
Kino.Layout.grid([kino_device])
{device} = {
Kino.Input.read(kino_device)
}
Automatic Discovery
Define a dynamic supervisor for all Receiver
GenServers.
defmodule ReceiverSupervisor do
use DynamicSupervisor
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def ensure(device, speed) do
case Receiver.started?(device) do
false ->
DynamicSupervisor.start_child(
__MODULE__,
Receiver.child_spec(device, speed)
)
true
_ ->
false
end
end
# callbacks
@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
defmodule Discovery do
use GenServer
@sleeptime 1000 * 10
# interface
def start_link(_) do
GenServer.start(Discovery, {})
end
# callbacks
def init(_) do
Process.send_after(self(), :scan, 0)
{:ok, %{}}
end
def handle_info(:scan, state) do
uarts = Circuits.UART.enumerate()
for {device, description} <- uarts do
{match, speed} =
case description do
%{description: "CP2102N USB to UART Bridge Controller", manufacturer: "Silicon Labs"} ->
{true, 115_200}
_ ->
{false, nil}
end
if match do
ReceiverSupervisor.ensure(device, speed)
end
end
Process.send_after(self(), :scan, @sleeptime)
{:noreply, state}
end
end
Define a supervisor to keep ReceiverSupervisor
, our Receiver
registry and our Discovery
service supervised.
defmodule RootSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{ReceiverSupervisor, nil},
{Registry, [name: :receiver_registry, keys: :unique]},
{Discovery, nil}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
Start it up:
{:ok, rootsupervisor_pid} = RootSupervisor.start_link(nil)
rootsupervisor_pid
Processing Suite
alias VegaLite, as: Vl
Code for 2d histogram illustrated as heatmap:
live_heatmap = fn ->
interval = 500
kino =
Vl.new(width: 600, height: 512)
|> Vl.transform(
filter: [
and: [
[field: "t", valid: true],
[field: "value", valid: true]
]
]
)
|> Vl.mark(:rect)
|> Vl.encode_field(:x, "t", type: :quantitative, bin: [maxbins: 16])
|> Vl.encode_field(:y, "value",
type: :quantitative,
bin: [maxbins: 512],
scale: [domain: [0, 4096]],
axis: [values: [0, 1024, 2048, 3072, 4096]]
)
|> Vl.encode(:color, aggregate: :count)
|> Vl.config(view: [stroke: nil])
|> Kino.VegaLite.new()
|> Kino.render()
fun = fn _, _ ->
entries = Receiver.fetch(device)
Kino.VegaLite.push_many(kino, entries, window: 1000)
{:cont, nil}
end
Kino.listen(interval, nil, fun)
end
Code for Live running average:
live_running = fn ->
interval = 200
kino =
Vl.new(width: 400, height: 100)
|> Vl.mark(:line)
|> Vl.encode_field(:x, "t", type: :temporal, title: "Time")
|> Vl.encode_field(:y, "value", type: :quantitative, title: "Value", scale: [zero: false])
|> Vl.encode_field(:color, "type", type: :nominal)
|> Kino.VegaLite.new()
|> Kino.render()
# calculate average of list
avg = fn es ->
{sum, count} = Enum.reduce(es, {0, 0}, fn e, {sum, count} -> {sum + e[:value], count + 1} end)
sum / count
end
# execute window
run_window = fn
[], window, _self ->
{[], window}
[head | tail], window, self ->
t = head[:t]
value = head[:value]
head_window = [head] ++ window
head_entries = [
%{t: t, type: "raw", value: value},
%{t: t, type: "avg(3)", value: avg.(Enum.slice(head_window, 0..2))},
# %{t: t, type: "avg(5)", value: avg.(Enum.slice(head_window, 0..4))},
%{t: t, type: "avg(7)", value: avg.(Enum.slice(head_window, 0..6))}
]
new_window = Enum.slice(head_window, 0..6)
{tail_entries, tail_window} = self.(tail, new_window, self)
{head_entries ++ tail_entries, tail_window}
end
fun = fn _, window ->
entries = Receiver.fetch(device)
{new_entries, new_window} = run_window.(entries, window, run_window)
Kino.VegaLite.push_many(kino, new_entries, window: 1000)
{:cont, new_window}
end
Kino.listen(interval, [], fun)
end
Code for density:
live_density = fn ->
interval = 200
kino =
Vl.new(width: 400, height: 300)
|> Vl.transform(density: "value", bandwidth: 0.5)
|> Vl.mark(:line)
|> Vl.encode_field(:x, "value", type: :quantitative, title: "Value")
|> Vl.encode_field(:y, "density", type: :quantitative)
|> Kino.VegaLite.new()
|> Kino.render()
fun = fn _, nil ->
entries = Receiver.fetch(device)
Kino.VegaLite.push_many(kino, entries, window: 1000)
{:cont, nil}
end
Kino.listen(interval, nil, fun)
end
Code for appending to file (append
interface function not in use):
defmodule SerialToFile do
use GenServer
@sleeptime 1000 * 2
# interface
def start_link(filename, device) do
GenServer.start_link(__MODULE__, {filename, device})
end
def append(pid, entries) do
GenServer.cast(pid, {:append, entries})
end
# callbacks
@impl true
def init({filename, device}) do
Process.send_after(self(), :scan, @sleeptime)
{:ok, file} = File.open(filename, [:append])
{:ok, {file, device}}
end
@impl true
def handle_cast({:entries, entries}, {file, device}) do
write(file, entries)
{:noreply, {file, device}}
end
@impl true
def handle_info(:scan, {file, device}) do
entries = Receiver.fetch(device)
write(file, entries)
Process.send_after(self(), :scan, @sleeptime)
{:noreply, {file, device}}
end
# helpers
defp write(_file, []) do
nil
end
defp write(file, [first | remaining]) do
IO.write(file, "#{first[:t] / 1000},#{first[:value]}\n")
write(file, remaining)
end
end
output_filename = "output_filename.csv"
append_to_file = fn ->
{:ok, pid} = SerialToFile.start_link(output_filename, device)
pid
end
processing_suite = [
%{label: "Do nothing", id: :nothing, fun: fn -> nil end},
%{label: "Live Heatmap visualization", id: :live_heatmap, fun: live_heatmap},
%{label: "Live running average visualization", id: :live_running, fun: live_running},
%{label: "Live density plot visualization", id: :live_density, fun: live_density},
%{label: "Append stream to file", id: :append2file, fun: append_to_file}
]
Processing
kino_choices =
processing_suite
|> Enum.map(fn %{label: label, id: _id, fun: fun} -> {fun, label} end)
input_kino =
Kino.Input.select("Chose your action:", kino_choices)
viz = Kino.Input.read(input_kino)
result = viz.()