Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

MQTT

mqtt.livemd

MQTT

Mix.install([
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"},
  {:tortoise, "~> 0.9"},
  {:jason, "~> 1.2"}
])

Introduction

This notebook implements a simple MQTT producer/consumer pair using the Tortoise library.

Config

# Text input for the MQTT broker host name.
kino_broker = Kino.Input.text("MQTT Broker:", default: "broker.hivemq.com")
# Text input for the topic shared by producer and consumer. The default ends
# in the current OS time in milliseconds; `:millisecond` replaces the
# deprecated `:milli_seconds` time unit.
kino_topic =
  Kino.Input.text("MQTT Topic:",
    default: "some/test/topic/#{System.os_time(:millisecond)}"
  )
# Read the current value of both inputs into plain bindings used further down.
{broker, topic} = {
  Kino.Input.read(kino_broker),
  Kino.Input.read(kino_topic)
}

Generator

Let's define the function whose values the producer will publish:

# Signal generator: sin(2x) * cos(x), evaluated by the producer for each sample.
f = &(:math.sin(&1 * 2) * :math.cos(&1))

Producer

Server code for the producer:

defmodule Producer do
  @moduledoc """
  Periodically publishes samples of a function to an MQTT topic.

  On start the server opens its own Tortoise connection (no subscriptions)
  and then sends itself an `:emit` message every `@sleeptime` milliseconds.
  Each tick publishes a JSON object with the keys `"t"` (scaled elapsed time)
  and `"v"` (the function value at `t`) with QoS 0.
  """

  use GenServer

  # Delay between two published samples, in milliseconds.
  @sleeptime 1000

  @doc """
  Starts the producer, registered under the module name.

  * `broker` - host of the MQTT broker (connected to on TCP port 1883)
  * `topic`  - topic the samples are published to
  * `f`      - one-argument function evaluated for each sample

  Returns whatever `GenServer.start/3` returns.
  """
  def start_link(broker, topic, f) do
    # NOTE(review): despite the function name this calls GenServer.start/3, so
    # the process is NOT linked to the caller. Kept as-is because linking to
    # the notebook evaluator may be unwanted - confirm the intent.
    GenServer.start(__MODULE__, {broker, topic, f}, name: __MODULE__)
  end

  # callbacks

  @impl true
  def init({broker, topic, f}) do
    t0 = System.os_time(:millisecond)
    # The start timestamp doubles as a suffix for the MQTT client id.
    client_id = "mqtt_producer_#{t0}"

    case start_client(broker, client_id) do
      {:error, reason} ->
        # Without a connection every later publish would fail silently,
        # so refuse to start instead of ignoring the error.
        {:stop, {:mqtt_client_failed, reason}}

      _started ->
        register_timeout()
        {:ok, %{t0: t0, f: f, client: client_id, topic: topic}}
    end
  end

  @impl true
  def handle_info(:emit, %{t0: t0, f: f, client: client_id, topic: topic} = state) do
    # Elapsed milliseconds divided by 10_000: with a 1000 ms tick the
    # function argument advances by roughly 0.1 per sample.
    tdiff = (System.os_time(:millisecond) - t0) / 10_000

    # Encode with Jason so the payload is a binary containing valid JSON,
    # rather than a hand-interpolated charlist.
    payload = Jason.encode!(%{t: tdiff, v: f.(tdiff)})

    Tortoise.publish(client_id, topic, payload, qos: 0)
    register_timeout()
    {:noreply, state}
  end

  # helpers

  # Starts a Tortoise connection for `client_id` under Tortoise's supervisor,
  # using the default handler and no subscriptions (publish-only client).
  defp start_client(broker, client_id) do
    Tortoise.Supervisor.start_child(
      client_id: client_id,
      handler: {Tortoise.Handler.Default, []},
      server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
      subscriptions: []
    )
  end

  # Schedules the next `:emit` message to this process.
  defp register_timeout() do
    Process.send_after(self(), :emit, @sleeptime)
  end
end

Start producer:

# Boot the producer with the configured broker, topic and generator function.
# The producer registers itself under its module name, so evaluating this a
# second time while it is still running will not match `{:ok, _}`.
{:ok, producer_pid} = Producer.start_link(broker, topic, f)

Visualization

alias VegaLite, as: Vl

# NOTE(review): `interval` is not referenced by any code visible in this
# notebook - confirm whether it is still needed.
interval = 100

# Line chart specification: quantitative "x" against quantitative "y".
chart_spec =
  Vl.new(width: 400, height: 400)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)

# Wrap the spec in a live widget and render it right away; the widget is kept
# in `kino` so data points can be pushed to it later.
kino = chart_spec |> Kino.VegaLite.new() |> Kino.render()

# Return nil so the chart is not shown a second time as the cell result.
nil

Data should appear once the consumer has been started.

Consumer

defmodule Consumer do
  @moduledoc """
  Tortoise handler that forwards incoming MQTT samples to a VegaLite chart.

  The handler state is a map holding the chart widget under `:kino`. Every
  message whose payload decodes to a JSON object with the keys `"t"` and
  `"v"` is pushed to the chart as an `%{x: t, y: v}` data point.
  """

  use Tortoise.Handler

  # NOTE(review): nothing visible in this notebook calls start_link/1 or
  # child_spec/1; the module is handed to Tortoise as a `handler:` instead.
  # Both are kept unchanged for compatibility - confirm whether they are needed.
  def start_link(args) do
    GenServer.start(__MODULE__, args)
  end

  # callback functions

  # Expects a one-element list containing the chart widget.
  @impl true
  def init([kino]) do
    {:ok, %{kino: kino}}
  end

  # Marked with @impl like init/1, so the compiler checks it against the
  # behaviour and does not warn about inconsistent @impl usage.
  @impl true
  def handle_message(_topic, payload, %{kino: kino} = state) do
    case Jason.decode(payload) do
      {:ok, %{"t" => x, "v" => y}} ->
        # Keep only the most recent 100 points on the chart.
        Kino.VegaLite.push(kino, %{x: x, y: y}, window: 100)

      _ ->
        IO.puts("Rejecting format of payload #{payload}")
    end

    {:ok, state}
  end

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end
# Client id for the consumer connection, suffixed with the current OS time in
# milliseconds (`:millisecond` replaces the deprecated `:milli_seconds` unit).
client_id = "mqtt_consumer#{System.os_time(:millisecond)}"

# Open the consumer connection: subscribe to the configured topic with QoS 0
# and hand every incoming message to `Consumer`, which is initialised with the
# chart widget.
{:ok, pid} =
  Tortoise.Connection.start_link(
    client_id: client_id,
    server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
    handler: {
      Consumer,
      [kino]
    },
    subscriptions: [{topic, 0}]
  )