Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Serial Processing

serial-processing.livemd

Serial Processing

Mix.install([
  {:circuits_uart, "~> 1.3"},
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"}
])

Introduction

This notebook connects to a serial port and does live processing of the lines received over it (assuming that numbers are transmitted). The following kinds of processing are supported:

  • Live plotting of running average.
  • Live plotting of a heatmap of time and value axes.
  • Live density plot over a window of values.
  • Timestamping and logging of each line to a file.

Note: This notebook is designed for ESP32-DEVKIT-C devices and assumes a baud rate of 115200. The baud rate is chosen in the Discovery module, which passes it on to each Receiver it starts.

Discovery

# Enumerate the serial ports known to Circuits.UART; the result is used further
# down to populate the "Device name" selector.
uarts = Circuits.UART.enumerate()

Reception

Define a GenServer that handles the connection to a single serial port.

defmodule Receiver do
  @moduledoc """
  GenServer owning the connection to a single serial port.

  Each line received from the port is parsed as an integer, timestamped and
  buffered until a consumer collects the buffer with `fetch/1`. Processes are
  registered in `:receiver_registry` under `{Receiver, device}`.
  """
  use GenServer

  # interface

  @doc """
  Starts a receiver for `device` at `speed` baud, linked to the caller.

  The process must be linked so that the supervisor starting it is notified
  when it terminates.
  """
  def start_link(device, speed) do
    state = %{device: device, speed: speed, incoming: []}
    GenServer.start_link(__MODULE__, state, name: via_tuple(device))
  end

  @doc """
  Returns the entries buffered for `device` (oldest first) and clears the buffer.

  Each entry is a map `%{t: milliseconds, value: integer}`.
  """
  def fetch(device) do
    {:incoming, entries} = GenServer.call(via_tuple(device), {:fetch})
    entries
  end

  @doc """
  Returns `true` when a receiver for `device` is registered.
  """
  def started?(device) do
    Registry.lookup(:receiver_registry, {__MODULE__, device}) != []
  end

  @doc """
  Child specification for running a receiver under a supervisor.
  """
  def child_spec(device, speed) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [device, speed]},
      type: :worker,
      # A receiver that dies (e.g. device unplugged) must not be restarted in a
      # tight loop: init/1 asserts that the port can be opened. The periodic
      # Discovery scan starts a new receiver once the device is present again.
      restart: :temporary,
      shutdown: 500
    }
  end

  # helpers

  defp via_tuple(device) do
    {:via, Registry, {:receiver_registry, {__MODULE__, device}}}
  end

  # callbacks

  @impl true
  def init(state) do
    %{device: device, speed: speed} = state

    {:ok, uart_pid} = Circuits.UART.start_link()

    :ok =
      Circuits.UART.open(uart_pid, device,
        speed: speed,
        active: true,
        framing: Circuits.UART.Framing.Line
      )

    {:ok, Map.put(state, :pid, uart_pid)}
  end

  # receive line
  @impl true
  def handle_info({:circuits_uart, _port, payload}, state) when is_binary(payload) do
    # Lines that are not a plain integer (boot messages, garbage after a
    # reconnect, ...) are dropped instead of crashing the receiver.
    case Integer.parse(String.trim(payload)) do
      {value, ""} ->
        entry = %{t: :os.system_time(:millisecond), value: value}
        {:noreply, %{state | incoming: [entry | state.incoming]}}

      _not_an_integer ->
        {:noreply, state}
    end
  end

  # port error: stop, so the registry entry disappears and a later Discovery
  # scan can start a fresh receiver for the device
  def handle_info({:circuits_uart, _port, {:error, reason}}, state) do
    {:stop, {:shutdown, {:uart_error, reason}}, state}
  end

  # anything else (e.g. partial lines reported by the framer) is ignored
  def handle_info(_other, state) do
    {:noreply, state}
  end

  # pop currently stored lines
  @impl true
  def handle_call({:fetch}, _from, state) do
    # Entries are prepended on arrival, so reverse to hand them out oldest first.
    {:reply, {:incoming, Enum.reverse(state.incoming)}, %{state | incoming: []}}
  end
end

Choose serial port:

# Offer every enumerated port as an option; the port name serves as both the
# option value and its label.
device_options = for {port_name, _info} <- uarts, do: {port_name, port_name}
kino_device = Kino.Input.select("Device name", device_options)

Kino.Layout.grid([kino_device])
# Read the port name currently chosen in the selector; `device` is used by all
# processing functions below.
device = Kino.Input.read(kino_device)
{device}

Automatic Discovery

Define a dynamic supervisor for all Receiver GenServers.

