Powered by AppSignal & Oban Pro

ROSBRIDGE

docs/roswhisperer.livemd

ROSBRIDGE

Mix.install([
  {:jason, "~> 1.3"},
  {:gun, "~> 2.0.0-rc.2"}
])

Root

defmodule RosWhisperer do
  use GenServer

  @timeout 10000

  def start_link(args) do
    {:ok, pid} = GenServer.start_link(__MODULE__, args, [{:name, __MODULE__}])
  end

  def init(arguments) do
    IO.inspect(arguments)
    {:ok, nil, {:continue, :connect}}
  end

  def handle_continue(:connect, state) do
    connect_opts = %{
      connect_timeout: 60000,
      retry: 10,
      retry_timeout: 300,
      transport: :tcp,
      # Specify :http or :http2 to tell the server what you'd like. Support for http 2  is not great generally
      protocols: [:http],
      # Tell the server which version to use. Note: this is an atom.
      http_opts: %{version: :"HTTP/1.1"}
    }

    {:ok, conn} = :gun.open('localhost', 9090, connect_opts)
    {:ok, :http} = :gun.await_up(conn, 1000)
    ref = :gun.ws_upgrade(conn, '/')
    {:ok, {["websocket"], _header}} = await_upgrade(conn, ref)
    {:noreply, %{conn: conn, ref: ref}}
  end

  def handle_info({:gun_ws, conn, _stream_ref, {:text, frame}}, state) do
    {:ok, message} = Jason.decode(frame)
    # {:ok, {@channel, ^message}} = PubSub.broadcast_from(@channel, message)
    {:noreply, state}
  end

  def handle_call({:ws_send, frame}, _from, state) do
    IO.inspect(state)
    {:reply, :gun.ws_send(state.conn, state.ref, {:text, frame |> Jason.encode!()}), state}
  end

  def handle_call({:say, goodbye}, _from, conn) do
    IO.puts("server going bye bye in T-#{inspect(goodbye)}s")
    :timer.send_interval(:timer.seconds(goodbye), self(), :stop)
    {:reply, {:ok, :saygoodbye}, conn}
  end

  def handle_info(:stop, conn) do
    IO.puts("sayonara")
    {:stop, :normal, conn}
  end

  defp await_upgrade(gun_pid, stream_ref) do
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, protocols, headers} ->
        {:ok, {protocols, headers}}
    after
      @timeout ->
        {:error, :timeout}
    end
  end

  def terminate(reason, conn) do
    _pid = spawn(fn -> :ok = :gun.close(conn) end)
    {:ok, reason}
  end
end

Gen Server Commands

config = [host: "localhost"]
{:ok, pid} = GenServer.start_link(RosWhisperer, config)
advertise = %{
  "op" => "advertise",
  "id" => "test",
  "topic" => "destination",
  "type" => "std_msgs/String"
}

GenServer.call(pid, {:ws_send, advertise})

frame = %{
  "op" => "publish",
  "id" => "test",
  "topic" => "destination",
  "msg" => %{data: "home a30ne"}
}

GenServer.call(pid, {:ws_send, frame})

Cleaning up

GenServer.call(pid, {:say, 2})

Without GenServer

receiver = fn gun_pid, stream_ref ->
  receive do
    {:gun_upgrade, ^gun_pid, ^stream_ref, protocols, headers} ->
      {:ok, {protocols, headers}}
  after
    10000 ->
      {:error, :timeout}
  end
end

# RosWhisperer.start_link(nil)
connect_opts = %{
  connect_timeout: 60000,
  retry: 10,
  retry_timeout: 300,
  transport: :tcp,
  # Specify :http or :http2 to tell the server what you'd like. Support for http 2  is not great generally
  protocols: [:http],
  # Tell the server which version to use. Note: this is an atom.
  http_opts: %{version: :"HTTP/1.1"}
}

{:ok, conn} = :gun.open('localhost', 9090, connect_opts)
{:ok, :http} = :gun.await_up(conn, 1000)
ref = :gun.ws_upgrade(conn, '/')
{:ok, {["websocket"], _header}} = receiver.(conn, ref)
:gun.ws_send(
  conn,
  ref,
  {:text, %{"op" => "advertise", "type" => "string", "topic" => "destination"} |> Jason.encode!()}
)
:gun.close(conn)