Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Serial Terminal

serial-terminal.livemd

Serial Terminal

Mix.install([
  {:circuits_uart, "~> 1.5"},
  {:kino, "~> 0.12.3"}
])

Introduction

This workbook functions as a primitive serial terminal.

Features:

  • Configuration options: device port, baud rate, number of data bits and number of stop bits.
  • Automatically lists available ports.
  • It reads data one line at a time and appends it to a list of messages.
  • A polled input buffer with a windowed display is employed to protect against high-velocity producers.
  • It allows for sending messages over the serial line.

Restrictions:

  • No supervision is employed to deal with disconnects and other errors.
  • A sideeffect of the protection against high-velocity producers is that messages may be dropped.

Bridge

defmodule Bridge do
  use GenServer

  @window_size 10
  @sleep_time 1000

  # interface

  def start(frame, device, speed, data_bits, stop_bits) do
    state = %{
      frame: frame,
      device: device,
      speed: speed,
      data_bits: data_bits,
      stop_bits: stop_bits,
      messages: []
    }

    GenServer.start(__MODULE__, state, name: __MODULE__)
  end

  def send(message) do
    GenServer.cast(__MODULE__, {:send, message})
  end

  # callbacks

  @impl true
  def init(state) do
    %{device: device, speed: speed, data_bits: data_bits, stop_bits: stop_bits} =
      state

    {:ok, uart_pid} = Circuits.UART.start_link()

    result =
      Circuits.UART.open(uart_pid, device,
        speed: speed,
        data_bits: data_bits,
        stop_bits: stop_bits,
        active: true,
        framing: Circuits.UART.Framing.Line
      )

    return_code =
      case result do
        :ok ->
          state = update_view(state, "*#{device} connected*")
          {:ok, Map.put(state, :pid, uart_pid)}

        {:error, :eagain} ->
          update_view(state, "**Error:** *#{device} already in use*")
          {:stop, :eagain}

        {:error, reason} ->
          update_view(
            state,
            "**Error:** Unable to open device *#{device} due to reason `#{reason}`"
          )

          {:stop, reason}
      end

    prime_display()
    return_code
  end

  @impl true
  def handle_cast({:send, message}, state) do
    %{pid: pid} = state
    Circuits.UART.write(pid, message)
    state = update_view(state, "**out:** #{message}")
    {:noreply, state}
  end

  @impl true
  def handle_info({:circuits_uart, _, message}, state) do
    state = update_view(state, "**in:** #{message}")
    {:noreply, state}
  end

  @impl true
  def handle_info(:display, state) do
    %{frame: frame, messages: messages} = state
    messages = Enum.take(messages, @window_size)

    Kino.Frame.clear(frame)
    Kino.Frame.append(frame, Kino.Markdown.new(messages |> Enum.reverse() |> Enum.join("\n\n")))

    prime_display()
    {:noreply, Map.put(state, :messages, messages)}
  end

  # helpers

  defp update_view(state, message) do
    %{messages: messages} = state
    Map.put(state, :messages, [message | messages])
  end

  defp prime_display() do
    Process.send_after(self(), :display, @sleep_time)
  end
end

Discovery

Please attach your device!

List available ports:

bauds = [9600, 19200, 38400, 57600, 115_200, 230_400, 460_800, 576_000]
data_bits = [5, 6, 7, 8]
stop_bits = [1, 2]
uarts = Circuits.UART.enumerate()

if Enum.empty?(uarts) do
  Kino.Markdown.new("**Error:** No UARTs detected!")
else
  Kino.Tree.new(uarts)
end

Configuration

elements = [
  Kino.Input.select("Port:", Enum.map(uarts, fn {k, _v} -> {k, k} end)),
  Kino.Input.select("Device Speed:", Enum.map(bauds, fn s -> {s, s} end), default: 115_200),
  Kino.Input.select("Data bits:", Enum.map(data_bits, fn s -> {s, s} end), default: 8),
  Kino.Input.select("Stop bits:", Enum.map(stop_bits, fn s -> {s, s} end), default: 1)
]

Kino.Layout.grid(elements)
[device, speed, data_bit, stop_bit] =
  Enum.map(elements, fn element -> Kino.Input.read(element) end)

device_config = Map.fetch!(uarts, device)

Kino.Markdown.new(
  "**Configuration:**" <>
    """
    Port `#{device}` at `#{speed}` b/s with `#{data_bit}` data bits and `#{stop_bit}` stop bit(s).

    Info:
    * **Description:** #{Map.get(device_config, :description, "*unknown*")}
    * **Manufacturer:** #{Map.get(device_config, :manufacturer, "*unknown*")}
    * **Vendor ID:** #{Map.get(device_config, :vendor_id, "*unknown*")}
    * **Product ID:** #{Map.get(device_config, :product_id, "*unknown*")}
    * **Serial Number:** #{Map.get(device_config, :serial_number, "*unknown*")}
    """
)

Interface

frame = Kino.Frame.new()
{:ok, bridge_pid} = Bridge.start(frame, device, speed, data_bit, stop_bit)
elements = [
  message: Kino.Input.text("Message:")
]

form = Kino.Control.form(elements, submit: "Send", reset_on_submit: [:command])

Event Loop

for event <- Kino.Control.stream(form) do
  %{data: %{message: message}, type: _type, origin: _origin} = event
  Bridge.send(message)
end

Close connection

Process.exit(bridge_pid, :kill)