Powered by AppSignal & Oban Pro

Connect to DSMR meter

examples/connect_to_dsmr_meter.livemd

Connect to DSMR meter

Mix.install([
  {:dsmr, "~> 0.6"},
  {:kino, "~> 0.12.0"},
  {:kino_vega_lite, "~> 0.1.10"}
])

alias VegaLite, as: Vl

Configuration

Configure your DSMR meter connection settings:

# Network address of the DSMR meter.
# Either form works, for example:
#   - the IP address of a WiFi P1 adapter: "192.168.1.100"
#   - a hostname: "dsmr-meter.local"
meter_host = Kino.Input.text("Meter Host/IP", default: "192.168.1.100")

# TCP port of the meter. Port 23 (Telnet) is what most P1-to-WiFi adapters
# use; consult your adapter's documentation if yours differs.
meter_port = Kino.Input.number("Meter Port", default: 23)

# Render both inputs side by side.
[meter_host, meter_port]
|> Kino.Layout.grid(columns: 2)

Define Meter GenServer

defmodule Meter do
  @moduledoc """
  GenServer that reads DSMR telegrams from a meter over a TCP socket.

  The socket is read one line at a time. Lines are buffered from the telegram
  header (the line starting with `/`) up to and including the checksum line
  (the line starting with `!`). Every complete telegram is parsed with
  `DSMR.parse/1` and, when parsing succeeds, passed to the callback given to
  `listen/2`.
  """

  use GenServer

  require Logger

  @connect_timeout 5000
  @recv_timeout 5000

  @doc """
  Starts the listener and links it to the calling process.

  `fun` is a one-argument function called with every successfully parsed
  telegram. `opts` must contain `:host` (hostname or IP address, as a string
  or charlist) and `:port`.

  The process is registered under the module name, so only one listener can
  run at a time; stop the running one before starting another.
  """
  @spec listen((term() -> any()), keyword()) :: GenServer.on_start()
  def listen(fun, opts) do
    GenServer.start_link(__MODULE__, {fun, opts}, name: __MODULE__)
  end

  @impl true
  def init({fun, opts}) do
    {:ok, host} = parse_host(opts[:host])

    # Connecting may take up to @connect_timeout, so it is deferred to
    # handle_continue/2 to keep start_link/3 from blocking the caller.
    state = %{socket: nil, fun: fun, lines: ""}
    {:ok, state, {:continue, host: host, port: opts[:port]}}
  end

  @impl true
  def handle_continue(opts, state) do
    # Passive mode with line packets: every recv/3 call returns one full line.
    socket_opts = [:binary, active: false, packet: :line]

    case :gen_tcp.connect(opts[:host], opts[:port], socket_opts, @connect_timeout) do
      {:ok, socket} ->
        send(self(), :recv_loop)
        {:noreply, %{state | socket: socket}}

      {:error, reason} ->
        Logger.error("Unable to connect to meter - reason: #{inspect(reason)}")
        {:stop, :normal, state}
    end
  end

  @impl true
  def handle_info(:recv_loop, state) do
    case :gen_tcp.recv(state.socket, 0, @recv_timeout) do
      {:ok, line} ->
        # Queue the line before the next read so lines are handled in order.
        send(self(), {:recv_line, line})
        send(self(), :recv_loop)
        {:noreply, state}

      {:error, :timeout} ->
        # Silence is not a failure: meters older than DSMR 5 emit a telegram
        # only every 10 seconds, which is longer than @recv_timeout. Keep
        # polling; returning to the GenServer loop in between keeps the
        # process responsive to stop requests.
        send(self(), :recv_loop)
        {:noreply, state}

      {:error, reason} ->
        Logger.error("Unable to receive data from meter - reason: #{inspect(reason)}")
        {:stop, :normal, state}
    end
  end

  @impl true
  def handle_info({:recv_line, "!" <> _ = line}, state) do
    # Checksum line: the telegram is complete. Without a buffered header the
    # line belongs to a telegram whose start was missed, so it is dropped.
    if state.lines != "" do
      send(self(), {:telegram, state.lines <> line})
    end

    {:noreply, %{state | lines: ""}}
  end

  @impl true
  def handle_info({:recv_line, "/" <> _ = line}, state) do
    # Header line: always start a fresh buffer. Anything still buffered is a
    # telegram that never received its checksum line; keeping it would corrupt
    # the telegram that starts here.
    if state.lines != "" do
      Logger.warning("Discarding incomplete telegram - new header received")
    end

    {:noreply, %{state | lines: line}}
  end

  @impl true
  def handle_info({:recv_line, _line}, %{lines: ""} = state) do
    # Ignore partially received telegram when starting to read from socket.
    {:noreply, state}
  end

  @impl true
  def handle_info({:recv_line, line}, state) do
    {:noreply, %{state | lines: state.lines <> line}}
  end

  @impl true
  def handle_info({:telegram, raw}, state) do
    case DSMR.parse(raw) do
      {:ok, telegram} ->
        Logger.info("Received telegram - #{telegram.checksum}")
        state.fun.(telegram)

      {:error, reason} ->
        Logger.error("Unable to parse telegram - reason: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  @impl true
  def terminate(_reason, %{socket: nil}), do: :ok

  def terminate(_reason, %{socket: socket}) do
    # Close the connection explicitly instead of relying on process exit.
    :gen_tcp.close(socket)
  end

  # Converts the configured host into a form :gen_tcp.connect/4 accepts: an IP
  # address tuple when the value parses as one, otherwise the charlist hostname.
  defp parse_host(host) when is_binary(host) do
    parse_host(String.to_charlist(host))
  end

  defp parse_host(host) do
    case :inet.parse_address(host) do
      {:ok, ip} -> {:ok, ip}
      {:error, :einval} -> {:ok, host}
    end
  end
end

Visualize Electricity Usage

Create a real-time plot showing the electricity currently delivered to you and the electricity returned to the grid:

# Spec shared by every repeated layer: a line with time on the x axis, the
# repeated field on the y axis, and one colour per repeated field.
layer_spec =
  Vl.new()
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "date", type: :temporal, title: "Measurement")
  |> Vl.encode_repeat(:y, :layer, type: :quantitative, title: "Electricity Usage (kW)")
  |> Vl.encode(:color, datum: [repeat: :layer], type: :nominal)

# Live chart that layers the "delivered" and "returned" series.
usage_plot =
  [width: 600, height: 400, padding: 20]
  |> Vl.new()
  |> Vl.repeat([layer: ["delivered", "returned"]], layer_spec)
  |> Kino.VegaLite.new()

Connect to Meter

Start listening to the meter and update the plot with each telegram:

{:ok, pid} =
  Meter.listen(
    fn telegram ->
      # Turns a power reading into a float; unrecognised shapes count as 0.0.
      to_kw = fn
        %{value: %Decimal{} = amount} -> Decimal.to_float(amount)
        %{value: amount} when is_float(amount) -> amount
        _other -> 0.0
      end

      delivered = to_kw.(telegram.electricity_currently_delivered)
      returned = to_kw.(telegram.electricity_currently_returned)

      # Prefer the timestamp carried by the telegram; fall back to the
      # current UTC time when it has neither supported shape.
      measured_at =
        case telegram.measured_at do
          %{value: wrapped} -> wrapped
          %NaiveDateTime{} = naive -> naive
          _other -> NaiveDateTime.utc_now()
        end

      # Append the data point to the chart, keeping the last 300 points.
      point = %{date: measured_at, delivered: delivered, returned: returned}
      Kino.VegaLite.push(usage_plot, point, window: 300)
    end,
    host: Kino.Input.read(meter_host),
    port: Kino.Input.read(meter_port)
  )

Stop Listening

When you’re done, stop the meter listener:

# Stops the listener process bound to `pid`, using `:shutdown` as the exit reason.
GenServer.stop(pid, :shutdown)

Example: Simple Logger

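Alternatively, you can just log telegram data without visualization. The listener is registered under a fixed name, so stop any running listener (see above) before starting this one: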

# Prints a summary of each received telegram to standard output.
print_telegram = fn telegram ->
  IO.puts("""
  ========================================
  Timestamp: #{inspect(telegram.measured_at)}
  Version: DSMR #{telegram.version}
  Currently Delivered: #{inspect(telegram.electricity_currently_delivered)}
  Currently Returned: #{inspect(telegram.electricity_currently_returned)}
  Cumulative Delivered (T1): #{inspect(telegram.electricity_delivered_1)}
  Cumulative Delivered (T2): #{inspect(telegram.electricity_delivered_2)}
  Gas: #{inspect(List.first(telegram.mbus_devices))}
  ========================================
  """)
end

# Start the listener with the host and port entered in the inputs.
{:ok, pid} =
  Meter.listen(
    print_telegram,
    host: Kino.Input.read(meter_host),
    port: Kino.Input.read(meter_port)
  )

# Stop when done
# GenServer.stop(pid, :shutdown)