Pirates by the USB bay
Mix.install([
{:circuits_uart, "~> 1.5"},
{:nimble_parsec, "~> 1.4"}
])
Root
Another on1
# nodes = ["ora@127.0.0.1"]
# Enum.each(nodes, fn node ->
# Node.connect(String.to_atom(node))
# end)
defmodule Pirate.Cluster do
  @moduledoc """
  Helpers for joining a remote Erlang node and broadcasting a message on the
  `Dojo.PubSub` instance running there.
  """

  @doc """
  Sets `cookie` for `node_name` and attempts to connect to that node.

  Both arguments may be given as atoms or as strings; strings are converted to atoms.

  Returns:

    * `{:ok, node_name}` when the connection succeeds
    * `{:error, :connection_failed}` when the remote node cannot be reached
    * `{:error, :not_distributed}` when the local node is not alive (not started
      with a name), in which case no cookie is set and no connection is attempted
  """
  def connect_to_node(node_name, cookie) do
    node_name = to_atom(node_name)
    cookie = to_atom(cookie)

    # Without distribution the cookie/connect calls below cannot succeed, so
    # report that explicitly instead of raising.
    if Node.alive?() do
      # Set the cookie for this specific node
      Node.set_cookie(node_name, cookie)

      case Node.connect(node_name) do
        true -> {:ok, node_name}
        false -> {:error, :connection_failed}
        # Node.connect/1 returns :ignored when the local node is not alive.
        :ignored -> {:error, :not_distributed}
      end
    else
      {:error, :not_distributed}
    end
  end

  @doc """
  Broadcasts `message` on `topic` by running `Phoenix.PubSub.broadcast/3` against
  `Dojo.PubSub` on `node_name` through `:erpc.call/4`.

  `node_name` may be an atom or a string. Returns whatever the remote call
  returns; failures of the remote call itself are raised by `:erpc.call/4`.
  """
  def send_to_topic_on_node(node_name, topic, message) do
    :erpc.call(
      to_atom(node_name),
      Phoenix.PubSub,
      :broadcast,
      [Dojo.PubSub, topic, message]
    )
  end

  # Node names and cookies must be atoms. Callers are expected to pass a small,
  # fixed set of names, since every distinct string creates a new atom.
  defp to_atom(value) when is_binary(value), do: String.to_atom(value)
  defp to_atom(value), do: value
