Powered by AppSignal & Oban Pro

Dummy Smart Meter

examples/dummy_smartmeter.livemd

Dummy Smart Meter

Mix.install([
  {:dsmr, "~> 1.0"},
  {:kino, "~> 0.12.0"},
  {:kino_vega_lite, "~> 0.1.10"}
])

alias VegaLite, as: Vl

Configuration

Configure the dummy meter settings:

# Real-world delay between two telegrams, in milliseconds (default: 1000 ms = 1 second).
emission_interval = Kino.Input.number("Emission Interval (ms)", default: 1000)

# How fast the simulated clock runs (1 = real-time, 60 = one virtual minute per second).
time_acceleration = Kino.Input.number("Time Acceleration Factor", default: 60)

# Port on which P1 clients can connect to the TCP server.
server_port = Kino.Input.number("TCP Server Port", default: 8000)

# Render the three inputs side by side.
[emission_interval, time_acceleration, server_port]
|> Kino.Layout.grid(columns: 3)

Define TCP Server

defmodule TCPServer do
  @moduledoc """
  Minimal TCP fan-out server: every string passed to `broadcast/1` is written
  to all currently connected clients.

  Connections are accepted in a separate linked task so that the GenServer
  itself never blocks in `:gen_tcp.accept/1`. This keeps `broadcast/1`,
  `GenServer.stop/1` and `:sys` calls responsive even when no client is
  connecting.
  """

  use GenServer

  require Logger

  @default_port 8000

  # Pause before retrying after an unexpected accept error, to avoid a hot loop.
  @accept_retry_ms 1_000

  @doc """
  Starts the server, registered under the module name.

  ## Options

    * `:port` - TCP port to listen on (default: `#{@default_port}`)
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Asynchronously sends `telegram_string` to every connected client.

  Clients for which the send fails are closed and forgotten.
  """
  def broadcast(telegram_string) do
    GenServer.cast(__MODULE__, {:broadcast, telegram_string})
  end

  @impl true
  def init(opts) do
    port = opts[:port] || @default_port
    listen_opts = [:binary, active: false, reuseaddr: true, packet: :line]

    case :gen_tcp.listen(port, listen_opts) do
      {:ok, listen_socket} ->
        Logger.info("TCP server listening on port #{port}")

        # Accept in a linked task; the blocking accept call must not run in
        # this process or it would stall every cast/call sent to the server.
        server = self()
        {:ok, _acceptor} = Task.start_link(fn -> accept_loop(listen_socket, server) end)

        {:ok, %{listen_socket: listen_socket, clients: []}}

      {:error, reason} ->
        Logger.error("Failed to listen on port #{port}: #{inspect(reason)}")
        {:stop, reason}
    end
  end

  @impl true
  def handle_info({:client_connected, client_socket}, state) do
    Logger.info("Client connected: #{inspect(client_socket)}")
    {:noreply, %{state | clients: [client_socket | state.clients]}}
  end

  @impl true
  def handle_cast({:broadcast, telegram_string}, state) do
    # Keep only the clients that accepted the data; drop and close the rest.
    active_clients =
      Enum.filter(state.clients, fn socket ->
        case :gen_tcp.send(socket, telegram_string) do
          :ok ->
            true

          {:error, reason} ->
            Logger.warning("Failed to send to client: #{inspect(reason)}")
            :gen_tcp.close(socket)
            false
        end
      end)

    {:noreply, %{state | clients: active_clients}}
  end

  # Runs in the acceptor task: waits for connections and hands each one over
  # to the server process. Ends once the listen socket has been closed.
  defp accept_loop(listen_socket, server) do
    case :gen_tcp.accept(listen_socket) do
      {:ok, client_socket} ->
        hand_over(client_socket, server)
        accept_loop(listen_socket, server)

      {:error, :closed} ->
        :ok

      {:error, reason} ->
        Logger.error("Failed to accept connection: #{inspect(reason)}")
        Process.sleep(@accept_retry_ms)
        accept_loop(listen_socket, server)
    end
  end

  # Transfers socket ownership to the server before announcing the client, so
  # the socket stays open for as long as the server (not the task) lives.
  defp hand_over(client_socket, server) do
    case :gen_tcp.controlling_process(client_socket, server) do
      :ok ->
        send(server, {:client_connected, client_socket})

      {:error, reason} ->
        Logger.warning("Failed to hand over client socket: #{inspect(reason)}")
        :gen_tcp.close(client_socket)
    end
  end
