Powered by AppSignal & Oban Pro

PhoenixClient sensocto

livebooks/livebook-phoenixclient.livemd

PhoenixClient sensocto

Mix.install([
  {:phoenix_client, "~> 0.11.1"},
  {:phoenix_pubsub, "~> 2.1"},
  {:jason, "~> 1.4"},
  {:uuid, "~> 1.1"},
  {:kino, "~> 0.12.0"},
  {:nimble_csv, "~> 1.1"}
])

Configuration

alias NimbleCSV.RFC4180, as: CSV

{:ok, data} =
  """
  1737604423651,0,51.0
  1737604424651,1.0,54.0
  1737604425651,1.0,58.0
  1737604426651,1.0,61.0
  1737604427651,1.0,65.0
  1737604428651,1.0,66.0
  1737604429651,1.0,66.0
  1737604430651,1.0,70.0
  1737604431651,1.0,72.0
  1737604432651,1.0,77.0
  1737604433651,1.0,83.0
  1737604434651,1.0,86.0
  1737604435651,1.0,88.0
  1737604436651,1.0,89.0
  1737604437651,1.0,90.0
  1737604438651,1.0,92.0
  1737604439651,1.0,90.0
  1737604440651,1.0,91.0
  1737604441651,1.0,96.0
  1737604442651,1.0,98.0
  1737604443651,1.0,101.0
  1737604444651,1.0,101.0
  1737604445651,1.0,102.0
  1737604446651,1.0,102.0
  1737604447651,1.0,101.0
  1737604448651,1.0,98.0
  1737604449651,1.0,97.0
  1737604450651,1.0,97.0
  1737604451651,1.0,97.0
  1737604452651,1.0,98.0
  """
  |> String.trim()
  |> CSV.parse_string()
  #       |>  Enum.drop(1)
  |> Enum.map(fn item ->
    %{
      timestamp: String.to_integer(Enum.at(item, 0)),
      delay: String.to_float(Enum.at(item, 1)),
      payload: String.to_float(Enum.at(item, 2))
    }
  end)
  |> (fn data ->
        {:ok, data}
      end).()
  |> IO.inspect()

data
|> Enum.each(fn %{timestamp: timestamp, delay: delay, payload: payload} ->
  {megasec, sec, microsec} = :os.timestamp()

  message = %{
    "payload" => payload,
    "timestamp" => megasec + sec + microsec,
    "uuid" => "uuid"
  }

  IO.inspect(message)
end)
form =
  Kino.Control.form(
    [
      name: Kino.Input.text("Name"),
      socket:
        Kino.Input.select("socket",
          local: "ws://localhost:4000/socket/websocket",
          test: "wss://sensocto.fly.dev/socket/websocket"
        )
    ],
    submit: "Submit"
  )

Kino.listen(form, fn event ->
  IO.inspect(event)
end)

form

Section