end
# Join the remote node with the shared cookie; the returned tuple is discarded here.
Pirate.Cluster.connect_to_node("ora@127.0.0.1", "enterthedojo")
# Enum.reduce(0..40 ,0,fn x, acc ->
# :timer.sleep(500)
# if rem(acc,5) == 0 do
# Pirate.Cluster.send_to_topic_on_node("ora@127.0.0.1",
# "class:shell:pirate",
# {Dojo.Controls, "rt", [acc]})
# else
# Pirate.Cluster.send_to_topic_on_node("ora@127.0.0.1",
# "class:shell:pirate",
# {Dojo.Controls, "fw", [acc]})
# end
# acc+2
# end)
Binding
# defmodule InputSystem do
# @moduledoc """
# Simplified generic input binding system that emits commands based on input state changes.
# """
# @doc """
# Process state changes to generate commands.
# """
# def process_change(previous, current, context \\ %{}) do
# previous
# |> StateComparator.compare(current)
# |> EventGenerator.generate_events()
# |> CommandMapper.map_to_commands(context)
# end
# end
# defmodule StateComparator do
# @moduledoc """
# Compares previous and current input states to detect changes.
# """
# def compare(previous, current) do
# %{
# changes: detect_changes(previous, current),
# timestamp: Map.get(current, "timestamp", 0)
# }
# end
# defp detect_changes(previous, current) do
# %{
# buttons: detect_button_changes(previous, current),
# knobs: detect_knob_changes(previous, current)
# }
# end
# defp detect_button_changes(previous, current) do
# prev_buttons = Map.get(previous, "buttons", [])
# curr_buttons = Map.get(current, "buttons", [])
# # Get all unique button IDs
# button_ids = (prev_buttons ++ curr_buttons)
# |> Enum.map(&Map.get(&1, "id"))
# |> Enum.uniq()
# # Create lookup maps for efficient access
# prev_map = index_by_id(prev_buttons)
# curr_map = index_by_id(curr_buttons)
# # Generate changes for each button
# button_ids
# |> Enum.map(fn id ->
# analyze_button_change(
# id,
# Map.get(prev_map, id, %{"id" => id, "pressed" => false, "duration" => 0}),
# Map.get(curr_map, id, %{"id" => id, "pressed" => false, "duration" => 0})
# )
# end)
# |> Enum.filter(&(&1.change_type != :none))
# end
# defp analyze_button_change(id, previous, current) do
# change_type = cond do
# !previous["pressed"] && current["pressed"] -> :pressed
# previous["pressed"] && !current["pressed"] -> :released
# previous["pressed"] && current["pressed"] && previous["duration"] != current["duration"] -> :held
# true -> :none
# end
# %{
# id: id,
# change_type: change_type,
# previous: previous,
# current: current,
# duration_delta: (current["duration"] || 0) - (previous["duration"] || 0)
# }
# end
# defp detect_knob_changes(previous, current) do
# IO.inspect(previous, label: "prev")
# IO.inspect(current, label: "curr")
# prev_knobs = Map.get(previous, "knobs", [])
# curr_knobs = Map.get(current, "knobs", [])
# # Get all unique knob IDs
# knob_ids = (prev_knobs ++ curr_knobs)
# |> Enum.map(&Map.get(&1, "id"))
# |> Enum.uniq()
# # Create lookup maps for efficient access
# prev_map = index_by_id(prev_knobs)
# curr_map = index_by_id(curr_knobs)
# # Generate changes for each knob
# knob_ids
# |> Enum.map(fn id ->
# analyze_knob_change(
# id,
# Map.get(prev_map, id, %{"id" => id, "value" => 0}),
# Map.get(curr_map, id, %{"id" => id, "value" => 0})
# )
# end)
# |> Enum.filter(&(&1.change_type != :none))
# end
# defp analyze_knob_change(id, previous, current) do
# value_delta = (current["value"] || 0) - (previous["value"] || 0)
# change_type = if abs(value_delta) <= 1, do: :none, else: :changed
# %{
# id: id,
# change_type: change_type,
# previous: previous,
# current: current,
# value_delta: value_delta
# }
# end
# defp index_by_id(items) do
# Enum.reduce(items, %{}, fn item, acc ->
# Map.put(acc, item["id"], item)
# end)
# end
# end
# defmodule InputEvent do
# @moduledoc """
# Represents a change in input state.
# """
# defstruct [:type, :id, :change_type, :timestamp, :metadata]
# def new(type, id, change_type, timestamp, metadata \\ %{}) do
# %InputEvent{
# type: type,
# id: id,
# change_type: change_type,
# timestamp: timestamp,
# metadata: metadata
# }
# end
# end
# defmodule EventGenerator do
# @moduledoc """
# Generates InputEvents from state changes.
# """
# def generate_events(diff) do
# timestamp = diff.timestamp
# button_events = generate_button_events(diff.changes.buttons, timestamp)
# knob_events = generate_knob_events(diff.changes.knobs, timestamp)
# button_events ++ knob_events
# end
# defp generate_button_events(button_changes, timestamp) do
# Enum.map(button_changes, fn change ->
# InputEvent.new(
# :button,
# change.id,
# change.change_type,
# timestamp,
# %{
# duration: change.current["duration"],
# pressed: change.current["pressed"],
# duration_delta: change.duration_delta
# }
# )
# end)
# end
# defp generate_knob_events(knob_changes, timestamp) do
# Enum.map(knob_changes, fn change ->
# IO.inspect(change, label: "knob changes")
# InputEvent.new(
# :knob,
# change.id,
# change.change_type,
# timestamp,
# %{
# value: change.current["value"],
# value_delta: change.value_delta
# }
# )
# end)
# end
# end
# defmodule Command do
# @moduledoc """
# Represents an action to be executed based on input changes.
# """
# defstruct [:action, :source_id, :source_type, :params, :timestamp]
# def new(action, source_id, source_type, opts \\ []) do
# %Command{
# action: action,
# source_id: source_id,
# source_type: source_type,
# params: Keyword.get(opts, :params, %{}),
# timestamp: Keyword.get(opts, :timestamp, 0)
# }
# end
# def to_string(command) do
# param_str = if map_size(command.params) == 0 do
# ""
# else
# params_formatted = command.params
# |> Enum.map(fn {k, v} -> "#{k}=#{v}" end)
# |> Enum.join(",")
# "[#{params_formatted}]"
# end
# "#{command.action}#{param_str}"
# end
# end
# defprotocol InputHandler do
# @moduledoc """
# Defines how input events are translated to commands.
# """
# def handle_event(handler, event)
# end
# defmodule CommandMapper do
# @moduledoc """
# Maps input events to commands based on bindings.
# """
# def map_to_commands(events, context) do
# bindings = Map.get(context, :bindings, %{})
# events
# |> Enum.map(fn event -> map_event_to_command(event, bindings) end)
# |> Enum.filter(&(&1 != nil))
# end
# defp map_event_to_command(event, bindings) do
# handler = get_handler(event, bindings)
# if handler do
# InputHandler.handle_event(handler, event)
# else
# nil
# end
# end
# defp get_handler(event, bindings) do
# binding_key = {event.type, event.id}
# Map.get(bindings, binding_key)
# end
# end
# defmodule Handlers do
# @moduledoc """
# Collection of standard input handlers.
# """
# defmodule MomentaryButton do
# @moduledoc "Handler for momentary button actions"
# defstruct [:action]
# def new(action), do: %__MODULE__{action: action}
# end
# defmodule ToggleButton do
# @moduledoc "Handler for toggle button actions"
# defstruct [:action]
# def new(action), do: %__MODULE__{action: action}
# end
# defmodule DurationButton do
# @moduledoc "Handler for duration-based button actions"
# defstruct [:action, :thresholds]
# def new(action, thresholds \\ &(&1/2)) do
# %__MODULE__{
# action: action,
# thresholds: thresholds
# }
# end
# def calculate_intensity(duration, thresholds) do
# thresholds.(duration)
# end
# end
# defmodule DirectionalKnob do
# @moduledoc "Handler for directional knob actions"
# defstruct [:action]
# def new(action), do: %__MODULE__{action: action}
# end
# defmodule AbsoluteKnob do
# @moduledoc "Handler for absolute position knob actions"
# defstruct [:action, :valuefn]
# def new(action, opts), do: %__MODULE__{action: action, valuefn: Keyword.get(opts, :valuefn, &(&1))}
# end
# defmodule DeltaKnob do
# @moduledoc "Handler for relative (delta) knob actions"
# defstruct [:action, :valuefn]
# def new(action, opts), do: %__MODULE__{action: action, valuefn: Keyword.get(opts, :valuefn, &(&1))}
# end
# end
# # Implementing the InputHandler protocol for each handler type
# defimpl InputHandler, for: Handlers.MomentaryButton do
# def handle_event(handler, event) do
# case event.change_type do
# :released ->
# Command.new(handler.action, event.id, event.type, timestamp: event.timestamp)
# :held ->
# Command.new(handler.action, event.id, event.type, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defimpl InputHandler, for: Handlers.ToggleButton do
# def handle_event(handler, event) do
# case event.change_type do
# :pressed ->
# Command.new(handler.action, event.id, event.type, params: %{state: "on"}, timestamp: event.timestamp)
# :released ->
# Command.new(handler.action, event.id, event.type, params: %{state: "off"}, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defimpl InputHandler, for: Handlers.DurationButton do
# def handle_event(handler, event) do
# case event.change_type do
# :pressed ->
# duration = event.metadata.duration || 0
# #intensity = Handlers.DurationButton.calculate_intensity(duration, handler.thresholds)
# Command.new(handler.action, event.id, event.type, params: %{mag: 0}, timestamp: event.timestamp)
# :released ->
# duration = -event.metadata.duration_delta || 0
# Command.new(handler.action, event.id, event.type, params: %{mag: (duration>500) && round(duration/50) || 10}, timestamp: event.timestamp)
# :held ->
# duration = event.metadata.duration || 0
# Command.new(handler.action, event.id, event.type, params: %{mag: round(duration/50)}, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defimpl InputHandler, for: Handlers.DirectionalKnob do
# def handle_event(handler, event) do
# case event.change_type do
# :changed ->
# value_delta = event.metadata.value_delta
# direction = if value_delta > 0, do: "clockwise", else: "counterclockwise"
# Command.new(handler.action, event.id, event.type, params: %{direction: direction, magnitude: abs(value_delta)}, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defimpl InputHandler, for: Handlers.DeltaKnob do
# def handle_event(handler, event) do
# case event.change_type do
# :changed ->
# Command.new(handler.action.(event.metadata.value), event.id, event.type, params: %{value: handler.valuefn.(event.metadata.value_delta)}, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defimpl InputHandler, for: Handlers.AbsoluteKnob do
# def handle_event(handler, event) do
# case event.change_type do
# :changed ->
# Command.new(handler.action.(event.metadata.value), event.id, event.type, params: %{value: handler.valuefn.(event.metadata.value)}, timestamp: event.timestamp)
# _ -> nil
# end
# end
# end
# defmodule BindingManager do
# @moduledoc """
# Manages input bindings configuration.
# """
# def new, do: %{}
# def bind_button(bindings, id, handler) do
# Map.put(bindings, {:button, id}, handler)
# end
# def bind_knob(bindings, id, handler) do
# Map.put(bindings, {:knob, id}, handler)
# end
# def pyrg_controller do
# new()
# |> bind_button(0, Handlers.DurationButton.new(:rotator))
# |> bind_button(1, Handlers.DurationButton.new(:fw))
# |> bind_button(2, Handlers.DurationButton.new(:jmp))
# |> bind_button(3, Handlers.DurationButton.new(:undo))
# #|> bind_knob(0, Handlers.AbsoluteKnob.new(&(&1>0 && :rt || :lt), valuefn: &(abs(round(&1 / 3) * 3))))
# |> bind_knob(0, Handlers.DeltaKnob.new(&(&1>0 && :rt || :lt), valuefn: &(abs(round(&1 / 3) * 3))))
# end
# #|> bind_knob(1, Handlers.AbsoluteKnob.new(:wait))
# end
# defmodule Example do
# @moduledoc "Example usage of the input system"
# def demonstrate do
# # Create binding configuration
# bindings = BindingManager.pyrg_controller()
# # Define states
# state1 = %{
# "buttons" => [
# %{"id" => 0, "pressed" => false, "duration" => 0},
# %{"id" => 1, "pressed" => false, "duration" => 0}
# ],
# "knobs" => [%{"id" => 0, "value" => -30}],
# "timestamp" => 100
# }
# state2 = %{
# "buttons" => [
# %{"id" => 0, "pressed" => true, "duration" => 0},
# %{"id" => 1, "pressed" => false, "duration" => 0}
# ],
# "knobs" => [%{"id" => 0, "value" => -44}],
# "timestamp" => 200
# }
# # Process state changes
# commands = InputSystem.process_change(state1, state2, %{bindings: bindings})
# # Output command representations
# command_strings = Enum.map(commands, &Command.to_string/1)
# IO.inspect(command_strings, label: "Generated Commands")
# end
# end
# Example.demonstrate()
# Core primitive: just signal differences
defmodule Signal do
  @moduledoc """
  One sampled input value, identified by `id` and `type`, plus a helper that
  describes the change between two samples of the same input.
  """

  defstruct [:id, :value, :timestamp, :type]

  @doc "Builds a signal from its id, sampled value, timestamp and type."
  def new(id, value, timestamp, type) do
    %Signal{id: id, value: value, timestamp: timestamp, type: type}
  end

  @doc """
  Returns a map describing the change from `old` to `new`.

  Both signals must carry the same `id`; otherwise no clause matches. The type
  and timestamp of the result are taken from `new`.
  """
  def diff(%Signal{id: id} = old, %Signal{id: id} = new) do
    %Signal{value: from, timestamp: started_at} = old
    %Signal{value: to, timestamp: ended_at, type: type} = new

    %{
      id: id,
      type: type,
      delta: to - from,
      dt: ended_at - started_at,
      from: from,
      to: to,
      timestamp: ended_at
    }
  end