defmodule ReceiverSupervisor do
  @moduledoc """
  Dynamic supervisor holding one `Receiver` per serial device.
  """
  use DynamicSupervisor
  require Logger

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @doc """
  Makes sure a `Receiver` is running for `device`.

  Returns `true` only when a new receiver was actually started by this call,
  and `false` when one was already running or the start failed (the failure is
  logged).
  """
  def ensure(device, speed) do
    if Receiver.started?(device) do
      false
    else
      case DynamicSupervisor.start_child(__MODULE__, Receiver.child_spec(device, speed)) do
        {:ok, _pid} ->
          true

        {:ok, _pid, _info} ->
          true

        # Lost a race against another caller; the receiver exists, but we did
        # not start it.
        {:error, {:already_started, _pid}} ->
          false

        :ignore ->
          false

        {:error, reason} ->
          Logger.warning("Could not start receiver for #{inspect(device)}: #{inspect(reason)}")
          false
      end
    end
  end

  # callbacks

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
defmodule Discovery do
  @moduledoc """
  Periodically scans the available serial ports and makes sure a receiver is
  running for every port that matches a known adapter.
  """
  use GenServer

  # Time between two scans, in milliseconds.
  @sleeptime 1000 * 10

  # interface

  # Linked to the caller so that a supervisor starting this process notices
  # when it terminates.
  def start_link(_) do
    GenServer.start_link(__MODULE__, {})
  end

  # callbacks

  @impl true
  def init(_) do
    # Trigger the first scan right away.
    Process.send_after(self(), :scan, 0)
    {:ok, %{}}
  end

  @impl true
  def handle_info(:scan, state) do
    for {device, description} <- Circuits.UART.enumerate(),
        {:ok, speed} <- [speed_for(description)] do
      ReceiverSupervisor.ensure(device, speed)
    end

    Process.send_after(self(), :scan, @sleeptime)
    {:noreply, state}
  end

  # helpers

  # Returns `{:ok, baud_rate}` for adapters this notebook knows how to talk to,
  # `:error` for every other port.
  defp speed_for(%{
         description: "CP2102N USB to UART Bridge Controller",
         manufacturer: "Silicon Labs"
       }) do
    {:ok, 115_200}
  end

  defp speed_for(_description), do: :error
end

Define a supervisor to keep ReceiverSupervisor, our Receiver registry and our Discovery service supervised.

defmodule RootSupervisor do
  @moduledoc """
  Top-level supervisor for the receiver registry, the receiver supervisor and
  the discovery service.
  """
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    # Children are started in list order and stopped in reverse order. The
    # registry comes first because receivers register themselves in it, and
    # Discovery comes last because it uses both of the others.
    children = [
      {Registry, [name: :receiver_registry, keys: :unique]},
      {ReceiverSupervisor, nil},
      {Discovery, nil}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

Start it up:

# Start the supervision tree. RootSupervisor is registered under its module
# name, so evaluating this cell a second time reports `:already_started`;
# reuse the running tree in that case instead of failing with a MatchError.
rootsupervisor_pid =
  case RootSupervisor.start_link(nil) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

Processing Suite

alias VegaLite, as: Vl

Code for 2d histogram illustrated as heatmap:

# Renders a live 2D histogram (time vs. value) as a heatmap and feeds it with
# the entries fetched from the receiver of `device`.
live_heatmap = fn ->
  # Polling period in milliseconds.
  poll_ms = 500

  # Only keep rows where both plotted fields hold valid values.
  both_fields_valid = [
    and: [
      [field: "t", valid: true],
      [field: "value", valid: true]
    ]
  ]

  time_axis = [type: :quantitative, bin: [maxbins: 16]]

  # The value axis is fixed to 0..4096 with ticks every 1024.
  value_axis = [
    type: :quantitative,
    bin: [maxbins: 512],
    scale: [domain: [0, 4096]],
    axis: [values: [0, 1024, 2048, 3072, 4096]]
  ]

  spec =
    Vl.new(width: 600, height: 512)
    |> Vl.transform(filter: both_fields_valid)
    |> Vl.mark(:rect)
    |> Vl.encode_field(:x, "t", time_axis)
    |> Vl.encode_field(:y, "value", value_axis)
    |> Vl.encode(:color, aggregate: :count)
    |> Vl.config(view: [stroke: nil])

  chart = spec |> Kino.VegaLite.new() |> Kino.render()

  # No state is carried between ticks; the chart keeps the last 1000 points.
  on_tick = fn _tick, _state ->
    Kino.VegaLite.push_many(chart, Receiver.fetch(device), window: 1000)
    {:cont, nil}
  end

  Kino.listen(poll_ms, nil, on_tick)
end

Code for Live running average:

# Renders a live line chart with the raw values plus running averages over the
# last 3 and 7 samples.
live_running = fn ->
  # Polling period in milliseconds.
  poll_ms = 200

  chart =
    Vl.new(width: 400, height: 100)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "t", type: :temporal, title: "Time")
    |> Vl.encode_field(:y, "value", type: :quantitative, title: "Value", scale: [zero: false])
    |> Vl.encode_field(:color, "type", type: :nominal)
    |> Kino.VegaLite.new()
    |> Kino.render()

  # Arithmetic mean of the `:value` fields of a non-empty list of samples.
  mean = fn samples ->
    total = samples |> Enum.map(fn sample -> sample[:value] end) |> Enum.sum()
    total / length(samples)
  end

  # Turns one sample into its chart rows. `recent` holds the most recent
  # samples, newest first, and is capped at 7 elements for the next step.
  step = fn sample, recent ->
    t = sample[:t]
    newest_first = [sample | recent]

    rows = [
      %{t: t, type: "raw", value: sample[:value]},
      %{t: t, type: "avg(3)", value: mean.(Enum.take(newest_first, 3))},
      %{t: t, type: "avg(7)", value: mean.(Enum.take(newest_first, 7))}
    ]

    {rows, Enum.take(newest_first, 7)}
  end

  # The listener state is the window of recent samples carried across ticks.
  on_tick = fn _tick, recent ->
    {rows, next_recent} =
      device
      |> Receiver.fetch()
      |> Enum.flat_map_reduce(recent, step)

    Kino.VegaLite.push_many(chart, rows, window: 1000)
    {:cont, next_recent}
  end

  Kino.listen(poll_ms, [], on_tick)
