MQTT Moving Average
Mix.install([
{:kino, "~> 0.8.0"},
{:tortoise, "~> 0.9"},
{:jason, "~> 1.2"}
])
Configuration
Which broker and prefix to work on:
# Text input for the broker host; a public MQTT broker is the default.
kino_broker = Kino.Input.text("MQTT Broker:", default: "broker.hivemq.com")

# Text input for the topic prefix that the rest of the notebook builds on.
kino_prefix = Kino.Input.text("MQTT Prefix:", default: "dk/sdu/iot/2023/anton")

# Lay the two inputs out in a grid.
Kino.Layout.grid([kino_broker, kino_prefix])

# Read the current values of both inputs (broker first, then prefix).
broker = Kino.Input.read(kino_broker)
prefix = Kino.Input.read(kino_prefix)
{broker, prefix}
Moving Average Node
First we start an MQTT client:
# OS time in milliseconds, used as a suffix so each evaluation gets its own
# client id. `:millisecond` replaces the deprecated `:milli_seconds` unit.
t = System.os_time(:millisecond)
client_id = "sdu_iot_mqtt_livebook_mavg_#{t}"

# Start a Tortoise connection under Tortoise's supervisor. It uses the default
# handler and has no subscriptions.
Tortoise.Supervisor.start_child(
  client_id: client_id,
  handler: {Tortoise.Handler.Default, []},
  server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
  subscriptions: []
)
Then, we define the node:
defmodule MovingAverage do
  @moduledoc """
  GenServer that keeps a sliding window of `{time, value}` samples for one
  topic and publishes the window's arithmetic mean over MQTT after every
  sample.

  Workers are registered in the `:mavg_registry` registry under the key
  `{MovingAverage, topic}`, so that registry must be running before
  `start_link/1` or `consume/3` is called.
  """

  use GenServer

  # Window size used for workers that `consume/3` starts on demand.
  @windowsize 3

  # interface

  @doc """
  Starts a worker for `topic` (a list of topic levels).

  The output topic is the levels joined with `"/"`, with `"siggen"` replaced
  by `"mavg"`. `windowsize` must be a positive integer; anything else makes
  `init/1` stop and this function return `{:error, reason}`.
  """
  def start_link({client_id, topic, windowsize}) do
    otopic = topic |> Enum.join("/") |> String.replace("siggen", "mavg")
    state = [client: client_id, topic: otopic, windowsize: windowsize]

    # NOTE(review): despite the function name this calls GenServer.start/3, so
    # the worker is NOT linked to the caller. Kept as-is because switching to
    # start_link/3 would change how worker crashes propagate to the caller.
    GenServer.start(__MODULE__, state, name: via_tuple(topic))
  end

  @doc """
  Sends `sample` (a `{time, value}` tuple) to the worker `pid` asynchronously.
  """
  def consume(pid, {_time, _value} = sample) when is_pid(pid) do
    GenServer.cast(pid, {:consume, sample})
  end

  @doc """
  Sends `sample` to the worker registered for `topic`, starting that worker
  with the default window size if none is registered yet.
  """
  def consume(client_id, topic, {_time, _value} = sample) do
    client_id
    |> ensure_started(topic)
    |> consume(sample)
  end

  # callbacks

  @impl true
  def init(client: client_id, topic: topic, windowsize: windowsize)
      when is_integer(windowsize) and windowsize > 0 do
    {:ok, %{client: client_id, topic: topic, windowsize: windowsize, window: []}}
  end

  # Anything else (including a window size of 0, which would divide by zero on
  # every sample) is rejected up front instead of failing later.
  def init(args), do: {:stop, {:bad_args, args}}

  @impl true
  def handle_cast({:consume, {time, _value} = sample}, state) do
    %{client: client_id, topic: topic, windowsize: windowsize, window: window} = state

    {window, mavg} = incorporate([sample | window], windowsize)

    # Build the payload as a binary string rather than a single-quoted charlist.
    message = ~s({"time": #{time}, "value": #{mavg}})

    # Assertive match: a failed publish crashes this worker.
    :ok = Tortoise.publish(client_id, topic, message, qos: 0)

    {:noreply, %{state | window: window}}
  end

  # helpers

  # Registry-based name under which the worker for `topic` is registered.
  defp via_tuple(topic) do
    {:via, Registry, {:mavg_registry, {__MODULE__, topic}}}
  end

  # Returns the pid of the worker for `topic`, starting it when necessary.
  defp ensure_started(client_id, topic) do
    case Registry.lookup(:mavg_registry, {__MODULE__, topic}) do
      [{pid, _value}] ->
        pid

      [] ->
        case start_link({client_id, topic, @windowsize}) do
          {:ok, pid} -> pid
          # Another caller registered the worker between lookup and start.
          {:error, {:already_started, pid}} -> pid
        end
    end
  end

  # Keeps the newest `windowsize` samples (newest first) and returns them
  # together with the mean of their values. `samples` is never empty here
  # because the caller always prepends the incoming sample.
  defp incorporate(samples, windowsize) do
    window = Enum.take(samples, windowsize)
    sum = Enum.reduce(window, 0, fn {_time, value}, acc -> acc + value end)
    {window, sum / length(window)}
  end
end
Uncomment to test:
# options = [client: client_id, topic: "dk/sdu/iot/2023/tesT/mavg", windowsize: 3]
# {:ok, mavg_pid} = GenServer.start_link(MovingAverage, options)
# MovingAverage.consume(mavg_pid, {1, 1})
# MovingAverage.consume(mavg_pid, {2, 3})
# MovingAverage.consume(mavg_pid, {3, 5})
# MovingAverage.consume(mavg_pid, {4, 4})
# MovingAverage.consume(mavg_pid, {5, -6})
# MovingAverage.consume(mavg_pid, {6, 8})
Dispatcher Node
defmodule Dispatcher do
  @moduledoc """
  Tortoise handler that decodes incoming MQTT payloads as JSON and forwards
  every `{"time": ..., "value": ...}` sample to `MovingAverage.consume/3`.

  The handler state is the keyword list `[client: client_id]`; the client id
  is passed on to `MovingAverage` together with the topic and the sample.
  """

  use Tortoise.Handler

  # NOTE(review): this module implements Tortoise.Handler, not GenServer, and
  # nothing visible in this file calls start_link/1 or child_spec/1. Both are
  # kept unchanged so existing callers keep working - confirm whether they are
  # still needed.
  def start_link(args) do
    GenServer.start(__MODULE__, args)
  end

  # callback functions

  @impl true
  def init(client: client_id) do
    {:ok, [client: client_id]}
  end

  @impl true
  def handle_message(topic, payload, [client: client_id] = state) do
    case Jason.decode(payload) do
      # Only numeric values are forwarded: the worker adds them up, so any
      # other JSON value would crash it and discard its window.
      {:ok, %{"time" => time, "value" => value}} when is_number(value) ->
        MovingAverage.consume(client_id, topic, {time, value})

      # Undecodable payloads and objects of any other shape are ignored.
      _other ->
        nil
    end

    {:ok, state}
  end

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end
Start registry (should be supervised, but we’ll skip that for brevity):
# Unique-key registry in which MovingAverage workers register themselves.
{:ok, registry_pid} = Registry.start_link(keys: :unique, name: :mavg_registry)
Setting up the subscription:
# NOTE(review): this rebinds `client_id`, shadowing the id of the client
# started earlier in the notebook. The dispatcher below is therefore handed
# this connection's id - confirm that is intended.
# `:millisecond` replaces the deprecated `:milli_seconds` time unit.
client_id = "sdu_iot_mqtt_livebook_mavg#{System.os_time(:millisecond)}"

# `+` matches exactly one topic level, so this pattern covers topics of the
# form <prefix>/siggen/<level>/<level>.
topic_pattern = prefix <> "/siggen/+/+"

# Open a connection that subscribes to the pattern at QoS 2 and hands every
# incoming message to Dispatcher.
{:ok, pid} =
  Tortoise.Connection.start_link(
    client_id: client_id,
    server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
    handler: {
      Dispatcher,
      [client: client_id]
    },
    subscriptions: [{topic_pattern, 2}]
  )