end
# Gesture-based accumulator - resets on any button press
defmodule GestureState do
  @moduledoc """
  Running totals per signal key, kept in the `accumulators` map.
  """

  defstruct [:accumulators]

  @doc "Returns a state with no accumulated values."
  def new, do: %GestureState{accumulators: %{}}

  @doc """
  Adds `delta` to the total stored under `signal_key` (starting from 0).

  Returns `{updated_state, new_total}`.
  """
  def accumulate(gesture_state, signal_key, delta) do
    total = get_accumulated(gesture_state, signal_key) + delta
    accumulators = Map.put(gesture_state.accumulators, signal_key, total)

    {%{gesture_state | accumulators: accumulators}, total}
  end

  @doc "Drops every accumulated total."
  def reset_all(gesture_state), do: %{gesture_state | accumulators: %{}}

  @doc "Returns the total stored under `signal_key`, or 0 when there is none."
  def get_accumulated(gesture_state, signal_key) do
    case Map.fetch(gesture_state.accumulators, signal_key) do
      {:ok, total} -> total
      :error -> 0
    end
  end
end
# Command generators - pure functions
defmodule Gen do
  @moduledoc """
  Command generators.

  Every public function returns a two-argument function that receives a signal
  diff and a gesture state and returns a list of `{action, params, timestamp}`
  tuples (possibly empty).
  """

  # Button generators

  @doc "Emits `action` with empty params when the value goes from positive to zero."
  def momentary(action) do
    fn diff, _window_state ->
      if released?(diff), do: [command(action, %{}, diff)], else: []
    end
  end

  @doc """
  Emits `action` with a `:mag` param derived from the button value:

    * zero -> positive: `mag: 0`
    * positive -> zero: `round(from / 50)` when `from` exceeds 500, otherwise 10
    * positive and still growing: `round(to / 50)`
  """
  def duration(action) do
    fn diff, _window_state ->
      cond do
        pressed?(diff) -> [command(action, %{mag: 0}, diff)]
        released?(diff) -> [command(action, %{mag: release_magnitude(diff.from)}, diff)]
        held_longer?(diff) -> [command(action, %{mag: round(diff.to / 50)}, diff)]
        true -> []
      end
    end
  end

  # Knob generators

  @doc """
  Emits a command once the accumulated total for the diff's `{type, id}` key
  reaches `:threshold` (default 3) in absolute value.

  `action_fn` maps the total to the action; the `:value_fn` option (default:
  identity) maps it to the `:value` param.
  """
  def delta(action_fn, opts \\ []) do
    value_fn = Keyword.get(opts, :value_fn, fn total -> total end)
    threshold = Keyword.get(opts, :threshold, 3)

    fn diff, gesture_state ->
      total = GestureState.get_accumulated(gesture_state, {diff.type, diff.id})

      if abs(total) < threshold do
        []
      else
        [{action_fn.(total), %{value: value_fn.(total)}, diff.timestamp}]
      end
    end
  end

  @doc "Emits `action` with the direction and magnitude of any non-zero delta."
  def directional(action) do
    fn diff, _gesture_state ->
      case abs(diff.delta) do
        magnitude when magnitude > 0 ->
          direction = if diff.delta > 0, do: "clockwise", else: "counterclockwise"
          [command(action, %{direction: direction, magnitude: magnitude}, diff)]

        _ ->
          []
      end
    end
  end

  # Builds the command tuple stamped with the diff's timestamp.
  defp command(action, params, diff), do: {action, params, diff.timestamp}

  defp pressed?(diff), do: diff.from == 0 and diff.to > 0
  defp released?(diff), do: diff.from > 0 and diff.to == 0
  defp held_longer?(diff), do: diff.from > 0 and diff.to > 0 and diff.delta > 0

  defp release_magnitude(held_for) when held_for > 500, do: round(held_for / 50)
  defp release_magnitude(_held_for), do: 10
