Notesclub

Serial

analog.livemd

Serial

Mix.install([
  {:circuits_uart, "~> 1.3"},
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"}
])

Discovery

# Ask Circuits.UART for the serial ports it can see. The Discovery module
# further down iterates the same result as {device, description} pairs.
uarts = Circuits.UART.enumerate()

Reception

Define a GenServer that handles the connection to a single serial port.

defmodule Receiver do
  @moduledoc """
  GenServer that owns the connection to a single serial port.

  Every line received from the port is parsed as an integer and buffered
  together with the OS time (in milliseconds) at which it was handled.
  `fetch/1` drains that buffer. Each receiver is registered in
  `:receiver_registry` under the key `{Receiver, device}`.
  """
  use GenServer

  # interface

  @doc """
  Starts a receiver for `device` at baud rate `speed`, linked to the caller.

  The process must be linked so that the supervisor that starts it is notified
  when it exits.
  """
  def start_link(device, speed) do
    state = %{device: device, speed: speed, incoming: []}
    GenServer.start_link(__MODULE__, state, name: via_tuple(device))
  end

  @doc """
  Returns the entries buffered since the previous call, oldest first, and
  empties the buffer. Each entry is a map `%{t: milliseconds, value: integer}`.
  """
  def fetch(device) do
    {:incoming, entries} = GenServer.call(via_tuple(device), {:fetch})
    entries
  end

  @doc """
  Returns `true` when a receiver for `device` is registered in
  `:receiver_registry`, `false` otherwise.
  """
  def started?(device) do
    case Registry.lookup(:receiver_registry, {__MODULE__, device}) do
      [] -> false
      _ -> true
    end
  end

  @doc """
  Builds the child specification used to start a receiver for `device` at
  `speed` under a supervisor.
  """
  def child_spec(device, speed) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [device, speed]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  # helpers

  defp via_tuple(device) do
    {:via, Registry, {:receiver_registry, {__MODULE__, device}}}
  end

  # callbacks

  @impl true
  def init(state) do
    %{device: device, speed: speed} = state

    {:ok, uart_pid} = Circuits.UART.start_link()

    # Active mode with line framing: each received line arrives as a
    # {:circuits_uart, port, payload} message handled by handle_info/2.
    :ok =
      Circuits.UART.open(uart_pid, device,
        speed: speed,
        active: true,
        framing: Circuits.UART.Framing.Line
      )

    {:ok, Map.put(state, :pid, uart_pid)}
  end

  # port error: stop with an explicit reason instead of failing inside the
  # string handling of the data clause below
  @impl true
  def handle_info({:circuits_uart, _port, {:error, reason}}, state) do
    {:stop, {:uart_error, reason}, state}
  end

  # receive line
  def handle_info({:circuits_uart, _port, payload}, state) when is_binary(payload) do
    case payload |> String.trim() |> Integer.parse() do
      {value, ""} ->
        entry = %{t: System.os_time(:millisecond), value: value}
        # Prepend (O(1)); the buffer is reversed once when it is fetched.
        {:noreply, %{state | incoming: [entry | state.incoming]}}

      _not_an_integer ->
        # A line that is not a plain integer (e.g. a truncated first line) is
        # dropped rather than crashing the receiver.
        {:noreply, state}
    end
  end

  # pop currently stored lines
  @impl true
  def handle_call({:fetch}, _from, state) do
    %{incoming: lines} = state
    {:reply, {:incoming, Enum.reverse(lines)}, %{state | incoming: []}}
  end
end

Choose serial port and baud rate:

# Text inputs for the serial device name and its baud rate.
kino_device = Kino.Input.text("Device name", default: "ttyUSB0")
kino_speed = Kino.Input.text("Device speed", default: "115200")

# Read the current input values; the speed is entered as text and converted
# to an integer.
device = Kino.Input.read(kino_device)
speed = kino_speed |> Kino.Input.read() |> String.to_integer()

{device, speed}

Automatic Discovery

Define a dynamic supervisor for all Receiver GenServers.