defmodule Sensocto.SensorSimulatorGenServer do
  use GenServer
  require Logger
  alias PhoenixClient.{Socket, Channel, Message}
  alias NimbleCSV.RFC4180, as: CSV

  @socket_opts [
    # url: "ws://localhost:4000/socket/websocket"
    url: "wss://sensocto.fly.dev/socket/websocket"
  ]

  # Max interval in milliseconds for sending messages
  @interval 2000

  # List of UUID attributes to select from
  @uuid_attributes [
    "61d20a90-71a1-11ea-ab12-0800200c9a66",
    "00002a37-0000-1000-8000-00805f9b34fb",
    "feb7cb83-e359-4b57-abc6-628286b7a79b",
    "00002a19-0000-1000-8000-00805f9b34fb"
  ]

  # Public API
  def start_link(%{:sensor_id => sensor_id} = config) do
    GenServer.start_link(__MODULE__, config, name: via_tuple(sensor_id))
  end

  defp via_tuple(sensor_id), do: {:via, Registry, {SensorSimulatorRegistry, sensor_id}}

  # GenServer Callbacks
  @impl true
  def init(%{:sensor_id => sensor_id} = config) do
    case PhoenixClient.Socket.start_link(@socket_opts) do
      {:ok, socket} ->
        wait_until_connected(socket)

        uuid = Enum.random(@uuid_attributes)
        topic = "sensor_data:" <> sensor_id

        IO.puts("Connecting ... #{topic}")

        join_meta = %{
          device_name: config[:device_name],
          batch_size: 1,
          connector_id: config[:connector_id],
          connector_name: config[:connector_name],
          sampling_rate: config[:sampling_rate],
          sensor_id: config[:sensor_id],
          sensor_name: config[:sensor_name],
          sensor_type: config[:sensor_type],
          bearer_token: "fake"
        }

        case PhoenixClient.Channel.join(socket, topic, join_meta) do
          {:ok, _response, channel} ->
            IO.puts("Joined channel successfully for sensor #{sensor_id}")
            # Schedule the first message
            schedule_send_message(sensor_id, channel, uuid, config)
            {:ok, config}

          {:error, reason} ->
            IO.puts("Failed to join channel: #{inspect(reason)}")
            {:stop, reason}
        end

      {:error, reason} ->
        IO.puts("Failed to connect to socket: #{inspect(reason)}")
        {:stop, reason}
    end

    {:ok, config}
  end

  @impl true
  def handle_info({:send_message, sensor_id, channel, uuid, config}, state) do
    case fetch_sensor_data(sensor_id, config) do
      {:ok, data} ->
        new_state = Map.update(state, :queue, data, &amp; &amp;1)
        IO.inspect(new_state)
        schedule_process_queue(sensor_id, channel, uuid)
        {:noreply, new_state}

      _ ->
        schedule_send_message(sensor_id, channel, uuid, config)
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:process_queue, sensor_id, channel, uuid}, state) do
    process_queue(state, channel, uuid)
  end

  @impl true
  def handle_info({:push_message, channel, message}, state) do
    if state.config[:phoenix_channel] do
      PhoenixClient.Channel.push_async(channel, "measurement", message)
      {:noreply, state}
    else
      {:noreply, state}
    end
  end

  def handle_info(%Message{event: _message, payload: _payload}, state) do
    # IO.puts("Incoming Message: #{message} #{inspect(payload)}")
    {:noreply, state}
  end

  # A helper function to interact with the GenServer
  def get_data(sensor_id) do
    GenServer.call(via_tuple(sensor_id), :get_data)
  end

  # Private Functions
  defp process_queue(state, channel, uuid) do
    case state.queue do
      [] ->
        :noop

      [head | tail] ->
        %{timestamp: timestamp, delay: delay, value: value} = head

        message = %{
          "payload" => value,
          "timestamp" => timestamp,
          "uuid" => uuid
        }

        if state[:phoenix_channel] do
          Process.send_after(self(), {:push_message, channel, message}, delay * 1000)
        end

        new_state = Map.update(state, :queue, tail, &amp; &amp;1)

        {delay_s, _} = Float.parse("#{delay}")

        Process.send_after(
          self(),
          {:process_queue, state.sensor_id, channel, uuid},
          round(delay_s) * 1000
        )

        {:noreply, new_state}
    end
  end

  defp wait_until_connected(socket) do
    unless PhoenixClient.Socket.connected?(socket) do
      Process.sleep(100)
      wait_until_connected(socket)
    end
  end

  defp schedule_send_message(sensor_id, channel, uuid, config) do
    Process.send_after(
      self(),
      {:send_message, sensor_id, channel, uuid, config},
      :rand.uniform(@interval)
    )
  end

  def schedule_process_queue(sensor_id, channel, uuid) do
    Process.send_after(
      self(),
      {:process_queue, sensor_id, channel, uuid},
      10
    )
  end

  def fetch_sensor_data_(sensor_id, config) do
    alias NimbleCSV.RFC4180, as: CSV

    {:ok, data} =
      """
      1737604423651,0,51.0
      1737604424651,1.0,54.0
      1737604425651,1.0,58.0
      1737604426651,1.0,61.0
      1737604427651,1.0,65.0
      1737604428651,1.0,66.0
      1737604429651,1.0,66.0
      1737604430651,1.0,70.0
      1737604431651,1.0,72.0
      1737604432651,1.0,77.0
      1737604433651,1.0,83.0
      1737604434651,1.0,86.0
      1737604435651,1.0,88.0
      1737604436651,1.0,89.0
      1737604437651,1.0,90.0
      1737604438651,1.0,92.0
      1737604439651,1.0,90.0
      1737604440651,1.0,91.0
      1737604441651,1.0,96.0
      1737604442651,1.0,98.0
      1737604443651,1.0,101.0
      1737604444651,1.0,101.0
      1737604445651,1.0,102.0
      1737604446651,1.0,102.0
      1737604447651,1.0,101.0
      1737604448651,1.0,98.0
      1737604449651,1.0,97.0
      1737604450651,1.0,97.0
      1737604451651,1.0,97.0
      1737604452651,1.0,98.0
      """
      |> String.trim()
      |> CSV.parse_string()
      #       |>  Enum.drop(1)
      |> Enum.map(fn item ->
        %{
          timestamp: String.to_integer(Enum.at(item, 0)),
          delay: String.to_float(Enum.at(item, 1)),
          payload: String.to_float(Enum.at(item, 2))
        }
      end)
      |> (fn data ->
            {:ok, data}
          end).()
      |> IO.inspect()

    data
    |> Enum.each(fn %{timestamp: timestamp, delay: delay, payload: payload} ->
      {megasec, sec, microsec} = :os.timestamp()

      message = %{
        "payload" => payload,
        "timestamp" => megasec + sec + microsec,
        "uuid" => "uuid"
      }

      IO.inspect(message)
    end)
  end

  def fetch_sensor_data_(sensor_id, config) do
    duration = config[:duration]
    sampling_rate = config[:sampling_rate]
    heart_rate = config[:heart_rate]
    respiratory_rate = config[:respiratory_rate]
    scr_number = config[:scr_number]
    burst_number = config[:burst_number]
    sensor_type = config[:sensor_type]

    try do
      System.cmd("python3", [
        "../sensocto-simulator.py",
        "--mode",
        "csv",
        "--sensor_id",
        sensor_id,
        "--sensor_type",
        sensor_type,
        "--duration",
        "#{duration}",
        "--sampling_rate",
        "#{sampling_rate}",
        "--heart_rate",
        "#{heart_rate}",
        "--respiratory_rate",
        "#{respiratory_rate}",
        "--scr_number",
        "#{scr_number}",
        "--burst_number",
        "#{burst_number}"
      ])
      |> (fn
            {output, 0} ->
              output
              |> String.trim()
              |> CSV.parse_string()
              |> Enum.drop(1)
              |> Enum.map(fn item ->
                %{
                  timestamp: String.to_integer(Enum.at(item, 0)),
                  delay: String.to_float(Enum.at(item, 1)),
                  value: String.to_float(Enum.at(item, 2))
                }
              end)
              |> (fn data ->
                    {:ok, data}
                  end).()

            {output, status} ->
              IO.puts("Error executing python script")
              IO.inspect(output)
              IO.inspect(status)
              :error
          end).()
    rescue
      e ->
        IO.puts("Error executing python script")
        IO.inspect(e)
        :error
    end
  end

  defp generate_random_sensor_id do
    # :crypto.strong_rand_bytes(8)
    # |> Base.encode64()
    # Shorten for simplicity
    # |> binary_part(0, 8)

    uuid_fragment = Enum.take(String.split(UUID.uuid1(), "-"), 1) |> List.last()
    "Sim:" <> uuid_fragment
  end
