Serial Terminal
Mix.install([
{:circuits_uart, "~> 1.5"},
{:kino, "~> 0.12.3"}
])
Introduction
This workbook functions as a primitive serial terminal.
Features:
- Configuration options: device port, baud rate, number of data bits and number of stop bits.
- Automatically lists available ports.
- It reads data one line at a time and appends it to a list of messages.
- A polled input buffer with a windowed display is employed to protect against high-velocity producers.
- It allows for sending messages over the serial line.
Restrictions:
- No supervision is employed to deal with disconnects and other errors.
- A sideeffect of the protection against high-velocity producers is that messages may be dropped.
Bridge
defmodule Bridge do
use GenServer
@window_size 10
@sleep_time 1000
# interface
def start(frame, device, speed, data_bits, stop_bits) do
state = %{
frame: frame,
device: device,
speed: speed,
data_bits: data_bits,
stop_bits: stop_bits,
messages: []
}
GenServer.start(__MODULE__, state, name: __MODULE__)
end
def send(message) do
GenServer.cast(__MODULE__, {:send, message})
end
# callbacks
@impl true
def init(state) do
%{device: device, speed: speed, data_bits: data_bits, stop_bits: stop_bits} =
state
{:ok, uart_pid} = Circuits.UART.start_link()
result =
Circuits.UART.open(uart_pid, device,
speed: speed,
data_bits: data_bits,
stop_bits: stop_bits,
active: true,
framing: Circuits.UART.Framing.Line
)
return_code =
case result do
:ok ->
state = update_view(state, "*#{device} connected*")
{:ok, Map.put(state, :pid, uart_pid)}
{:error, :eagain} ->
update_view(state, "**Error:** *#{device} already in use*")
{:stop, :eagain}
{:error, reason} ->
update_view(
state,
"**Error:** Unable to open device *#{device} due to reason `#{reason}`"
)
{:stop, reason}
end
prime_display()
return_code
end
@impl true
def handle_cast({:send, message}, state) do
%{pid: pid} = state
Circuits.UART.write(pid, message)
state = update_view(state, "**out:** #{message}")
{:noreply, state}
end
@impl true
def handle_info({:circuits_uart, _, message}, state) do
state = update_view(state, "**in:** #{message}")
{:noreply, state}
end
@impl true
def handle_info(:display, state) do
%{frame: frame, messages: messages} = state
messages = Enum.take(messages, @window_size)
Kino.Frame.clear(frame)
Kino.Frame.append(frame, Kino.Markdown.new(messages |> Enum.reverse() |> Enum.join("\n\n")))
prime_display()
{:noreply, Map.put(state, :messages, messages)}
end
# helpers
defp update_view(state, message) do
%{messages: messages} = state
Map.put(state, :messages, [message | messages])
end
defp prime_display() do
Process.send_after(self(), :display, @sleep_time)
end
end
Discovery
Please attach your device!
List available ports:
bauds = [9600, 19200, 38400, 57600, 115_200, 230_400, 460_800, 576_000]
data_bits = [5, 6, 7, 8]
stop_bits = [1, 2]
uarts = Circuits.UART.enumerate()
if Enum.empty?(uarts) do
Kino.Markdown.new("**Error:** No UARTs detected!")
else
Kino.Tree.new(uarts)
end
Configuration
elements = [
Kino.Input.select("Port:", Enum.map(uarts, fn {k, _v} -> {k, k} end)),
Kino.Input.select("Device Speed:", Enum.map(bauds, fn s -> {s, s} end), default: 115_200),
Kino.Input.select("Data bits:", Enum.map(data_bits, fn s -> {s, s} end), default: 8),
Kino.Input.select("Stop bits:", Enum.map(stop_bits, fn s -> {s, s} end), default: 1)
]
Kino.Layout.grid(elements)
[device, speed, data_bit, stop_bit] =
Enum.map(elements, fn element -> Kino.Input.read(element) end)
device_config = Map.fetch!(uarts, device)
Kino.Markdown.new(
"**Configuration:**" <>
"""
Port `#{device}` at `#{speed}` b/s with `#{data_bit}` data bits and `#{stop_bit}` stop bit(s).
Info:
* **Description:** #{Map.get(device_config, :description, "*unknown*")}
* **Manufacturer:** #{Map.get(device_config, :manufacturer, "*unknown*")}
* **Vendor ID:** #{Map.get(device_config, :vendor_id, "*unknown*")}
* **Product ID:** #{Map.get(device_config, :product_id, "*unknown*")}
* **Serial Number:** #{Map.get(device_config, :serial_number, "*unknown*")}
"""
)
Interface
frame = Kino.Frame.new()
{:ok, bridge_pid} = Bridge.start(frame, device, speed, data_bit, stop_bit)
elements = [
message: Kino.Input.text("Message:")
]
form = Kino.Control.form(elements, submit: "Send", reset_on_submit: [:command])
Event Loop
for event <- Kino.Control.stream(form) do
%{data: %{message: message}, type: _type, origin: _origin} = event
Bridge.send(message)
end
Close connection
Process.exit(bridge_pid, :kill)