end

Define Dummy Meter GenServer

defmodule DummyMeter do
  @moduledoc """
  A dummy smart meter that emits telegrams with wave-pattern electricity usage.

  Demand follows a daily cosine wave that peaks at 18:00 and bottoms out at
  06:00 (virtual time), so more electricity is consumed during the day than at
  night.

  ## Options for `start_link/1`

    * `:interval` - real milliseconds between two telegrams (default: `1000`)
    * `:acceleration` - virtual seconds that pass per real second (default: `1`)
    * `:callback` - optional one-argument function called with every
      `%DSMR.Telegram{}`
    * `:tcp_enabled` - when `true`, every telegram string is also passed to
      `TCPServer.broadcast/1` (default: `false`)
  """

  use GenServer

  require Logger

  # Simulated demand range in kW, before noise is applied.
  @base_power 0.2
  @peak_power 2.5

  # Gas volume (m3) added for every virtual hour boundary that is crossed
  # (0.2 m3/hour is roughly 5 m3/day).
  @gas_per_hour 0.2

  @ms_per_hour 3_600_000
  @epoch ~N[1970-01-01 00:00:00]

  @doc "Starts the meter, registered under the module name. See the module docs for options."
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Stops the running meter."
  def stop do
    GenServer.stop(__MODULE__)
  end

  @impl true
  def init(opts) do
    # Initial state with cumulative counters
    state = %{
      interval: opts[:interval] || 1000,
      acceleration: opts[:acceleration] || 1,
      callback: opts[:callback],
      tcp_enabled: opts[:tcp_enabled] || false,
      virtual_time: NaiveDateTime.utc_now(),
      electricity_delivered_1: 1234.567,
      electricity_delivered_2: 2345.678,
      electricity_returned_1: 123.456,
      electricity_returned_2: 234.567,
      gas_delivered: 987.654
    }

    # Start emission loop
    schedule_emission(0)

    {:ok, state}
  end

  @impl true
  def handle_info(:emit_telegram, state) do
    # Virtual time that passes during one tick is the real interval scaled by
    # the acceleration factor. Using it for both the clock and the counters
    # keeps timestamps and cumulative readings consistent with each other.
    elapsed_ms = round(state.interval * state.acceleration)
    elapsed_hours = elapsed_ms / @ms_per_hour
    virtual_time = NaiveDateTime.add(state.virtual_time, elapsed_ms, :millisecond)
    hour = virtual_time.hour

    # kWh = kW * hours
    current_power = simulate_demand(virtual_time)
    energy_increment = current_power * elapsed_hours

    # Book consumption on the counter that matches the active tariff.
    {delivered_1, delivered_2} =
      if night_tariff?(hour) do
        {state.electricity_delivered_1, state.electricity_delivered_2 + energy_increment}
      else
        {state.electricity_delivered_1 + energy_increment, state.electricity_delivered_2}
      end

    # Returned power is only ever booked on counter 1.
    current_returned = simulate_return(hour)
    returned_1 = state.electricity_returned_1 + current_returned * elapsed_hours

    # Gas is updated once per virtual hour, however many ticks fall in it.
    gas_delivered =
      state.gas_delivered + @gas_per_hour * hours_crossed(state.virtual_time, virtual_time)

    state = %{
      state
      | virtual_time: virtual_time,
        electricity_delivered_1: delivered_1,
        electricity_delivered_2: delivered_2,
        electricity_returned_1: returned_1,
        gas_delivered: gas_delivered
    }

    # The telegram is first rendered with the placeholder checksum, then the
    # computed checksum is filled in and the telegram is rendered again.
    telegram = build_telegram(state, current_power, current_returned)
    checksum = telegram |> DSMR.Telegram.to_string() |> calculate_checksum()
    telegram = %{telegram | checksum: checksum}
    final_telegram_string = DSMR.Telegram.to_string(telegram)

    Logger.info("Emitting telegram for #{virtual_time}")

    # The callback is optional; skip it when none (or a non-function) was given.
    if is_function(state.callback, 1) do
      state.callback.(telegram)
    end

    # Broadcast to TCP clients if enabled
    if state.tcp_enabled do
      TCPServer.broadcast(final_telegram_string)
    end

    # Schedule next emission
    schedule_emission(state.interval)

    {:noreply, state}
  end

  # Demand in kW for the given virtual time: a cosine wave over the day with
  # its peak at 18:00 (0.75 of the day) and its minimum at 06:00 (0.25),
  # mapped onto @base_power..@peak_power with +/-10% random noise.
  defp simulate_demand(%NaiveDateTime{hour: hour, minute: minute}) do
    time_of_day = (hour * 60 + minute) / (24 * 60)
    phase = (time_of_day - 0.75) * 2 * :math.pi()
    wave = (:math.cos(phase) + 1) / 2
    noise = :rand.uniform() * 0.2 - 0.1

    max(0.0, (@base_power + wave * (@peak_power - @base_power)) * (1 + noise))
  end

  # Power in kW returned to the grid (solar panels): between 10:00 and 17:00
  # there is a 30% chance per tick of returning up to 1.5 kW, otherwise 0.
  defp simulate_return(hour) do
    if hour >= 10 and hour < 17 and :rand.uniform() < 0.3 do
      :rand.uniform() * 1.5
    else
      0.0
    end
  end

  # Counter 2 / indicator "0002" is used from 23:00 until 07:00.
  defp night_tariff?(hour), do: hour >= 23 or hour < 7

  # Number of full-hour boundaries passed when moving from `from` to `to`.
  defp hours_crossed(from, to), do: max(hour_index(to) - hour_index(from), 0)

  defp hour_index(time) do
    time
    |> NaiveDateTime.diff(@epoch, :second)
    |> div(3600)
  end

  # Builds the telegram struct from the updated state plus the instantaneous
  # power values, with a placeholder checksum.
  defp build_telegram(state, current_power, current_returned) do
    timestamp = %DSMR.Timestamp{value: state.virtual_time, dst: "W"}

    %DSMR.Telegram{
      header: "ISk5\\2MT382-1000",
      version: "50",
      measured_at: timestamp,
      equipment_id: "4530303437303030303037363330383137",
      electricity_delivered_1: measurement(state.electricity_delivered_1, "kWh"),
      electricity_delivered_2: measurement(state.electricity_delivered_2, "kWh"),
      electricity_returned_1: measurement(state.electricity_returned_1, "kWh"),
      electricity_returned_2: measurement(state.electricity_returned_2, "kWh"),
      electricity_tariff_indicator:
        if(night_tariff?(state.virtual_time.hour), do: "0002", else: "0001"),
      electricity_currently_delivered: measurement(current_power, "kW"),
      electricity_currently_returned: measurement(current_returned, "kW"),
      power_failures_count: "0",
      power_failures_long_count: "0",
      voltage_sags_l1_count: "0",
      voltage_swells_l1_count: "0",
      mbus_devices: [
        %DSMR.MBusDevice{
          channel: 1,
          device_type: "003",
          equipment_id: "4730303339303031373434313430323137",
          last_reading_measured_at: timestamp,
          last_reading_value: measurement(state.gas_delivered, "m3")
        }
      ],
      checksum: "0000"
    }
  end

  # Wraps a float in a measurement, rounded to three decimals.
  defp measurement(value, unit) do
    %DSMR.Measurement{value: Float.round(value, 3), unit: unit}
  end

  defp schedule_emission(interval) do
    Process.send_after(self(), :emit_telegram, interval)
  end

  # Returns the CRC16 of the rendered telegram as a 4-character uppercase hex string.
  defp calculate_checksum(telegram_string) do
    # Everything after the first "/" is fed to the CRC.
    # NOTE(review): this drops the leading "/" and keeps whatever follows "!"
    # (the placeholder checksum); confirm this is the input DSMR.CRC16 expects.
    data =
      telegram_string
      |> String.split("/", parts: 2)
      |> List.last()

    data
    |> DSMR.CRC16.calculate()
    |> Integer.to_string(16)
    |> String.pad_leading(4, "0")
    |> String.upcase()
  end
