Powered by AppSignal & Oban Pro

MQTT Moving Average

mqtt-moving-average.livemd

MQTT Moving Average

Mix.install([
  {:kino, "~> 0.8.0"},
  {:tortoise, "~> 0.9"},
  {:jason, "~> 1.2"}
])

Configuration

Which broker and prefix to work on:

kino_broker = Kino.Input.text("MQTT Broker:", default: "broker.hivemq.com")
# using public mqtt broker
kino_prefix = Kino.Input.text("MQTT Prefix:", default: "dk/sdu/iot/2023/anton")
# subscribe to dk/sdu/iot/2023/anton/#
Kino.Layout.grid([kino_broker, kino_prefix])
{broker, prefix} = {
  Kino.Input.read(kino_broker),
  Kino.Input.read(kino_prefix)
}

Moving Average Node

First we start an MQTT client:

t = :os.system_time(:milli_seconds)
client_id = "sdu_iot_mqtt_livebook_mavg_#{t}"

Tortoise.Supervisor.start_child(
  client_id: client_id,
  handler: {Tortoise.Handler.Default, []},
  server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
  subscriptions: []
)

Then, we define the node:

defmodule MovingAverage do
  use GenServer

  @windowsize 3

  # interface

  def start_link({client_id, topic, windowsize}) do
    otopic = Enum.join(topic, "/") |> String.replace("siggen", "mavg")
    state = [client: client_id, topic: otopic, windowsize: windowsize]
    GenServer.start(__MODULE__, state, name: via_tuple(topic))
  end

  def consume(pid, {_time, _value} = sample) when is_pid(pid) do
    GenServer.cast(pid, {:consume, sample})
  end

  def consume(client_id, topic, {_time, _value} = sample) do
    pid =
      case Registry.lookup(:mavg_registry, {__MODULE__, topic}) do
        [] ->
          options = {client_id, topic, @windowsize}
          {:ok, pid} = MovingAverage.start_link(options)
          pid

        [{pid, _}] ->
          pid
      end

    GenServer.cast(pid, {:consume, sample})
  end

  # callbacks

  @impl true
  def init(client: client_id, topic: topic, windowsize: windowsize) do
    state = [client: client_id, topic: topic, windowsize: windowsize, window: []]
    {:ok, state}
  end

  @impl true
  def handle_cast({:consume, {time, _value} = sample}, state) do
    [client: client_id, topic: topic, windowsize: windowsize, window: window] = state

    {window, mavg} = incorporate([sample] ++ window, windowsize)

    message = '{"time": #{time}, "value": #{mavg}}'
    :ok = Tortoise.publish(client_id, topic, message, qos: 0)

    state = [client: client_id, topic: topic, windowsize: windowsize, window: window]
    {:noreply, state}
  end

  # helpers

  defp via_tuple(topic) do
    {:via, Registry, {:mavg_registry, {__MODULE__, topic}}}
  end

  defp incorporate(window, windowsize) do
    incorporate(window, windowsize, [], 0, 0)
  end

  defp incorporate([], _windowsize, output, sum, count) do
    {output, sum / count}
  end

  defp incorporate(_window, windowsize, output, sum, count) when count == windowsize do
    {output, sum / count}
  end

  defp incorporate([first | rest], windowsize, output, sum, count) do
    {_time, value} = first
    # inefficient
    incorporate(rest, windowsize, output ++ [first], sum + value, count + 1)
  end
end

Uncomment to test:

# options = {client_id, "dk/sdu/iot/2023/tesT/mavg", 3}
# {:ok, mavg_pid} = GenServer.start_link(MovingAverage, options)
# MovingAverage.consume(mavg_pid, {1, 1})
# MovingAverage.consume(mavg_pid, {2, 3})
# MovingAverage.consume(mavg_pid, {3, 5})
# MovingAverage.consume(mavg_pid, {4, 4})
# MovingAverage.consume(mavg_pid, {5, -6})
# MovingAverage.consume(mavg_pid, {6, 8})

Dispatcher Node

defmodule Dispatcher do
  use Tortoise.Handler

  def start_link(args) do
    GenServer.start(__MODULE__, args)
  end

  # callback functions

  @impl true
  def init(client: client_id) do
    {:ok, [client: client_id]}
  end

  def handle_message(topic, payload, [client: client_id] = state) do
    case payload |> Jason.decode() do
      {:ok, %{"time" => time, "value" => value}} ->
        MovingAverage.consume(client_id, topic, {time, value})

      _ ->
        nil
    end

    {:ok, state}
  end

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end

Start registry (should be supervised, but we’ll skip that for brevity):

{:ok, registry_pid} = Registry.start_link(name: :mavg_registry, keys: :unique)

Setting up the subscription:

client_id = "sdu_iot_mqtt_livebook_mavg#{:os.system_time(:milli_seconds)}"
topic_pattern = prefix <> "/siggen/+/+"

{:ok, pid} =
  Tortoise.Connection.start_link(
    client_id: client_id,
    server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
    handler: {
      Dispatcher,
      [client: client_id]
    },
    subscriptions: [{topic_pattern, 2}]
  )