end
defmodule SensorSimulatorSupervisor do
  use DynamicSupervisor

  # Start the DynamicSupervisor
  def start_link(_) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  # Function to start a new SensorDataGenServer dynamically
  def start_sensor(%{:sensor_id => sensor_id} = config) do
    child_spec = %{
      id: sensor_id,
      start: {Sensocto.SensorSimulatorGenServer, :start_link, [config]},
      shutdown: 5_000,
      restart: :permanent,
      type: :worker
    }

    IO.inspect(child_spec)

    # spec = {Sensocto.SensorSimulatorGenServer, config}
    DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  # Stop a sensor using its sensor_id
  def stop_sensor(sensor_id) do
    case Registry.lookup(SensorRegistry, sensor_id) do
      [{pid, _value}] ->
        DynamicSupervisor.terminate_child(__MODULE__, pid)

      [] ->
        {:error, :not_found}
    end
  end

  def get_children() do
    DynamicSupervisor.which_children(__MODULE__)
  end

  # Nice utility method to check which processes are under supervision
  def count_children() do
    DynamicSupervisor.count_children(__MODULE__)
  end
end
defmodule Sensocto.Simulator.Application do
  # use Application

  @configs [
    %{
      device_name: "Device1",
      batch_size: 1,
      connector_id: "1111111",
      connector_name: "SensoctoSim",
      sampling_rate: 10,
      sensor_id: "Device1:heartrate",
      sensor_name: "Device1:heartrate",
      sensor_type: "heartrate",
      duration: 10,
      sampling_rate: 1,
      heart_rate: 60,
      respiratory_rate: 15,
      scr_number: 5,
      burst_number: 5,
      sensor_type: "heartrate"
    },
    %{
      device_name: "Device2",
      batch_size: 1,
      connector_id: "22222",
      connector_name: "SensoctoSim",
      sampling_rate: 10,
      sensor_id: "Device1:heartrate",
      sensor_name: "Device1:heartrate",
      sensor_type: "heartrate",
      duration: 10,
      sampling_rate: 1,
      heart_rate: 150,
      respiratory_rate: 30,
      scr_number: 5,
      burst_number: 5,
      sensor_type: "heartrate"
    }
  ]

  def start(_type, _args) do
    IO.puts("Start simulator")

    children = [
      SensorSimulatorSupervisor,
      {Registry, keys: :unique, name: SensorSimulatorRegistry}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Sensocto.Supervisor]
    Supervisor.start_link(children, opts)
  end

  def ensure_running_count(keep_running, ramp_up_delay, ramp_down_delay) do
    processes_running = Enum.count(SensorSimulatorSupervisor.get_children())
    IO.inspect(SensorSimulatorSupervisor.get_children())

    IO.puts("keep: #{keep_running}, running: #{processes_running}")

    if processes_running < keep_running do
      IO.puts("start servers")

      sensor_numbers = 1..keep_running

      Enum.each(sensor_numbers, fn number ->
        # Convert the number to a sensor name string (e.g., "sensor1", "sensor2", ...)

        config = Enum.random(@configs)
        sensor_name = config[~c"sensor_name"]

        # Start the sensor by calling start_sensor on the SensorSupervisor
        case SensorSimulatorSupervisor.start_sensor(sensor_name, [config]) do
          {:ok, pid} -> IO.puts("started #{sensor_name}")
          {:error, _} -> IO.puts("failed to start #{sensor_name}")
        end

        Process.sleep(:rand.uniform(ramp_up_delay))
      end)

      SensorSimulatorSupervisor.get_children()
    else
      IO.puts("keep or stop servers")

      Enum.take(SensorSimulatorSupervisor.get_children(), processes_running - keep_running)
      |> Enum.each(fn {_, pid, _, _type} ->
        IO.inspect(pid)
        DynamicSupervisor.terminate_child(SensorSimulatorSupervisor, pid)
        Process.sleep(:rand.uniform(ramp_down_delay))
      end)

      SensorSimulatorSupervisor.get_children()
    end
  end
