Socket
Mix.install([
{:circuits_uart, "~> 1.3"},
{:vega_lite, "~> 0.1.5"},
{:kino_vega_lite, "~> 0.1.7"},
{:kino, "~> 0.8.0"}
])
Introduction
This livebook listens on a TCP port. Whenever a client connects to this port and transmits lines, each line is timestamped and appended to a log file.
There is a bit of superfluous code here. It serves a demo purpose but could be removed for real use: it writes a dummy line to the file.
Configuration
# Text inputs for the two settings, shown side by side in a grid.
kino_port = Kino.Input.text("Port to listen to:", default: "8082")
kino_filename = Kino.Input.text("File to log to:", default: "in_livebook_working_directory.csv")
Kino.Layout.grid([kino_port, kino_filename])

# Read the current input values; the port is entered as text and parsed to an integer.
filename = Kino.Input.read(kino_filename)
port = kino_port |> Kino.Input.read() |> String.to_integer()
"port #{port} → #{filename}"
Logger
GenServer for logging time/value pairs to a CSV file:
defmodule LogDumper do
  @moduledoc """
  GenServer that appends time/value pairs to a CSV file.

  The file is opened in append mode when the server starts, and the resulting
  IO device is kept as the server state. Every entry is written as one line of
  the form `<t / 1000>,<value>`. Values are written verbatim (no CSV quoting).
  """

  use GenServer

  # interface

  @doc """
  Starts the logger, registered under the module name, appending to `filename`.

  Returns `{:error, reason}` (e.g. `{:error, :enoent}`) when the file cannot be
  opened.
  """
  @spec start_link(Path.t()) :: GenServer.on_start()
  def start_link(filename) do
    GenServer.start_link(__MODULE__, {filename}, name: __MODULE__)
  end

  @doc """
  Asynchronously appends `entries` to the log file.

  `entries` is a list whose elements provide a numeric `:t` and a `:value`
  (accessed as `entry[:t]` / `entry[:value]`). Always returns `:ok`; the write
  itself happens later in the server process.
  """
  @spec append([map() | keyword()]) :: :ok
  def append(entries) do
    GenServer.cast(__MODULE__, {:append, entries})
  end

  # callbacks

  @impl true
  def init({filename}) do
    IO.puts("opening #{filename}")

    # Stop with the real reason instead of crashing on a failed pattern match,
    # so that start_link/1 returns a clean {:error, reason}.
    case File.open(filename, [:append]) do
      {:ok, file} -> {:ok, file}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_cast({:append, entries}, file) do
    IO.puts("append called")
    write(file, entries)
    {:noreply, file}
  end

  # helpers

  # Nothing to write for an empty batch.
  defp write(_file, []), do: nil

  # Formats the whole batch first and hands it to the device in a single write.
  defp write(file, entries) when is_list(entries) do
    IO.write(file, Enum.map_join(entries, &format_entry/1))
  end

  # One CSV line per entry; `:t` is divided by 1000 (milliseconds -> seconds
  # when the caller passes millisecond timestamps).
  defp format_entry(entry), do: "#{entry[:t] / 1000},#{entry[:value]}\n"
end
# Start the logger for the configured file and keep its pid for the cells below.
{:ok, pid_logger} = LogDumper.start_link(filename)
What do we know about this process?
# Returns runtime information about the logger process.
Process.info(pid_logger)
Test it:
# Demo only: appends a single dummy entry to the log file.
LogDumper.append([%{t: 1, value: 42}])
Receiver
GenServer that reads lines from the socket, timestamps them, and sends them to the
LogDumper GenServer for persistence.
defmodule SocketListener do
  @moduledoc """
  GenServer that listens on a TCP port, timestamps every received line and
  forwards it to `LogDumper`.

  One client is served at a time: a new connection is accepted only after the
  current one has been closed.
  """

  use GenServer

  # Upper bound (ms) for a single accept attempt. Accepting is retried until a
  # client connects; the bound keeps the server responsive to system messages
  # and to GenServer.stop/1 while it is waiting.
  @accept_timeout 200

  # interface

  @doc """
  Starts the listener, registered under the module name, on TCP port `port`.

  Returns `{:error, reason}` (e.g. `{:error, :eaddrinuse}`) when the port
  cannot be opened.
  """
  @spec start_link(:inet.port_number()) :: GenServer.on_start()
  def start_link(port) do
    GenServer.start_link(__MODULE__, {port}, name: __MODULE__)
  end

  @doc """
  Appends `entries` to the log by handing them to `LogDumper.append/1`.

  Kept for backward compatibility. It used to cast to this server, which has
  no `handle_cast/2` clause, so calling it terminated the listener.
  """
  @spec append([map() | keyword()]) :: :ok
  def append(entries), do: LogDumper.append(entries)

  # callbacks

  @impl true
  def init({port}) do
    opts = [:binary, active: true, packet: :line, reuseaddr: true]

    case :gen_tcp.listen(port, opts) do
      {:ok, server_socket} ->
        # Accept from the message loop so that init/1 returns immediately.
        send(self(), :accept)
        {:ok, %{serversocket: server_socket}}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  # A complete line arrived from the client: echo it, timestamp it, log it.
  def handle_info({:tcp, _socket, msg}, state) do
    IO.puts("'#{msg}'")

    LogDumper.append([
      %{t: System.os_time(:millisecond), value: String.trim_trailing(msg)}
    ])

    {:noreply, state}
  end

  # The client went away: forget its socket and wait for the next connection.
  def handle_info({:tcp_closed, _socket}, state) do
    send(self(), :accept)
    {:noreply, Map.delete(state, :clientsocket)}
  end

  # Without this clause a socket error would raise FunctionClauseError and take
  # the listener down. NOTE(review): relies on the runtime delivering
  # :tcp_closed after :tcp_error, which then triggers the next accept.
  def handle_info({:tcp_error, _socket, reason}, state) do
    IO.puts("tcp error: #{inspect(reason)}")
    {:noreply, state}
  end

  # Try to accept one client; on timeout simply try again later.
  def handle_info(:accept, %{serversocket: server_socket} = state) do
    case :gen_tcp.accept(server_socket, @accept_timeout) do
      {:ok, client_socket} ->
        {:noreply, Map.put(state, :clientsocket, client_socket)}

      {:error, :timeout} ->
        send(self(), :accept)
        {:noreply, state}

      {:error, reason} ->
        {:stop, {:accept_failed, reason}, state}
    end
  end
end
# Start the listener on the configured port and keep its pid.
{:ok, pid_listener} = SocketListener.start_link(port)