end
# The system - holds gesture state internally
defmodule InputSystem do
  @moduledoc """
  Turns two consecutive raw input states into commands.

  Holds the bindings (`{type, id}` => generator function) and the gesture state
  that carries knob totals from one call to the next.
  """

  defstruct [:bindings, :gesture_state]

  @doc "Builds a system with the given bindings and a fresh gesture state."
  def new(bindings \\ %{}) do
    %InputSystem{bindings: bindings, gesture_state: GestureState.new()}
  end

  @doc """
  Diffs `curr_state` against `prev_state` and runs the bound generators.

  Returns `{updated_system, commands}`, with commands in diff order.
  """
  def process_input(system, prev_state, curr_state) do
    diffs = calculate_diffs(parse_to_signals(prev_state), parse_to_signals(curr_state))

    # A fresh button press clears all accumulated totals before this batch runs.
    starting_state =
      if Enum.any?(diffs, &button_press?/1) do
        GestureState.reset_all(system.gesture_state)
      else
        system.gesture_state
      end

    {commands, final_state} =
      Enum.flat_map_reduce(diffs, starting_state, fn diff, gesture_state ->
        gesture_state = track_knob(gesture_state, diff)
        {run_binding(system.bindings, diff, gesture_state), gesture_state}
      end)

    {%{system | gesture_state: final_state}, commands}
  end

  defp button_press?(diff), do: diff.type == :button and diff.from == 0 and diff.to > 0

  # Adds a non-zero knob delta to the running total for that knob.
  defp track_knob(gesture_state, %{type: :knob, delta: delta} = diff) when delta != 0 do
    {next_state, _total} = GestureState.accumulate(gesture_state, {:knob, diff.id}, delta)
    next_state
  end

  defp track_knob(gesture_state, _diff), do: gesture_state

  # Runs the generator bound to this diff's {type, id}; unbound inputs yield no commands.
  defp run_binding(bindings, diff, gesture_state) do
    case Map.fetch(bindings, {diff.type, diff.id}) do
      {:ok, generator} when not is_nil(generator) -> generator.(diff, gesture_state)
      _ -> []
    end
  end

  # Buttons are sampled by "duration", knobs by "value"; missing values count as 0.
  defp parse_to_signals(input_state) do
    timestamp = Map.get(input_state, "timestamp", 0)

    buttons =
      for button <- Map.get(input_state, "buttons", []) do
        Signal.new(button["id"], button["duration"] || 0, timestamp, :button)
      end

    knobs =
      for knob <- Map.get(input_state, "knobs", []) do
        Signal.new(knob["id"], knob["value"] || 0, timestamp, :knob)
      end

    buttons ++ knobs
  end

  # Diffs every current signal against its predecessor (or a zero-valued copy of
  # itself when there is none) and keeps diffs with a value change or elapsed time.
  defp calculate_diffs(prev_signals, curr_signals) do
    previous = Map.new(prev_signals, fn signal -> {{signal.type, signal.id}, signal} end)

    for curr <- curr_signals,
        change = Signal.diff(Map.get(previous, {curr.type, curr.id}, %{curr | value: 0}), curr),
        change.delta != 0 or change.dt > 0 do
      change
    end
  end
