Notesclub

Analog Replay

analog_replay.livemd

Analog Replay

Mix.install([
  {:circuits_uart, "~> 1.3"},
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"}
])

Discovery

kino_filename = Kino.Input.text("Filename", default: "/data/data/sample_static.csv")
filename = Kino.Input.read(kino_filename)
File.exists?(filename)

Historical Replayer

defmodule Receiver do
  use GenServer

  # interface

  # def start_link(filename) do
  #  GenServer.start_link(__MODULE__, {filename})
  # end

  def start_link(filename) do
    GenServer.start(Receiver, {filename}, name: via_tuple(filename))
  end

  def fetch(filename) do
    {:incoming, entries} = GenServer.call(via_tuple(filename), {:fetch})
    entries
  end

  def started?(filename) do
    case Registry.lookup(:receiver_registry, {__MODULE__, filename}) do
      [] -> false
      _ -> true
    end
  end

  def child_spec(filename) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [filename]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  # callbacks

  @impl true
  def init({filename}) do
    # {:ok, file} = File.open(filename, [])
    entries =
      filename
      |> File.stream!()
      |> Stream.map(&String.trim_trailing/1)
      |> Stream.map(fn line -> String.split(line, ",") end)
      |> Stream.map(fn [t | [value | _]] ->
        {elem(Float.parse(t), 0) * 1000, elem(Integer.parse(value), 0)}
      end)
      |> Enum.into([])

    [{tfirst, _} | _] = entries
    tnow = :os.system_time(:milli_seconds)
    offset = tnow - tfirst

    register_timeout(offset, entries)

    {:ok, {entries, offset, []}}
  end

  @impl true
  def handle_info(:scan, {incoming, offset, outgoing}) do
    t = :os.system_time(:milli_seconds) - offset
    {beforet, aftert} = timesplit(t, incoming, outgoing)
    register_timeout(offset, incoming)
    {:noreply, {aftert, offset, beforet}}
  end

  @impl true
  def handle_call({:fetch}, _from, {incoming, offset, outgoing}) do
    {:reply, {:incoming, Enum.reverse(outgoing)}, {incoming, offset, []}}
  end

  # helpers

  defp via_tuple(device) do
    {:via, Registry, {:receiver_registry, {__MODULE__, device}}}
  end

  defp register_timeout(offset, incoming) do
    [{t, _} | _] = incoming

    sleeptime =
      case Kernel.trunc(:os.system_time(:milli_seconds) - t - offset) do
        t when t > 0 ->
          t

        _ ->
          1
      end

    Process.send_after(self(), :scan, sleeptime)
  end

  defp timesplit(threshold, [{t, value} | tail], before) when threshold > t do
    timesplit(Kernel.trunc(threshold), tail, [%{t: t / 1000, value: value}] ++ before)
  end

  defp timesplit(_, l, before) do
    {before, l}
  end
end
{:ok, registry_pid} = Registry.start_link(name: :receiver_registry, keys: :unique)
{:ok, receiver_pid} = Receiver.start_link(filename)
l = Receiver.fetch(filename)

Automatic Discovery

We are skipping automatic discovery in this livebook.

Visualization

alias VegaLite, as: Vl

Code for 2d histogram illustrated as heatmap:

live_heatmap = fn ->
  interval = 500

  kino =
    Vl.new(width: 600, height: 512)
    |> Vl.transform(
      filter: [
        and: [
          [field: "t", valid: true],
          [field: "value", valid: true]
        ]
      ]
    )
    |> Vl.mark(:rect)
    |> Vl.encode_field(:x, "t", type: :quantitative, bin: [maxbins: 16])
    |> Vl.encode_field(:y, "value",
      type: :quantitative,
      bin: [maxbins: 512],
      scale: [domain: [0, 4096]],
      axis: [values: [0, 1024, 2048, 3072, 4096]]
    )
    |> Vl.encode(:color, aggregate: :count)
    |> Vl.config(view: [stroke: nil])
    |> Kino.VegaLite.new()
    |> Kino.render()

  fun = fn x ->
    entries = Receiver.fetch(filename)
    # IO.puts(entries)
    Kino.VegaLite.push_many(kino, entries, window: 1000)
    {:cont, x + 0.1}
  end

  Kino.VegaLite.periodically(kino, interval, 0, fun)
