MQTT
Mix.install([
{:vega_lite, "~> 0.1.5"},
{:kino_vega_lite, "~> 0.1.7"},
{:kino, "~> 0.8.0"},
{:tortoise, "~> 0.9"},
{:jason, "~> 1.2"}
])
Introduction
This notebook implements a simple MQTT producer/consumer pair using the Tortoise MQTT client library.
Config
# Text inputs for the broker host and the topic. The default topic carries a
# millisecond timestamp suffix (presumably to avoid clashing with other users
# of the public broker).
kino_broker = Kino.Input.text("MQTT Broker:", default: "broker.hivemq.com")

kino_topic =
  Kino.Input.text("MQTT Topic:",
    # `:millisecond` replaces the deprecated `:milli_seconds` time unit.
    default: "some/test/topic/#{:os.system_time(:millisecond)}"
  )

# Read the current values of both inputs for use by the producer and consumer.
{broker, topic} = {
  Kino.Input.read(kino_broker),
  Kino.Input.read(kino_topic)
}
Generator
Let's define a function:
# Signal to be sampled: sin(2t) * cos(t).
f = fn t ->
  double_angle_sine = :math.sin(t * 2)
  cosine = :math.cos(t)
  double_angle_sine * cosine
end
Producer
Server code for the producer:
defmodule Producer do
  @moduledoc """
  GenServer that periodically publishes samples of a function to an MQTT topic.

  Every `@sleeptime` milliseconds the server evaluates the supplied function at
  the time elapsed since start-up (milliseconds divided by `@time_scale`) and
  publishes the result as a JSON object with the keys `"t"` and `"v"` through
  a Tortoise client.
  """

  use GenServer

  # Delay between two published samples, in milliseconds.
  @sleeptime 1000

  # Divisor applied to the elapsed milliseconds before calling the function.
  @time_scale 10_000

  @doc """
  Starts the producer, registered under the module name.

  `broker` is the MQTT broker host, `topic` the topic to publish to and `f` a
  one-argument function that is evaluated at the scaled elapsed time.

  Returns `{:ok, pid}` on success, or `{:error, reason}` when the server or
  its MQTT client cannot be started.

  Note that this calls `GenServer.start/3`, so the process is *not* linked to
  the caller despite the function name.
  """
  @spec start_link(term(), term(), (float() -> term())) :: GenServer.on_start()
  def start_link(broker, topic, f) do
    GenServer.start(__MODULE__, {broker, topic, f}, name: __MODULE__)
  end

  # callbacks

  @impl true
  def init({broker, topic, f}) do
    t0 = :os.system_time(:millisecond)
    client_id = "mqtt_producer_#{t0}"

    # Stop instead of silently running without a client: every later publish
    # would otherwise go nowhere.
    case start_client(broker, client_id) do
      {:error, reason} ->
        {:stop, {:mqtt_client_failed, reason}}

      _started ->
        register_timeout()
        {:ok, %{t0: t0, f: f, client: client_id, topic: topic}}
    end
  end

  @impl true
  def handle_info(:emit, %{t0: t0, f: f, client: client_id, topic: topic} = state) do
    tdiff = (:os.system_time(:millisecond) - t0) / @time_scale

    # Encode with Jason so the payload is a binary and always valid JSON.
    payload = Jason.encode!(%{t: tdiff, v: f.(tdiff)})

    # The result of the publish is deliberately not checked (QoS 0).
    Tortoise.publish(client_id, topic, payload, qos: 0)

    register_timeout()
    {:noreply, state}
  end

  # helpers

  # Starts a Tortoise connection for `client_id` without any subscriptions.
  defp start_client(broker, client_id) do
    Tortoise.Supervisor.start_child(
      client_id: client_id,
      handler: {Tortoise.Handler.Default, []},
      server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
      subscriptions: []
    )
  end

  # Schedules the next `:emit` message to this process.
  defp register_timeout() do
    Process.send_after(self(), :emit, @sleeptime)
  end
end
Start producer:
# Start the producer with the configured broker, topic and signal function;
# the match fails loudly if the producer cannot be started.
{:ok, producer_pid} = Producer.start_link(broker, topic, f)
Visualization
alias VegaLite, as: Vl

interval = 100

# Line chart plotting the quantitative fields "x" and "y".
chart_spec =
  [width: 400, height: 400]
  |> Vl.new()
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)

# Wrap the spec in a live Kino widget and render it right away; `kino` is the
# handle that data points are pushed to later.
kino = Kino.render(Kino.VegaLite.new(chart_spec))

# The chart was rendered explicitly above, so the cell itself evaluates to nil.
nil
Data should appear once the consumer has been started.
Consumer
defmodule Consumer do
  @moduledoc """
  Tortoise handler that decodes incoming JSON samples and pushes them to a
  `Kino.VegaLite` chart.

  The handler state is a map holding the chart under the `:kino` key.
  """

  use Tortoise.Handler

  @doc """
  Starts a process for this module via `GenServer.start/2`.

  NOTE(review): in this notebook the module is used through the `handler:`
  option of `Tortoise.Connection.start_link/1`; this function and
  `child_spec/1` appear to be unused there. Confirm before relying on them.
  """
  def start_link(args) do
    GenServer.start(__MODULE__, args)
  end

  # callback functions

  @impl true
  def init([kino]) do
    {:ok, %{kino: kino}}
  end

  # Decodes the payload as JSON and, when it carries the "t" and "v" keys,
  # appends the point to the chart (keeping a window of 100 points). Any other
  # payload is reported on stdout and otherwise ignored.
  @impl true
  def handle_message(_topic, payload, %{kino: kino} = state) do
    case Jason.decode(payload) do
      {:ok, %{"t" => x, "v" => y}} ->
        Kino.VegaLite.push(kino, %{x: x, y: y}, window: 100)

      _ ->
        IO.puts("Rejecting format of payload #{payload}")
    end

    {:ok, state}
  end

  @doc """
  Returns a permanent worker child specification that starts this module
  through `start_link/1` with `opts`.
  """
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end
# Client id with a millisecond timestamp suffix (presumably to keep it unique
# across evaluations). `:millisecond` replaces the deprecated `:milli_seconds`.
client_id = "mqtt_consumer#{:os.system_time(:millisecond)}"

# Open a connection that subscribes to the configured topic at QoS 0 and hands
# every incoming message to `Consumer`, which is initialised with the chart.
{:ok, pid} =
  Tortoise.Connection.start_link(
    client_id: client_id,
    server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
    handler: {Consumer, [kino]},
    subscriptions: [{topic, 0}]
  )