end
# Clean binding DSL
defmodule Bindings do
  @moduledoc """
  Small builder for the `{type, id}` => generator map consumed by `InputSystem`.
  """

  @doc "Returns an empty binding map."
  def new, do: Map.new()

  @doc "Binds `generator` to the button with the given `id`."
  def button(bindings, id, generator), do: bind(bindings, :button, id, generator)

  @doc "Binds `generator` to the knob with the given `id`."
  def knob(bindings, id, generator), do: bind(bindings, :knob, id, generator)

  # lions
  @doc "Preset: four duration buttons (ids 0-3) and one delta knob (id 0)."
  def lionclaws do
    knob_action = fn total -> if total > 0, do: :lt, else: :rt end
    knob_value = fn total -> abs(round(total / 3) * 3) end

    buttons =
      for {id, action} <- [{0, :rotator}, {1, :fw}, {2, :rt}, {3, :undo}], into: new() do
        {{:button, id}, Gen.duration(action)}
      end

    knob(buttons, 0, Gen.delta(knob_action, value_fn: knob_value))
  end

  defp bind(bindings, kind, id, generator), do: Map.put(bindings, {kind, id}, generator)
end
Controller
defmodule Pirate.Controller do
  @moduledoc """
  GenServer that reads framed JSON messages from a USB serial device through
  `Circuits.UART`, diffs consecutive `"state"` messages with `InputSystem`, and
  forwards every resulting command to the remote node via `Pirate.Cluster`.
  """

  use GenServer
  require Logger
  alias Circuits.UART
  import NimbleParsec

  # Parser for one framed message: `##JSON##<payload>##END##`, optionally
  # followed by a trailing "\r".
  json_marker_start = string("##JSON##")
  json_marker_end = string("##END##")

  # Capture everything between the markers as a single string
  json_content =
    ignore(json_marker_start)
    |> repeat(lookahead_not(json_marker_end) |> utf8_char([]))
    |> reduce({List, :to_string, []})
    |> ignore(json_marker_end)
    |> optional(string("\r"))

  # Generates do_parse/1,2 from the combinator above
  defparsec(:do_parse, json_content)

  # Client API

  @doc "Starts the controller. `opts` is passed to `init/1`, which ignores it."
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Looks for a serial port whose description is `"USB Serial"`.

  Returns the port name, or `nil` when no such port exists. When a port is found
  the server also schedules a connection attempt to it at `baud_rate`.
  """
  def find_ports(pid, baud_rate \\ 9600) do
    GenServer.call(pid, {:find_ports, baud_rate})
  end

  @doc "Closes the serial port. Returns `:ok` or `{:error, :not_connected}`."
  def disconnect(pid) do
    GenServer.call(pid, :disconnect)
  end

  # Server callbacks

  @impl true
  def init(_opts) do
    {:ok, uart_pid} = UART.start_link()
    bindings = Bindings.lionclaws()

    {:ok,
     %{
       uart_pid: uart_pid,
       port: nil,
       connected: false,
       # last "state" message received, used as the baseline for the next diff
       last: nil,
       system: InputSystem.new(bindings)
     }}
  end

  @impl true
  def handle_call({:find_ports, baud_rate}, _from, state) do
    port =
      Enum.find_value(UART.enumerate(), fn
        {name, %{description: "USB Serial"}} when is_binary(name) -> name
        _ -> nil
      end)

    # Only schedule a connection when a port was actually found; previously a
    # missing port crashed the server on `elem(nil, 0)`.
    if port do
      send(self(), {:connect, port, baud_rate})
    else
      Logger.warning("No \"USB Serial\" port found")
    end

    {:reply, port, state}
  end

  def handle_call(:disconnect, _from, %{uart_pid: uart_pid} = state) do
    if state.connected do
      UART.close(uart_pid)
      Logger.info("Disconnected from Arduino")
      {:reply, :ok, %{state | port: nil, connected: false}}
    else
      {:reply, {:error, :not_connected}, state}
    end
  end

  # Catch-all for unknown requests (was defined with arity 2 and never reached).
  def handle_call(event, _from, state) do
    {:reply, {:error, event}, state}
  end

  @impl true
  def handle_info({:connect, port, baud_rate}, %{uart_pid: uart_pid} = state) do
    if state.connected do
      IO.inspect("disconnecting....")
      UART.close(uart_pid)
    end

    IO.inspect("connecting....")

    case UART.open(uart_pid, port,
           speed: baud_rate,
           id: :pid,
           framing: Circuits.UART.Framing.Line
         ) do
      :ok ->
        IO.inspect("Connected to Arduino on port #{port} at #{baud_rate} baud")
        {:noreply, %{state | port: port, connected: true}}

      {:error, reason} ->
        IO.inspect("Failed to connect to port #{port}: #{inspect(reason)}")
        # Any previous connection was closed above, so the state must not keep
        # claiming to be connected.
        {:noreply, %{state | port: nil, connected: false}}
    end
  end

  # One received line of serial data.
  def handle_info({:circuits_uart, _source, data}, state) when is_binary(data) do
    data
    |> parse()
    |> handle_message(state)
  end

  # Error reported by the UART process; it carries no payload to parse.
  def handle_info({:circuits_uart, _source, {:error, reason}}, state) do
    Logger.warning("UART error: #{inspect(reason)}")
    {:noreply, state}
  end

  # Catch-all for any other message (was defined with arity 3 and never reached).
  def handle_info(event, state) do
    IO.inspect(event, label: "pokemon_clause")
    {:noreply, state}
  end

  # Private Functions

  # First "state" message: nothing to diff against yet, just remember it.
  defp handle_message(%{"type" => "state"} = curr_state, %{last: nil} = state) do
    {:noreply, %{state | last: curr_state}}
  end

  # Subsequent "state" messages: diff against the previous one and forward commands.
  defp handle_message(%{"type" => "state"} = curr_state, state) do
    %{last: prev_state, system: system} = state
    {updated_system, commands} = InputSystem.process_input(system, prev_state, curr_state)

    Enum.each(commands, fn {action, params, _timestamp} ->
      Pirate.Cluster.send_to_topic_on_node(
        "ora@127.0.0.1",
        "class:shell:pirate",
        {Dojo.Controls, action, Map.values(params)}
      )
    end)

    {:noreply, %{state | last: curr_state, system: updated_system}}
  end

  # Anything else (other message types, or %{} for an unparseable line) is only printed.
  defp handle_message(other, state) do
    IO.inspect(other, label: other["type"] || "message")
    {:noreply, state}
  end

  # Extracts and decodes the JSON payload of one framed line.
  #
  # Always returns a map; any line that is not a well-formed frame holding a
  # JSON object yields `%{}`, so callers can safely read `["type"]`.
  defp parse(input) do
    # `[json_string | _]` accepts frames with or without the trailing "\r".
    with {:ok, [json_string | _], "", _, _, _} <- do_parse(input),
         %{} = decoded <- :json.decode(json_string) do
      decoded
    else
      {:error, reason, _, _, _, _} ->
        Logger.debug("Parse error: #{reason}")
        %{}

      other ->
        Logger.debug("Unknown parsing error: #{inspect(other)}")
        %{}
    end
  rescue
    _e in RuntimeError ->
      %{}

    # :json.decode/1 signals malformed JSON with an Erlang error
    e in ErlangError ->
      IO.inspect(e)
      %{}
  end
end
# Start the controller; it is not registered under a name, so keep the pid.
{:ok, pid}= Pirate.Controller.start_link()
# Look for a "USB Serial" port; the server then tries to open it at 115200 baud.
Pirate.Controller.find_ports(pid, 115200)
# Close the serial connection again.
Pirate.Controller.disconnect(pid)