end

Start TCP Server

Start the TCP server to allow P1 clients to connect:

# Read the port chosen in the configuration cell and start listening on it.
listen_port = Kino.Input.read(server_port)
{:ok, tcp_server} = TCPServer.start_link(port: listen_port)

Visualize Electricity Usage

Create a real-time plot showing electricity delivered and returned:

# Line layer that is repeated once per plotted field; the colour tells the
# "delivered" and "returned" series apart.
power_layer =
  Vl.new()
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "date", type: :temporal, title: "Time")
  |> Vl.encode_repeat(:y, :layer, type: :quantitative, title: "Electricity (kW)")
  |> Vl.encode(:color, datum: [repeat: :layer], type: :nominal)

# Live chart that the meter callback pushes data points into.
usage_plot =
  Vl.new(width: 600, height: 400, padding: 20)
  |> Vl.repeat([layer: ["delivered", "returned"]], power_layer)
  |> Kino.VegaLite.new()

Start Dummy Meter

Start the dummy meter and watch the wave pattern emerge. The meter will broadcast telegrams to all connected P1 clients:

# Turns a telegram into one chart data point and appends it to the plot,
# keeping only the most recent 300 points.
push_to_chart = fn telegram ->
  point = %{
    date: telegram.measured_at.value,
    delivered: telegram.electricity_currently_delivered.value,
    returned: telegram.electricity_currently_returned.value
  }

  Kino.VegaLite.push(usage_plot, point, window: 300)
