Analog Replay
Mix.install([
{:circuits_uart, "~> 1.3"},
{:vega_lite, "~> 0.1.5"},
{:kino_vega_lite, "~> 0.1.7"},
{:kino, "~> 0.8.0"}
])
Discovery
kino_filename = Kino.Input.text("Filename", default: "/data/data/sample_static.csv")
filename = Kino.Input.read(kino_filename)
File.exists?(filename)
Historical Replayer
defmodule Receiver do
use GenServer
# interface
# def start_link(filename) do
# GenServer.start_link(__MODULE__, {filename})
# end
def start_link(filename) do
GenServer.start(Receiver, {filename}, name: via_tuple(filename))
end
def fetch(filename) do
{:incoming, entries} = GenServer.call(via_tuple(filename), {:fetch})
entries
end
def started?(filename) do
case Registry.lookup(:receiver_registry, {__MODULE__, filename}) do
[] -> false
_ -> true
end
end
def child_spec(filename) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [filename]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
# callbacks
@impl true
def init({filename}) do
# {:ok, file} = File.open(filename, [])
entries =
filename
|> File.stream!()
|> Stream.map(&String.trim_trailing/1)
|> Stream.map(fn line -> String.split(line, ",") end)
|> Stream.map(fn [t | [value | _]] ->
{elem(Float.parse(t), 0) * 1000, elem(Integer.parse(value), 0)}
end)
|> Enum.into([])
[{tfirst, _} | _] = entries
tnow = :os.system_time(:milli_seconds)
offset = tnow - tfirst
register_timeout(offset, entries)
{:ok, {entries, offset, []}}
end
@impl true
def handle_info(:scan, {incoming, offset, outgoing}) do
t = :os.system_time(:milli_seconds) - offset
{beforet, aftert} = timesplit(t, incoming, outgoing)
register_timeout(offset, incoming)
{:noreply, {aftert, offset, beforet}}
end
@impl true
def handle_call({:fetch}, _from, {incoming, offset, outgoing}) do
{:reply, {:incoming, Enum.reverse(outgoing)}, {incoming, offset, []}}
end
# helpers
defp via_tuple(device) do
{:via, Registry, {:receiver_registry, {__MODULE__, device}}}
end
defp register_timeout(offset, incoming) do
[{t, _} | _] = incoming
sleeptime =
case Kernel.trunc(:os.system_time(:milli_seconds) - t - offset) do
t when t > 0 ->
t
_ ->
1
end
Process.send_after(self(), :scan, sleeptime)
end
defp timesplit(threshold, [{t, value} | tail], before) when threshold > t do
timesplit(Kernel.trunc(threshold), tail, [%{t: t / 1000, value: value}] ++ before)
end
defp timesplit(_, l, before) do
{before, l}
end
end
{:ok, registry_pid} = Registry.start_link(name: :receiver_registry, keys: :unique)
{:ok, receiver_pid} = Receiver.start_link(filename)
l = Receiver.fetch(filename)
Automatic Discovery
We are skipping automatic discovery in this livebook.
Visualization
alias VegaLite, as: Vl
Code for 2d histogram illustrated as heatmap:
live_heatmap = fn ->
interval = 500
kino =
Vl.new(width: 600, height: 512)
|> Vl.transform(
filter: [
and: [
[field: "t", valid: true],
[field: "value", valid: true]
]
]
)
|> Vl.mark(:rect)
|> Vl.encode_field(:x, "t", type: :quantitative, bin: [maxbins: 16])
|> Vl.encode_field(:y, "value",
type: :quantitative,
bin: [maxbins: 512],
scale: [domain: [0, 4096]],
axis: [values: [0, 1024, 2048, 3072, 4096]]
)
|> Vl.encode(:color, aggregate: :count)
|> Vl.config(view: [stroke: nil])
|> Kino.VegaLite.new()
|> Kino.render()
fun = fn x ->
entries = Receiver.fetch(filename)
# IO.puts(entries)
Kino.VegaLite.push_many(kino, entries, window: 1000)
{:cont, x + 0.1}
end
Kino.VegaLite.periodically(kino, interval, 0, fun)
end
Code for Live running average:
live_running = fn ->
interval = 200
kino =
Vl.new(width: 400, height: 100)
|> Vl.mark(:line)
|> Vl.encode_field(:x, "t", type: :temporal, title: "Time")
|> Vl.encode_field(:y, "value", type: :quantitative, title: "Value", scale: [zero: false])
|> Vl.encode_field(:color, "type", type: :nominal)
|> Kino.VegaLite.new()
|> Kino.render()
# calculate average of list
avg = fn es ->
{sum, count} = Enum.reduce(es, {0, 0}, fn e, {sum, count} -> {sum + e[:value], count + 1} end)
sum / count
end
# execute window
run_window = fn
[], window, _self ->
{[], window}
[head | tail], window, self ->
t = head[:t]
value = head[:value]
head_window = [head] ++ window
head_entries = [
%{t: t, type: "raw", value: value},
%{t: t, type: "avg(3)", value: avg.(Enum.slice(head_window, 0..2))},
# %{t: t, type: "avg(5)", value: avg.(Enum.slice(head_window, 0..4))},
%{t: t, type: "avg(7)", value: avg.(Enum.slice(head_window, 0..6))}
]
new_window = Enum.slice(head_window, 0..6)
{tail_entries, tail_window} = self.(tail, new_window, self)
{head_entries ++ tail_entries, tail_window}
end
fun = fn window ->
entries = Receiver.fetch(filename)
{new_entries, new_window} = run_window.(entries, window, run_window)
# IO.puts("#{new_entries}")
Kino.VegaLite.push_many(kino, new_entries, window: 1000)
{:cont, new_window}
end
Kino.VegaLite.periodically(kino, interval, [], fun)
end
Code for density:
live_density = fn ->
interval = 200
kino =
Vl.new(width: 400, height: 300)
|> Vl.transform(density: "value", bandwidth: 0.5)
|> Vl.mark(:line)
|> Vl.encode_field(:x, "value", type: :quantitative, title: "Value")
|> Vl.encode_field(:y, "density", type: :quantitative)
|> Kino.VegaLite.new()
|> Kino.render()
fun = fn x ->
entries = Receiver.fetch(filename)
Kino.VegaLite.push_many(kino, entries, window: 1000)
{:cont, x + 0.1}
end
Kino.VegaLite.periodically(kino, interval, 0, fun)
end
Code for appending to file (append
interface function not in use):
defmodule SerialToFile do
use GenServer
@sleeptime 1000 * 2
# interface
def start_link(filename, device) do
GenServer.start_link(__MODULE__, {filename, device})
end
def append(pid, entries) do
GenServer.cast(pid, {:append, entries})
end
# callbacks
@impl true
def init({filename, device}) do
Process.send_after(self(), :scan, @sleeptime)
{:ok, file} = File.open(filename, [:append])
{:ok, {file, device}}
end
@impl true
def handle_cast({:entries, entries}, {file, device}) do
write(file, entries)
{:noreply, {file, device}}
end
@impl true
def handle_info(:scan, {file, device}) do
entries = Receiver.fetch(device)
write(file, entries)
Process.send_after(self(), :scan, @sleeptime)
{:noreply, {file, device}}
end
# helpers
defp write(_file, []) do
nil
end
defp write(file, [first | remaining]) do
IO.write(file, "#{first[:t] / 1000},#{first[:value]}\n")
write(file, remaining)
end
end
output_filename = "output_filename.csv"
append_to_file = fn ->
{:ok, pid} = SerialToFile.start_link(output_filename, filename)
pid
end
Choose your action:
- Do nothing
- Live heatmap visualization
- Live running average visualization
- Live density plot visualization
- Append stream to file
kino_viz = Kino.Input.text("Visualization choice", default: "4")
Activate chosen visualization:
vizualizations = [
fn -> "Choose a diffent number" end,
live_heatmap,
live_running,
live_density,
append_to_file
]
viz =
kino_viz
|> Kino.Input.read()
|> String.to_integer()
|> (fn choice -> Enum.at(vizualizations, choice - 1) end).()
result = viz.()