end

case Sensocto.Simulator.Application.start(1, 1) do
  {:ok, pid} -> IO.puts("App started")
  {:error, {:already_started, pid}} -> IO.puts("App already started")
end

config = %{
  device_name: "Device1",
  batch_size: 1,
  connector_id: "1111111",
  connector_name: "SensoctoSim",
  sampling_rate: 10,
  sensor_id: "Device1:heartrate",
  sensor_name: "Device1:heartrate",
  sensor_type: "heartrate",
  duration: 10,
  sampling_rate: 1,
  heart_rate: 60,
  respiratory_rate: 15,
  scr_number: 5,
  burst_number: 5,
  sensor_type: "heartrate"
}

# Sensocto.Simulator.Application.ensure_running_count(1, 100, 100)
SensorSimulatorSupervisor.start_sensor(config)
SensorSimulatorSupervisor.get_children()
defmodule Test do
  def sensor_name_for_uuid(uuid) do
    case uuid do
      "61d20a90-71a1-11ea-ab12-0800200c9a66" -> "Pressure"
      "00002a37-0000-1000-8000-00805f9b34fb" -> "Heart Rate"
      "feb7cb83-e359-4b57-abc6-628286b7a79b" -> "Flexsense"
      "00002a19-0000-1000-8000-00805f9b34fb" -> "Battery"
      # Default for unknown UUIDs
      _ -> "Unknown Sensor"
    end
  end
end

"Test" <> Test.sensor_name_for_uuid("61d20a90-71a1-11ea-ab12-0800200c9a66")
require Kino.RPC
node = :simulator@localhost
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_ERL_COOKIE")))

Kino.RPC.eval_string(
  node,
  ~S"""
  Kino.Process.seq_trace(fn ->
    {:ok, agent_pid} = Agent.start_link(fn -> [] end)
    Process.monitor(agent_pid)

    1..2
    |> Task.async_stream(
      fn value ->
        Agent.get(agent_pid, fn value -> value end)
        100 * value
      end,
      max_concurrency: 3
    )
    |> Stream.run()

    Agent.stop(agent_pid)
  end)

  parent = self()
  Process.send_after(:biosense_data_server_2, {:get_dawaaaa, parent, %{}}, 0)

  #alias Sensocto.DeviceSupervisor
  #DeviceSupervisor.get_device_names()


  """,
  file: __ENV__.file
)
uuid_fragment = Enum.take(String.split(UUID.uuid1(), "-"), 1) |> List.last()
"Sim " <> uuid_fragment