end

{:ok, meter_pid} =
  DummyMeter.start_link(
    interval: Kino.Input.read(emission_interval),
    acceleration: Kino.Input.read(time_acceleration),
    tcp_enabled: true,
    callback: push_to_chart
  )

Connect with P1 Client

You can now connect to the dummy meter from another Livebook or application using the existing `connect_to_dsmr_meter.livemd` example. Use `localhost` as the host and the configured port (default: 8000).

Example connection settings:

  • Host: localhost
  • Port: 8000 (or your configured port)

Stop Meter and Server

When you’re done, stop the dummy meter and TCP server:

# Stop the meter first so it no longer emits (and broadcasts) telegrams.
DummyMeter.stop()
# Then shut down the TCP server process bound to `tcp_server` earlier in this notebook.
GenServer.stop(tcp_server)

Alternative: Log Telegrams Only

If you only want to log telegram data without TCP broadcasting:

# Prints a short human-readable summary of every telegram to the cell output.
log_telegram = fn telegram ->
  gas_meter = List.first(telegram.mbus_devices)

  IO.puts("""
  ========================================
  Timestamp: #{inspect(telegram.measured_at.value)}
  Currently Delivered: #{telegram.electricity_currently_delivered.value} kW
  Currently Returned: #{telegram.electricity_currently_returned.value} kW
  Cumulative T1: #{telegram.electricity_delivered_1.value} kWh
  Cumulative T2: #{telegram.electricity_delivered_2.value} kWh
  Gas: #{gas_meter.last_reading_value.value} m³
  ========================================
  """)
end

{:ok, meter_pid} =
  DummyMeter.start_link(
    interval: Kino.Input.read(emission_interval),
    acceleration: Kino.Input.read(time_acceleration),
    tcp_enabled: false,
    callback: log_telegram
  )

# Stop when done
# DummyMeter.stop()

Alternative: Export as Raw DSMR Format

You can also print telegrams as raw DSMR strings:

# Renders every telegram in its raw DSMR text form and prints it.
print_raw = fn telegram ->
  IO.puts(DSMR.Telegram.to_string(telegram))
end

{:ok, meter_pid} =
  DummyMeter.start_link(
    interval: Kino.Input.read(emission_interval),
    acceleration: Kino.Input.read(time_acceleration),
    tcp_enabled: false,
    callback: print_raw
  )

# Stop when done
# DummyMeter.stop()