defmodule ReceiverSupervisor do
  @moduledoc """
  Dynamic supervisor for `Receiver` processes, one per serial device.
  """
  use DynamicSupervisor

  require Logger

  @doc """
  Starts the supervisor, registered under the module name.
  """
  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @doc """
  Makes sure a `Receiver` for `device` is running at baud rate `speed`.

  Returns `true` when this call started a new receiver and `false` when one
  was already running or the start failed. A failed start is logged instead
  of being silently reported as success.
  """
  def ensure(device, speed) do
    if Receiver.started?(device) do
      false
    else
      case DynamicSupervisor.start_child(__MODULE__, Receiver.child_spec(device, speed)) do
        {:ok, _pid} ->
          true

        # Another caller registered the receiver between the check and the start.
        {:error, {:already_started, _pid}} ->
          false

        other ->
          Logger.warning("could not start receiver for #{inspect(device)}: #{inspect(other)}")
          false
      end
    end
  end

  # callbacks

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
defmodule Discovery do
  @moduledoc """
  Periodically enumerates the serial ports and makes sure a `Receiver` is
  running for every port whose description matches a known adapter.
  """
  use GenServer

  # Delay between two scans, in milliseconds.
  @sleeptime 1000 * 10

  # interface

  @doc """
  Starts the discovery process linked to the caller, so that a supervisor
  starting it is notified when it exits. The argument is ignored.
  """
  def start_link(_) do
    GenServer.start_link(__MODULE__, {})
  end

  # callbacks

  @impl true
  def init(_) do
    # Trigger the first scan right away without blocking init/1.
    Process.send_after(self(), :scan, 0)
    {:ok, %{}}
  end

  @impl true
  def handle_info(:scan, state) do
    # The `speed = ...` filter skips ports for which speed_for/1 returns nil.
    for {device, description} <- Circuits.UART.enumerate(),
        speed = speed_for(description) do
      ReceiverSupervisor.ensure(device, speed)
    end

    Process.send_after(self(), :scan, @sleeptime)
    {:noreply, state}
  end

  # helpers

  # Baud rate to use for a recognised adapter, or nil when the port
  # description does not match one.
  defp speed_for(%{
         description: "CP2102N USB to UART Bridge Controller",
         manufacturer: "Silicon Labs"
       }),
       do: 115_200

  defp speed_for(_description), do: nil
end

Define a supervisor to keep ReceiverSupervisor, our Receiver registry and our Discovery service supervised.

defmodule RootSupervisor do
  @moduledoc """
  Top-level supervisor for the receiver registry, the `ReceiverSupervisor`
  and the `Discovery` service.
  """
  use Supervisor

  @doc """
  Starts the supervisor, registered under the module name.
  """
  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    # Children are started in list order. Receivers register themselves in
    # :receiver_registry, so the registry comes first, followed by the
    # supervisor that hosts the receivers and finally the service that
    # requests them.
    children = [
      {Registry, [name: :receiver_registry, keys: :unique]},
      {ReceiverSupervisor, nil},
      {Discovery, nil}
    ]

    # :rest_for_one restarts every child listed after a crashed one. If the
    # registry restarts, the receivers are restarted too and register again;
    # otherwise they would keep running unregistered and Discovery would try
    # to start duplicates for ports that are already open.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

Start it up:

# Start the supervision tree; the match raises if the supervisor fails to start.
{:ok, rootsupervisor_pid} = RootSupervisor.start_link(nil)
# Last expression of the cell: the pid of the root supervisor.
rootsupervisor_pid

Visualization

alias VegaLite, as: Vl

Code for a 2D histogram rendered as a heatmap:

# Renders a binned rect chart of "t" against "value", colored by count, and
# keeps feeding it with whatever the receiver for `device` has buffered.
live_heatmap = fn ->
  refresh_ms = 500

  # Filter predicate applied to both plotted fields.
  both_valid = [
    and: [
      [field: "t", valid: true],
      [field: "value", valid: true]
    ]
  ]

  time_opts = [type: :quantitative, bin: [maxbins: 16]]

  value_opts = [
    type: :quantitative,
    bin: [maxbins: 512],
    scale: [domain: [0, 4096]],
    axis: [values: [0, 1024, 2048, 3072, 4096]]
  ]

  spec =
    Vl.new(width: 600, height: 512)
    |> Vl.transform(filter: both_valid)
    |> Vl.mark(:rect)
    |> Vl.encode_field(:x, "t", time_opts)
    |> Vl.encode_field(:y, "value", value_opts)
    |> Vl.encode(:color, aggregate: :count)
    |> Vl.config(view: [stroke: nil])

  chart = Kino.render(Kino.VegaLite.new(spec))

  # Drain the receiver and push the samples, keeping the newest 1000 points.
  push_samples = fn tick ->
    samples = Receiver.fetch(device)
    Kino.VegaLite.push_many(chart, samples, window: 1000)
    {:cont, tick + 0.1}
  end

  Kino.VegaLite.periodically(chart, refresh_ms, 0, push_samples)
end

Code for a live running average:

