Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Socket

socket.livemd

Socket

Mix.install([
  {:circuits_uart, "~> 1.3"},
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"}
])

Introduction

This livebook opens a port. Whenever connections are made to this port, and lines are transmitted, each such line is timestamped and appended to a logfile.

There is a bit of superflous code here. It serves a demo function, but could be removed for real use. This code writes a dummy line to the file.

Configuration

kino_port = Kino.Input.text("Port to listen to:", default: "8082")
kino_filename = Kino.Input.text("File to log to:", default: "in_livebook_working_directory.csv")

Kino.Layout.grid([kino_port, kino_filename])
port = String.to_integer(Kino.Input.read(kino_port))
filename = Kino.Input.read(kino_filename)
"port #{port}#{filename}"

Logger

GenServer for logging time/value pairs to a CSV file:

defmodule LogDumper do
  use GenServer

  # interface

  def start_link(filename) do
    GenServer.start_link(__MODULE__, {filename}, name: __MODULE__)
  end

  def append(entries) do
    GenServer.cast(__MODULE__, {:append, entries})
  end

  # callbacks

  @impl true
  def init({filename}) do
    IO.puts("opening #{filename}")
    {:ok, _file} = File.open(filename, [:append])
  end

  @impl true
  def handle_cast({:append, entries}, file) do
    IO.puts("append called")
    write(file, entries)
    {:noreply, file}
  end

  # helpers

  defp write(_file, []) do
    nil
  end

  defp write(file, [first | remaining]) do
    IO.write(file, "#{first[:t] / 1000},#{first[:value]}\n")
    write(file, remaining)
  end
end
{:ok, pid_logger} = LogDumper.start_link(filename)

What do we know about this process?

Process.info(pid_logger)

Test it:

LogDumper.append([%{t: 1, value: 42}])

Receiver

GenServer to read data (lines, really), timestamp them and send them to the LogDumper GenServer for persistence.

defmodule SocketListener do
  use GenServer

  # interface

  def start_link(port) do
    GenServer.start_link(__MODULE__, {port}, name: __MODULE__)
  end

  def append(entries) do
    GenServer.cast(__MODULE__, {:append, entries})
  end

  # callbacks

  @impl true
  def init({port}) do
    opts = [:binary, active: true, packet: :line, reuseaddr: true]
    {:ok, server_socket} = :gen_tcp.listen(port, opts)
    Process.send_after(self(), :accept, 0)
    {:ok, %{serversocket: server_socket}}
  end

  @impl true
  def handle_info({:tcp, _socket, msg}, state) do
    IO.puts("'#{msg}'")

    LogDumper.append([
      %{t: :os.system_time(:milli_seconds), value: String.trim_trailing(msg)}
    ])

    {:noreply, state}
  end

  def handle_info({:tcp_closed, _port}, state) do
    Process.send_after(self(), :accept, 0)
    {:noreply, state}
  end

  def handle_info(:accept, %{serversocket: server_socket} = state) do
    {:ok, client_socket} = :gen_tcp.accept(server_socket)
    {:noreply, Map.put(state, :clientsocket, client_socket)}
  end
end
{:ok, pid_listener} = SocketListener.start_link(port)