end

Code for Live running average:

live_running = fn ->
  interval = 200

  kino =
    Vl.new(width: 400, height: 100)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "t", type: :temporal, title: "Time")
    |> Vl.encode_field(:y, "value", type: :quantitative, title: "Value", scale: [zero: false])
    |> Vl.encode_field(:color, "type", type: :nominal)
    |> Kino.VegaLite.new()
    |> Kino.render()

  # calculate average of list
  avg = fn es ->
    {sum, count} = Enum.reduce(es, {0, 0}, fn e, {sum, count} -> {sum + e[:value], count + 1} end)
    sum / count
  end

  # execute window
  run_window = fn
    [], window, _self ->
      {[], window}

    [head | tail], window, self ->
      t = head[:t]
      value = head[:value]
      head_window = [head] ++ window

      head_entries = [
        %{t: t, type: "raw", value: value},
        %{t: t, type: "avg(3)", value: avg.(Enum.slice(head_window, 0..2))},
        # %{t: t, type: "avg(5)", value: avg.(Enum.slice(head_window, 0..4))},
        %{t: t, type: "avg(7)", value: avg.(Enum.slice(head_window, 0..6))}
      ]

      new_window = Enum.slice(head_window, 0..6)
      {tail_entries, tail_window} = self.(tail, new_window, self)
      {head_entries ++ tail_entries, tail_window}
  end

  fun = fn window ->
    entries = Receiver.fetch(filename)
    {new_entries, new_window} = run_window.(entries, window, run_window)
    # IO.puts("#{new_entries}")
    Kino.VegaLite.push_many(kino, new_entries, window: 1000)
    {:cont, new_window}
  end

  Kino.VegaLite.periodically(kino, interval, [], fun)
end

Code for density:

live_density = fn ->
  interval = 200

  kino =
    Vl.new(width: 400, height: 300)
    |> Vl.transform(density: "value", bandwidth: 0.5)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "value", type: :quantitative, title: "Value")
    |> Vl.encode_field(:y, "density", type: :quantitative)
    |> Kino.VegaLite.new()
    |> Kino.render()

  fun = fn x ->
    entries = Receiver.fetch(filename)
    Kino.VegaLite.push_many(kino, entries, window: 1000)
    {:cont, x + 0.1}
  end

  Kino.VegaLite.periodically(kino, interval, 0, fun)
end

Code for appending to file (append interface function not in use):

defmodule SerialToFile do
  use GenServer

  @sleeptime 1000 * 2

  # interface

  def start_link(filename, device) do
    GenServer.start_link(__MODULE__, {filename, device})
  end

  def append(pid, entries) do
    GenServer.cast(pid, {:append, entries})
  end

  # callbacks

  @impl true
  def init({filename, device}) do
    Process.send_after(self(), :scan, @sleeptime)
    {:ok, file} = File.open(filename, [:append])
    {:ok, {file, device}}
  end

  @impl true
  def handle_cast({:entries, entries}, {file, device}) do
    write(file, entries)
    {:noreply, {file, device}}
  end

  @impl true
  def handle_info(:scan, {file, device}) do
    entries = Receiver.fetch(device)
    write(file, entries)
    Process.send_after(self(), :scan, @sleeptime)
    {:noreply, {file, device}}
  end

  # helpers

  defp write(_file, []) do
    nil
  end

  defp write(file, [first | remaining]) do
    IO.write(file, "#{first[:t] / 1000},#{first[:value]}\n")
    write(file, remaining)
  end
end
output_filename = "output_filename.csv"

append_to_file = fn ->
  {:ok, pid} = SerialToFile.start_link(output_filename, filename)

  pid
end

Choose your action:

  1. Do nothing
  2. Live heatmap visualization
  3. Live running average visualization
  4. Live density plot visualization
  5. Append stream to file
kino_viz = Kino.Input.text("Visualization choice", default: "4")

Activate chosen visualization:

vizualizations = [
  fn -> "Choose a diffent number" end,
  live_heatmap,
  live_running,
  live_density,
  append_to_file
]

viz =
  kino_viz
  |> Kino.Input.read()
  |> String.to_integer()
  |> (fn choice -> Enum.at(vizualizations, choice - 1) end).()

result = viz.()