PhoenixClient sensocto
Mix.install([
{:phoenix_client, "~> 0.11.1"},
{:phoenix_pubsub, "~> 2.1"},
{:jason, "~> 1.4"},
{:uuid, "~> 1.1"},
{:kino, "~> 0.12.0"},
{:nimble_csv, "~> 1.1"}
])
Configuration
alias NimbleCSV.RFC4180, as: CSV
{:ok, data} =
"""
1737604423651,0,51.0
1737604424651,1.0,54.0
1737604425651,1.0,58.0
1737604426651,1.0,61.0
1737604427651,1.0,65.0
1737604428651,1.0,66.0
1737604429651,1.0,66.0
1737604430651,1.0,70.0
1737604431651,1.0,72.0
1737604432651,1.0,77.0
1737604433651,1.0,83.0
1737604434651,1.0,86.0
1737604435651,1.0,88.0
1737604436651,1.0,89.0
1737604437651,1.0,90.0
1737604438651,1.0,92.0
1737604439651,1.0,90.0
1737604440651,1.0,91.0
1737604441651,1.0,96.0
1737604442651,1.0,98.0
1737604443651,1.0,101.0
1737604444651,1.0,101.0
1737604445651,1.0,102.0
1737604446651,1.0,102.0
1737604447651,1.0,101.0
1737604448651,1.0,98.0
1737604449651,1.0,97.0
1737604450651,1.0,97.0
1737604451651,1.0,97.0
1737604452651,1.0,98.0
"""
|> String.trim()
|> CSV.parse_string()
# |> Enum.drop(1)
|> Enum.map(fn item ->
%{
timestamp: String.to_integer(Enum.at(item, 0)),
delay: String.to_float(Enum.at(item, 1)),
payload: String.to_float(Enum.at(item, 2))
}
end)
|> (fn data ->
{:ok, data}
end).()
|> IO.inspect()
data
|> Enum.each(fn %{timestamp: timestamp, delay: delay, payload: payload} ->
{megasec, sec, microsec} = :os.timestamp()
message = %{
"payload" => payload,
"timestamp" => megasec + sec + microsec,
"uuid" => "uuid"
}
IO.inspect(message)
end)
form =
Kino.Control.form(
[
name: Kino.Input.text("Name"),
socket:
Kino.Input.select("socket",
local: "ws://localhost:4000/socket/websocket",
test: "wss://sensocto.fly.dev/socket/websocket"
)
],
submit: "Submit"
)
Kino.listen(form, fn event ->
IO.inspect(event)
end)
form
Section
defmodule Sensocto.SensorSimulatorGenServer do
use GenServer
require Logger
alias PhoenixClient.{Socket, Channel, Message}
alias NimbleCSV.RFC4180, as: CSV
@socket_opts [
# url: "ws://localhost:4000/socket/websocket"
url: "wss://sensocto.fly.dev/socket/websocket"
]
# Max interval in milliseconds for sending messages
@interval 2000
# List of UUID attributes to select from
@uuid_attributes [
"61d20a90-71a1-11ea-ab12-0800200c9a66",
"00002a37-0000-1000-8000-00805f9b34fb",
"feb7cb83-e359-4b57-abc6-628286b7a79b",
"00002a19-0000-1000-8000-00805f9b34fb"
]
# Public API
def start_link(%{:sensor_id => sensor_id} = config) do
GenServer.start_link(__MODULE__, config, name: via_tuple(sensor_id))
end
defp via_tuple(sensor_id), do: {:via, Registry, {SensorSimulatorRegistry, sensor_id}}
# GenServer Callbacks
@impl true
def init(%{:sensor_id => sensor_id} = config) do
case PhoenixClient.Socket.start_link(@socket_opts) do
{:ok, socket} ->
wait_until_connected(socket)
uuid = Enum.random(@uuid_attributes)
topic = "sensor_data:" <> sensor_id
IO.puts("Connecting ... #{topic}")
join_meta = %{
device_name: config[:device_name],
batch_size: 1,
connector_id: config[:connector_id],
connector_name: config[:connector_name],
sampling_rate: config[:sampling_rate],
sensor_id: config[:sensor_id],
sensor_name: config[:sensor_name],
sensor_type: config[:sensor_type],
bearer_token: "fake"
}
case PhoenixClient.Channel.join(socket, topic, join_meta) do
{:ok, _response, channel} ->
IO.puts("Joined channel successfully for sensor #{sensor_id}")
# Schedule the first message
schedule_send_message(sensor_id, channel, uuid, config)
{:ok, config}
{:error, reason} ->
IO.puts("Failed to join channel: #{inspect(reason)}")
{:stop, reason}
end
{:error, reason} ->
IO.puts("Failed to connect to socket: #{inspect(reason)}")
{:stop, reason}
end
{:ok, config}
end
@impl true
def handle_info({:send_message, sensor_id, channel, uuid, config}, state) do
case fetch_sensor_data(sensor_id, config) do
{:ok, data} ->
new_state = Map.update(state, :queue, data, & &1)
IO.inspect(new_state)
schedule_process_queue(sensor_id, channel, uuid)
{:noreply, new_state}
_ ->
schedule_send_message(sensor_id, channel, uuid, config)
{:noreply, state}
end
end
@impl true
def handle_info({:process_queue, sensor_id, channel, uuid}, state) do
process_queue(state, channel, uuid)
end
@impl true
def handle_info({:push_message, channel, message}, state) do
if state.config[:phoenix_channel] do
PhoenixClient.Channel.push_async(channel, "measurement", message)
{:noreply, state}
else
{:noreply, state}
end
end
def handle_info(%Message{event: _message, payload: _payload}, state) do
# IO.puts("Incoming Message: #{message} #{inspect(payload)}")
{:noreply, state}
end
# A helper function to interact with the GenServer
def get_data(sensor_id) do
GenServer.call(via_tuple(sensor_id), :get_data)
end
# Private Functions
defp process_queue(state, channel, uuid) do
case state.queue do
[] ->
:noop
[head | tail] ->
%{timestamp: timestamp, delay: delay, value: value} = head
message = %{
"payload" => value,
"timestamp" => timestamp,
"uuid" => uuid
}
if state[:phoenix_channel] do
Process.send_after(self(), {:push_message, channel, message}, delay * 1000)
end
new_state = Map.update(state, :queue, tail, & &1)
{delay_s, _} = Float.parse("#{delay}")
Process.send_after(
self(),
{:process_queue, state.sensor_id, channel, uuid},
round(delay_s) * 1000
)
{:noreply, new_state}
end
end
defp wait_until_connected(socket) do
unless PhoenixClient.Socket.connected?(socket) do
Process.sleep(100)
wait_until_connected(socket)
end
end
defp schedule_send_message(sensor_id, channel, uuid, config) do
Process.send_after(
self(),
{:send_message, sensor_id, channel, uuid, config},
:rand.uniform(@interval)
)
end
def schedule_process_queue(sensor_id, channel, uuid) do
Process.send_after(
self(),
{:process_queue, sensor_id, channel, uuid},
10
)
end
def fetch_sensor_data_(sensor_id, config) do
alias NimbleCSV.RFC4180, as: CSV
{:ok, data} =
"""
1737604423651,0,51.0
1737604424651,1.0,54.0
1737604425651,1.0,58.0
1737604426651,1.0,61.0
1737604427651,1.0,65.0
1737604428651,1.0,66.0
1737604429651,1.0,66.0
1737604430651,1.0,70.0
1737604431651,1.0,72.0
1737604432651,1.0,77.0
1737604433651,1.0,83.0
1737604434651,1.0,86.0
1737604435651,1.0,88.0
1737604436651,1.0,89.0
1737604437651,1.0,90.0
1737604438651,1.0,92.0
1737604439651,1.0,90.0
1737604440651,1.0,91.0
1737604441651,1.0,96.0
1737604442651,1.0,98.0
1737604443651,1.0,101.0
1737604444651,1.0,101.0
1737604445651,1.0,102.0
1737604446651,1.0,102.0
1737604447651,1.0,101.0
1737604448651,1.0,98.0
1737604449651,1.0,97.0
1737604450651,1.0,97.0
1737604451651,1.0,97.0
1737604452651,1.0,98.0
"""
|> String.trim()
|> CSV.parse_string()
# |> Enum.drop(1)
|> Enum.map(fn item ->
%{
timestamp: String.to_integer(Enum.at(item, 0)),
delay: String.to_float(Enum.at(item, 1)),
payload: String.to_float(Enum.at(item, 2))
}
end)
|> (fn data ->
{:ok, data}
end).()
|> IO.inspect()
data
|> Enum.each(fn %{timestamp: timestamp, delay: delay, payload: payload} ->
{megasec, sec, microsec} = :os.timestamp()
message = %{
"payload" => payload,
"timestamp" => megasec + sec + microsec,
"uuid" => "uuid"
}
IO.inspect(message)
end)
end
def fetch_sensor_data_(sensor_id, config) do
duration = config[:duration]
sampling_rate = config[:sampling_rate]
heart_rate = config[:heart_rate]
respiratory_rate = config[:respiratory_rate]
scr_number = config[:scr_number]
burst_number = config[:burst_number]
sensor_type = config[:sensor_type]
try do
System.cmd("python3", [
"../sensocto-simulator.py",
"--mode",
"csv",
"--sensor_id",
sensor_id,
"--sensor_type",
sensor_type,
"--duration",
"#{duration}",
"--sampling_rate",
"#{sampling_rate}",
"--heart_rate",
"#{heart_rate}",
"--respiratory_rate",
"#{respiratory_rate}",
"--scr_number",
"#{scr_number}",
"--burst_number",
"#{burst_number}"
])
|> (fn
{output, 0} ->
output
|> String.trim()
|> CSV.parse_string()
|> Enum.drop(1)
|> Enum.map(fn item ->
%{
timestamp: String.to_integer(Enum.at(item, 0)),
delay: String.to_float(Enum.at(item, 1)),
value: String.to_float(Enum.at(item, 2))
}
end)
|> (fn data ->
{:ok, data}
end).()
{output, status} ->
IO.puts("Error executing python script")
IO.inspect(output)
IO.inspect(status)
:error
end).()
rescue
e ->
IO.puts("Error executing python script")
IO.inspect(e)
:error
end
end
defp generate_random_sensor_id do
# :crypto.strong_rand_bytes(8)
# |> Base.encode64()
# Shorten for simplicity
# |> binary_part(0, 8)
uuid_fragment = Enum.take(String.split(UUID.uuid1(), "-"), 1) |> List.last()
"Sim:" <> uuid_fragment
end
end
defmodule SensorSimulatorSupervisor do
use DynamicSupervisor
# Start the DynamicSupervisor
def start_link(_) do
DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(:ok) do
DynamicSupervisor.init(strategy: :one_for_one)
end
# Function to start a new SensorDataGenServer dynamically
def start_sensor(%{:sensor_id => sensor_id} = config) do
child_spec = %{
id: sensor_id,
start: {Sensocto.SensorSimulatorGenServer, :start_link, [config]},
shutdown: 5_000,
restart: :permanent,
type: :worker
}
IO.inspect(child_spec)
# spec = {Sensocto.SensorSimulatorGenServer, config}
DynamicSupervisor.start_child(__MODULE__, child_spec)
end
# Stop a sensor using its sensor_id
def stop_sensor(sensor_id) do
case Registry.lookup(SensorRegistry, sensor_id) do
[{pid, _value}] ->
DynamicSupervisor.terminate_child(__MODULE__, pid)
[] ->
{:error, :not_found}
end
end
def get_children() do
DynamicSupervisor.which_children(__MODULE__)
end
# Nice utility method to check which processes are under supervision
def count_children() do
DynamicSupervisor.count_children(__MODULE__)
end
end
defmodule Sensocto.Simulator.Application do
# use Application
@configs [
%{
device_name: "Device1",
batch_size: 1,
connector_id: "1111111",
connector_name: "SensoctoSim",
sampling_rate: 10,
sensor_id: "Device1:heartrate",
sensor_name: "Device1:heartrate",
sensor_type: "heartrate",
duration: 10,
sampling_rate: 1,
heart_rate: 60,
respiratory_rate: 15,
scr_number: 5,
burst_number: 5,
sensor_type: "heartrate"
},
%{
device_name: "Device2",
batch_size: 1,
connector_id: "22222",
connector_name: "SensoctoSim",
sampling_rate: 10,
sensor_id: "Device1:heartrate",
sensor_name: "Device1:heartrate",
sensor_type: "heartrate",
duration: 10,
sampling_rate: 1,
heart_rate: 150,
respiratory_rate: 30,
scr_number: 5,
burst_number: 5,
sensor_type: "heartrate"
}
]
def start(_type, _args) do
IO.puts("Start simulator")
children = [
SensorSimulatorSupervisor,
{Registry, keys: :unique, name: SensorSimulatorRegistry}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Sensocto.Supervisor]
Supervisor.start_link(children, opts)
end
def ensure_running_count(keep_running, ramp_up_delay, ramp_down_delay) do
processes_running = Enum.count(SensorSimulatorSupervisor.get_children())
IO.inspect(SensorSimulatorSupervisor.get_children())
IO.puts("keep: #{keep_running}, running: #{processes_running}")
if processes_running < keep_running do
IO.puts("start servers")
sensor_numbers = 1..keep_running
Enum.each(sensor_numbers, fn number ->
# Convert the number to a sensor name string (e.g., "sensor1", "sensor2", ...)
config = Enum.random(@configs)
sensor_name = config[~c"sensor_name"]
# Start the sensor by calling start_sensor on the SensorSupervisor
case SensorSimulatorSupervisor.start_sensor(sensor_name, [config]) do
{:ok, pid} -> IO.puts("started #{sensor_name}")
{:error, _} -> IO.puts("failed to start #{sensor_name}")
end
Process.sleep(:rand.uniform(ramp_up_delay))
end)
SensorSimulatorSupervisor.get_children()
else
IO.puts("keep or stop servers")
Enum.take(SensorSimulatorSupervisor.get_children(), processes_running - keep_running)
|> Enum.each(fn {_, pid, _, _type} ->
IO.inspect(pid)
DynamicSupervisor.terminate_child(SensorSimulatorSupervisor, pid)
Process.sleep(:rand.uniform(ramp_down_delay))
end)
SensorSimulatorSupervisor.get_children()
end
end
end
case Sensocto.Simulator.Application.start(1, 1) do
{:ok, pid} -> IO.puts("App started")
{:error, {:already_started, pid}} -> IO.puts("App already started")
end
config = %{
device_name: "Device1",
batch_size: 1,
connector_id: "1111111",
connector_name: "SensoctoSim",
sampling_rate: 10,
sensor_id: "Device1:heartrate",
sensor_name: "Device1:heartrate",
sensor_type: "heartrate",
duration: 10,
sampling_rate: 1,
heart_rate: 60,
respiratory_rate: 15,
scr_number: 5,
burst_number: 5,
sensor_type: "heartrate"
}
# Sensocto.Simulator.Application.ensure_running_count(1, 100, 100)
SensorSimulatorSupervisor.start_sensor(config)
SensorSimulatorSupervisor.get_children()
defmodule Test do
def sensor_name_for_uuid(uuid) do
case uuid do
"61d20a90-71a1-11ea-ab12-0800200c9a66" -> "Pressure"
"00002a37-0000-1000-8000-00805f9b34fb" -> "Heart Rate"
"feb7cb83-e359-4b57-abc6-628286b7a79b" -> "Flexsense"
"00002a19-0000-1000-8000-00805f9b34fb" -> "Battery"
# Default for unknown UUIDs
_ -> "Unknown Sensor"
end
end
end
"Test" <> Test.sensor_name_for_uuid("61d20a90-71a1-11ea-ab12-0800200c9a66")
require Kino.RPC
node = :simulator@localhost
Node.set_cookie(node, String.to_atom(System.fetch_env!("LB_ERL_COOKIE")))
Kino.RPC.eval_string(
node,
~S"""
Kino.Process.seq_trace(fn ->
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)
1..2
|> Task.async_stream(
fn value ->
Agent.get(agent_pid, fn value -> value end)
100 * value
end,
max_concurrency: 3
)
|> Stream.run()
Agent.stop(agent_pid)
end)
parent = self()
Process.send_after(:biosense_data_server_2, {:get_dawaaaa, parent, %{}}, 0)
#alias Sensocto.DeviceSupervisor
#DeviceSupervisor.get_device_names()
""",
file: __ENV__.file
)
uuid_fragment = Enum.take(String.split(UUID.uuid1(), "-"), 1) |> List.last()
"Sim " <> uuid_fragment