Connect to DSMR meter
Mix.install([
{:dsmr, "~> 0.6"},
{:kino, "~> 0.12.0"},
{:kino_vega_lite, "~> 0.1.10"}
])
alias VegaLite, as: Vl
Configuration
Configure your DSMR meter connection settings:
# Enter your DSMR meter's network address
# Examples:
# - WiFi P1 adapter: "192.168.1.100"
# - Hostname: "dsmr-meter.local"
meter_host = Kino.Input.text("Meter Host/IP", default: "192.168.1.100")
# Most P1-to-WiFi adapters use port 23 (Telnet)
# Some may use different ports - check your adapter's documentation
meter_port = Kino.Input.number("Meter Port", default: 23)
Kino.Layout.grid([meter_host, meter_port], columns: 2)
Define Meter GenServer
defmodule Meter do
use GenServer
require Logger
@connect_timeout 5000
@recv_timeout 5000
def listen(fun, opts) do
GenServer.start_link(__MODULE__, {fun, opts}, name: __MODULE__)
end
@impl true
def init({fun, opts}) do
{:ok, host} = parse_host(opts[:host])
state = %{socket: nil, fun: fun, lines: ""}
{:ok, state, {:continue, host: host, port: opts[:port]}}
end
@impl true
def handle_continue(opts, state) do
socket_opts = [:binary, active: false, packet: :line]
case :gen_tcp.connect(opts[:host], opts[:port], socket_opts, @connect_timeout) do
{:ok, socket} ->
send(self(), :recv_loop)
{:noreply, %{state | socket: socket}}
{:error, reason} ->
Logger.error("Unable to connect to meter - reason: #{inspect(reason)}")
{:stop, :normal, state}
end
end
@impl true
def handle_info(:recv_loop, state) do
case :gen_tcp.recv(state.socket, 0, @recv_timeout) do
{:ok, line} ->
send(self(), {:recv_line, line})
send(self(), :recv_loop)
{:noreply, state}
{:error, reason} ->
Logger.error("Unable to connect to remote TCP socket - reason: #{inspect(reason)}")
{:stop, :normal, state}
end
end
@impl true
def handle_info({:recv_line, "!" <> _ = line}, state) do
if state.lines != "" do
send(self(), {:telegram, state.lines <> line})
end
{:noreply, %{state | lines: ""}}
end
@impl true
def handle_info({:recv_line, "/" <> _ = line}, %{lines: ""} = state) do
# Ignore partially received telegram when starting to read from socket.
{:noreply, %{state | lines: line}}
end
@impl true
def handle_info({:recv_line, _line}, %{lines: ""} = state) do
# Ignore partially received telegram when starting to read from socket.
{:noreply, state}
end
@impl true
def handle_info({:recv_line, line}, state) do
{:noreply, %{state | lines: state.lines <> line}}
end
@impl true
def handle_info({:telegram, raw}, state) do
case DSMR.parse(raw) do
{:ok, telegram} ->
Logger.info("Received telegram - #{telegram.checksum}")
state.fun.(telegram)
{:error, reason} ->
Logger.error("Unable to parse telegram - reason: #{inspect(reason)}")
end
{:noreply, state}
end
defp parse_host(host) when is_binary(host) do
parse_host(String.to_charlist(host))
end
defp parse_host(host) do
case :inet.parse_address(host) do
{:ok, ip} -> {:ok, ip}
{:error, :einval} -> {:ok, host}
end
end
end
Visualize Electricity Usage
Create a real-time plot showing electricity delivered and returned to the grid:
usage_plot =
Vl.new(width: 600, height: 400, padding: 20)
|> Vl.repeat(
[layer: ["delivered", "returned"]],
Vl.new()
|> Vl.mark(:line)
|> Vl.encode_field(:x, "date", type: :temporal, title: "Measurement")
|> Vl.encode_repeat(:y, :layer, type: :quantitative, title: "Electricity Usage (kW)")
|> Vl.encode(:color, datum: [repeat: :layer], type: :nominal)
)
|> Kino.VegaLite.new()
Connect to Meter
Start listening to the meter and update the plot with each telegram:
{:ok, pid} =
Meter.listen(
fn telegram ->
# Extract measurement values
delivered =
case telegram.electricity_currently_delivered do
%{value: %Decimal{} = d} -> Decimal.to_float(d)
%{value: v} when is_float(v) -> v
_ -> 0.0
end
returned =
case telegram.electricity_currently_returned do
%{value: %Decimal{} = d} -> Decimal.to_float(d)
%{value: v} when is_float(v) -> v
_ -> 0.0
end
# Extract timestamp
timestamp =
case telegram.measured_at do
%{value: dt} -> dt
dt when is_struct(dt, NaiveDateTime) -> dt
_ -> NaiveDateTime.utc_now()
end
# Push to chart
Kino.VegaLite.push(
usage_plot,
%{date: timestamp, delivered: delivered, returned: returned},
window: 300
)
end,
host: Kino.Input.read(meter_host),
port: Kino.Input.read(meter_port)
)
Stop Listening
When you’re done, stop the meter listener:
GenServer.stop(pid, :shutdown)
Example: Simple Logger
Alternatively, you can just log telegram data without visualization:
{:ok, pid} =
Meter.listen(
fn telegram ->
IO.puts("""
========================================
Timestamp: #{inspect(telegram.measured_at)}
Version: DSMR #{telegram.version}
Currently Delivered: #{inspect(telegram.electricity_currently_delivered)}
Currently Returned: #{inspect(telegram.electricity_currently_returned)}
Cumulative Delivered (T1): #{inspect(telegram.electricity_delivered_1)}
Cumulative Delivered (T2): #{inspect(telegram.electricity_delivered_2)}
Gas: #{inspect(List.first(telegram.mbus_devices))}
========================================
""")
end,
host: Kino.Input.read(meter_host),
port: Kino.Input.read(meter_port)
)
# Stop when done
# GenServer.stop(pid, :shutdown)