end

Code for density:

# Renders a live density plot of the received values.
live_density = fn ->
  # Polling period in milliseconds.
  poll_ms = 200

  spec =
    Vl.new(width: 400, height: 300)
    |> Vl.transform(density: "value", bandwidth: 0.5)
    |> Vl.mark(:line)
    |> Vl.encode_field(:x, "value", type: :quantitative, title: "Value")
    |> Vl.encode_field(:y, "density", type: :quantitative)

  chart = spec |> Kino.VegaLite.new() |> Kino.render()

  # The listener carries no state (always nil); the chart keeps the last 1000
  # points.
  on_tick = fn _tick, nil ->
    fetched = Receiver.fetch(device)
    Kino.VegaLite.push_many(chart, fetched, window: 1000)
    {:cont, nil}
  end

  Kino.listen(poll_ms, nil, on_tick)
end

Code for appending to file (append interface function not in use):

defmodule SerialToFile do
  @moduledoc """
  GenServer that appends received entries to a CSV file.

  Every `@sleeptime` milliseconds the buffered entries of the configured device
  are fetched from `Receiver` and written as `seconds,value` lines. Entries can
  also be pushed explicitly with `append/2`.
  """
  use GenServer

  # Time between two fetches, in milliseconds.
  @sleeptime 1000 * 2

  # interface

  @doc """
  Starts a writer that appends the entries of `device` to `filename`.
  """
  def start_link(filename, device) do
    GenServer.start_link(__MODULE__, {filename, device})
  end

  @doc """
  Asynchronously appends `entries` (maps with `:t` in milliseconds and
  `:value`) to the file of the writer `pid`.
  """
  def append(pid, entries) do
    GenServer.cast(pid, {:append, entries})
  end

  # callbacks

  @impl true
  def init({filename, device}) do
    Process.send_after(self(), :scan, @sleeptime)
    {:ok, file} = File.open(filename, [:append])
    {:ok, {file, device}}
  end

  # The message tag must match the one sent by append/2.
  @impl true
  def handle_cast({:append, entries}, {file, _device} = state) do
    write(file, entries)
    {:noreply, state}
  end

  @impl true
  def handle_info(:scan, {file, device} = state) do
    write(file, Receiver.fetch(device))
    Process.send_after(self(), :scan, @sleeptime)
    {:noreply, state}
  end

  # helpers

  # Nothing to write; avoid touching the file at all.
  defp write(_file, []), do: :ok

  # Write all lines with a single IO call.
  defp write(file, entries) do
    IO.write(file, Enum.map(entries, &format_line/1))
  end

  # One CSV line: timestamp converted from milliseconds to seconds, then value.
  defp format_line(entry) do
    "#{entry[:t] / 1000},#{entry[:value]}\n"
  end
end
# File that receives the logged entries.
output_filename = "output_filename.csv"

# Starts a SerialToFile writer for the selected device and returns its pid.
append_to_file = fn ->
  {:ok, writer_pid} = SerialToFile.start_link(output_filename, device)
  writer_pid
end
# All selectable actions: a label, an identifier and the zero-arity function
# that runs the action.
processing_suite =
  [
    {"Do nothing", :nothing, fn -> nil end},
    {"Live Heatmap visualization", :live_heatmap, live_heatmap},
    {"Live running average visualization", :live_running, live_running},
    {"Live density plot visualization", :live_density, live_density},
    {"Append stream to file", :append2file, append_to_file}
  ]
  |> Enum.map(fn {label, id, fun} -> %{label: label, id: id, fun: fun} end)

Processing

# Select options as {value, label} pairs; the value is the function to run.
kino_choices = Enum.map(processing_suite, fn action -> {action.fun, action.label} end)

input_kino = Kino.Input.select("Chose your action:", kino_choices)
# Read the function attached to the selected action and run it.
viz = Kino.Input.read(input_kino)
result = viz.()