# Renders a line chart with the raw samples and their running averages over
# the latest 3 and 7 samples, refreshed from the receiver for `device`.
live_running = fn ->
  refresh_ms = 200

  chart =
    Vl.new(width: 400, height: 100)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "t", type: :temporal, title: "Time")
    |> Vl.encode_field(:y, "value", type: :quantitative, title: "Value", scale: [zero: false])
    |> Vl.encode_field(:color, "type", type: :nominal)
    |> Kino.VegaLite.new()
    |> Kino.render()

  # Arithmetic mean of the :value fields of a list of samples.
  mean = fn samples ->
    values = Enum.map(samples, fn sample -> sample[:value] end)
    Enum.sum(values) / length(values)
  end

  # Turns one sample into its chart rows. `recent` holds the latest samples,
  # newest first, capped at 7 elements (the widest averaging window).
  step = fn sample, recent ->
    at = sample[:t]
    recent = [sample | recent]

    rows = [
      %{t: at, type: "raw", value: sample[:value]},
      %{t: at, type: "avg(3)", value: mean.(Enum.take(recent, 3))},
      # %{t: at, type: "avg(5)", value: mean.(Enum.take(recent, 5))},
      %{t: at, type: "avg(7)", value: mean.(Enum.take(recent, 7))}
    ]

    {rows, Enum.take(recent, 7)}
  end

  # Periodic callback: the accumulator carries the averaging window across ticks.
  tick = fn window ->
    {rows, next_window} = Enum.flat_map_reduce(Receiver.fetch(device), window, step)
    # IO.puts("#{rows}")
    Kino.VegaLite.push_many(chart, rows, window: 1000)
    {:cont, next_window}
  end

  Kino.VegaLite.periodically(chart, refresh_ms, [], tick)
end

Code for density:

# Renders a line chart of the density transform of "value" and keeps feeding
# it with whatever the receiver for `device` has buffered.
live_density = fn ->
  refresh_ms = 200

  spec =
    Vl.new(width: 400, height: 300)
    |> Vl.transform(density: "value", bandwidth: 0.5)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "value", type: :quantitative, title: "Value")
    |> Vl.encode_field(:y, "density", type: :quantitative)

  chart = Kino.render(Kino.VegaLite.new(spec))

  # Drain the receiver and push the samples, keeping the newest 1000 points.
  push_samples = fn tick ->
    samples = Receiver.fetch(device)
    Kino.VegaLite.push_many(chart, samples, window: 1000)
    {:cont, tick + 0.1}
  end

  Kino.VegaLite.periodically(chart, refresh_ms, 0, push_samples)
end

Code for appending to file (append interface function not in use):

defmodule SerialToFile do
  @moduledoc """
  GenServer that appends serial samples to a file as `seconds,value` lines.

  Every `@sleeptime` milliseconds it drains the `Receiver` of its device and
  writes the fetched entries. Entries can also be handed over with `append/2`.
  """
  use GenServer

  # Delay between two fetches from the receiver, in milliseconds.
  @sleeptime 1000 * 2

  # interface

  @doc """
  Starts a writer that appends the samples of `device` to `filename`.
  """
  def start_link(filename, device) do
    GenServer.start_link(__MODULE__, {filename, device})
  end

  @doc """
  Asynchronously appends `entries` (maps with `:t` in milliseconds and
  `:value`) to the file of the writer `pid`.
  """
  def append(pid, entries) do
    GenServer.cast(pid, {:append, entries})
  end

  # callbacks

  @impl true
  def init({filename, device}) do
    Process.send_after(self(), :scan, @sleeptime)
    {:ok, file} = File.open(filename, [:append])
    {:ok, {file, device}}
  end

  # The tag must match the message sent by append/2.
  @impl true
  def handle_cast({:append, entries}, {file, device}) do
    write(file, entries)
    {:noreply, {file, device}}
  end

  @impl true
  def handle_info(:scan, {file, device}) do
    entries = Receiver.fetch(device)
    write(file, entries)
    Process.send_after(self(), :scan, @sleeptime)
    {:noreply, {file, device}}
  end

  # helpers

  # Nothing to write.
  defp write(_file, []), do: nil

  # Writes one "seconds,value" line per entry; the lines are handed to
  # IO.write/2 as a single iodata list.
  defp write(file, entries) do
    lines = Enum.map(entries, fn entry -> "#{entry[:t] / 1000},#{entry[:value]}\n" end)
    IO.write(file, lines)
  end
end
# File the serial stream is appended to.
output_filename = "output_filename.csv"

# Starts a SerialToFile writer for `device` and returns its pid; the match
# raises if the writer cannot be started.
append_to_file = fn ->
  {:ok, writer_pid} = SerialToFile.start_link(output_filename, device)
  writer_pid
end

Choose your action:

  1. Do nothing
  2. Live heatmap visualization
  3. Live running average visualization
  4. Live density plot visualization
  5. Append stream to file
# Text input for the number of the action to run; it is read and converted to
# an integer by the activation cell below.
kino_viz = Kino.Input.text("Visualization choice", default: "2")

Activate chosen visualization:

# Actions in menu order: entry N of the menu is element N - 1 of this list.
vizualizations = [
  fn -> "Choose a different number" end,
  live_heatmap,
  live_running,
  live_density,
  append_to_file
]

# Only an integer between 1 and the number of actions selects an action.
# Anything else (0, a negative or too large number, non-numeric text) falls
# back to the first entry, which only returns a hint. Without the range check,
# 0 would index the list from its end and start the file writer.
viz =
  case kino_viz |> Kino.Input.read() |> String.trim() |> Integer.parse() do
    {choice, ""} when choice >= 1 and choice <= length(vizualizations) ->
      Enum.at(vizualizations, choice - 1)

    _invalid ->
      hd(vizualizations)
  end

result = viz.()