Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Simple websocket server

content/simple_ws_server.livemd

Simple websocket server

# Where to load :kino_websocket from: a local checkout when the home directory
# matches one of the two known paths, otherwise the GitHub repository.
kino_websocket_source =
  case System.user_home() do
    "C:\\Users\\chgeuer" -> [path: "c:/github/chgeuer/kino_websocket"]
    "/home/chgeuer" -> [path: "/mnt/c/github/chgeuer/kino_websocket"]
    _ -> [github: "chgeuer/kino_websocket"]
  end

Mix.install([
  {:bandit, "~> 1.6"},
  {:websock_adapter, "~> 0.5.8"},
  {:websock, "~> 0.5.3"},
  {:websockex, "~> 0.4.3"},
  {:kino, "~> 0.14.2"},
  {:kino_websocket, kino_websocket_source}
])

Launch a local server

defmodule WebSocketEcho.Socket do
  @moduledoc """
  `WebSock` handler for the demo server.

  * Echoes every text frame back, prefixed with `"Echo: "`.
  * Stops the connection normally when the text frame `"close"` arrives.
  * Pushes the current UTC time (plus the connection state) to the client on
    every `:tick` message, which is scheduled once per connection in `init/1`.
  """

  @behaviour WebSock

  # Delay between two :tick messages, in milliseconds.
  @tick_interval_ms 3000

  @doc """
  Builds the connection state from the `%{conn: conn}` options map handed to
  `WebSockAdapter.upgrade/4` and schedules the periodic `:tick` message.
  """
  @impl WebSock
  def init(options) do
    IO.puts("Connect #{inspect(options)}")

    # The only place a tick timer is started, so each connection has exactly one.
    :timer.send_interval(@tick_interval_ms, :tick)

    state = %{
      remote_ip: options.conn.remote_ip,
      authorization: options.conn.req_headers |> Map.new() |> Map.get("authorization")
    }

    {:ok, state}
  end

  @doc """
  Logs control frames and leaves the state untouched.
  """
  @impl WebSock
  # NOTE(review): WebSock documents control frames as `{payload, opcode: :ping | :pong}`;
  # this clause is kept for compatibility - confirm whether it is ever invoked.
  def handle_control(:established, state) do
    IO.puts("Established")
    {:ok, state}
  end

  def handle_control(frame, state) do
    # Must not start a timer here: this runs for every control frame, and an extra
    # interval timer per frame would make the tick rate grow without bound.
    IO.puts("Control frame #{inspect(frame)}")
    {:ok, state}
  end

  @doc """
  Pushes the current time and the connection state on `:tick`; ignores every
  other message sent to the connection process.
  """
  @impl WebSock
  def handle_info(:tick, state) do
    IO.puts("handle_info(:tick)")

    now = Time.utc_now() |> Time.truncate(:second) |> Time.to_string()
    push_text("Current time: #{now}. State #{inspect(state)}", state)
  end

  # Unrelated messages would otherwise crash the connection with a FunctionClauseError.
  def handle_info(_message, state), do: {:ok, state}

  @doc """
  Echoes text frames, stops on the text `"close"`, and ignores non-text frames.
  """
  @impl WebSock
  def handle_in({"close", opcode: :text}, state), do: {:stop, :normal, state}
  def handle_in({text, opcode: :text}, state), do: push_text("Echo: #{text}", state)
  # Non-text data frames are dropped instead of crashing the connection.
  def handle_in(_frame, state), do: {:ok, state}

  @doc """
  Logs the termination reason together with the final state.
  """
  @impl WebSock
  def terminate(reason, state) do
    IO.inspect("Terminating reason=#{inspect(reason)} state=#{inspect(state)}")
    :ok
  end

  # Wraps `value` in the tuple that tells WebSock to send a text frame.
  defp push_text(value, state), do: {:push, {:text, value}, state}
end

defmodule WebSocketEcho.Router do
  @moduledoc """
  Plug router for the demo server: `GET /ws` is upgraded to a WebSocket
  connection, every other request is answered with a 404.
  """

  use Plug.Router

  plug :match
  plug :dispatch

  # Hand the connection over to WebSocketEcho.Socket. The conn itself is passed
  # along so the handler can read the peer address and request headers.
  get "/ws" do
    WebSockAdapter.upgrade(conn, WebSocketEcho.Socket, %{conn: conn}, timeout: 3_600_000)
  end

  # Fallback for any route not matched above.
  match _ do
    send_resp(conn, 404, "Not found")
  end
end

defmodule WebSocketEcho.Application do
  @moduledoc """
  Starts a supervisor running Bandit with `WebSocketEcho.Router` over plain HTTP.
  """

  use Application

  @doc """
  Starts the supervision tree.

  `args` is a keyword list that must contain `:port`, the TCP port Bandit
  listens on. Other keys are ignored. Raises `KeyError` when `:port` is missing.
  Returns the result of `Supervisor.start_link/2`.
  """
  @impl true
  def start(_type, args) do
    # Fetch by key rather than matching `[port: port]` exactly, so additional
    # start arguments do not cause a FunctionClauseError.
    port = Keyword.fetch!(args, :port)

    children = [
      {Bandit, plug: WebSocketEcho.Router, scheme: :http, port: port}
    ]

    opts = [strategy: :one_for_one, name: WebSocketEcho.Supervisor]

    Supervisor.start_link(children, opts)
  end
end

# TCP port the local echo server listens on.
port = 4040

# The supervisor is registered under a fixed name, so evaluating this again
# yields {:error, {:already_started, pid}}; reuse that pid in that case.
start_result = WebSocketEcho.Application.start(nil, port: port)

server_pid =
  case start_result do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

IO.puts("Connect on ws://localhost:#{port}/ws")
defmodule Kino.WebSocket.SmartCell.Client do
  @moduledoc """
  `WebSockex` client that forwards every received text frame to a parent
  process as `{:websocket_message, text}`.

  The parent pid is the client's state.
  """

  use WebSockex

  @doc """
  Connects to `endpoint`.

  `parent` is the process that receives the forwarded messages; it defaults to
  the calling process. Returns whatever `WebSockex.start_link/4` returns.
  """
  def start_link(endpoint, parent \\ self()) do
    WebSockex.start_link(endpoint, __MODULE__, parent, extra_headers: [])
  end

  @impl WebSockex
  def handle_frame({:text, message}, parent) do
    send(parent, {:websocket_message, message})
    {:ok, parent}
  end

  # Defining handle_frame/2 replaces the default from `use WebSockex`, so
  # non-text frames need their own clause or they crash the client.
  def handle_frame(_frame, parent), do: {:ok, parent}

  @impl WebSockex
  # Accept every disconnect (local or remote) without reconnecting; matching only
  # local disconnects made a remote close raise a FunctionClauseError.
  def handle_disconnect(_connection_status, state), do: {:ok, state}

  @impl WebSockex
  def handle_cast(:close, state), do: {:close, state}
  # Unknown casts are ignored instead of crashing the client.
  def handle_cast(_message, state), do: {:ok, state}
end

# The second argument is the process that receives `{:websocket_message, text}`
# for each text frame; here that is the process evaluating this code.
Kino.WebSocket.SmartCell.Client.start_link("wss://ws.postman-echo.com/raw", self())
defmodule TimeClient do
  @moduledoc """
  `WebSockex` client used to talk to the local echo server.

  Received frames, pings and pongs are logged. A `:close` cast closes the
  connection; any other cast is answered by sending a text frame with the
  client's current UTC time to the server.
  """

  use WebSockex

  # @endpoint "ws://localhost:4000/ws"
  # @endpoint "ws://myapp.ninja"

  @doc """
  Connects to `endpoint` with the given initial `state`.

  `opts` is passed unchanged to `WebSockex.start_link/4`.
  """
  def start_link(endpoint, state, opts \\ []) do
    WebSockex.start_link(endpoint, __MODULE__, state, opts)
  end

  @impl WebSockex
  def handle_frame({:text, message}, state) do
    IO.inspect({:text, message}, label: "handle_frame")
    # {:reply, {:text, "Echo: #{message}"}, state}
    {:ok, state}
  end

  # Defining handle_frame/2 replaces the default from `use WebSockex`, so
  # non-text frames need their own clause or they crash the client.
  def handle_frame(frame, state) do
    IO.inspect(frame, label: "handle_frame")
    {:ok, state}
  end

  @impl WebSockex
  def handle_cast(:close, state), do: {:close, state}

  def handle_cast(info, state) do
    time = Time.utc_now() |> Time.truncate(:second) |> Time.to_string()
    {:reply, {:text, "Client got a #{inspect(info)}, Current time at client is: #{time}"}, state}
  end

  @impl WebSockex
  def handle_ping(ping_frame, state) do
    IO.puts("Got pinged: #{inspect(ping_frame)}")
    {:ok, state}
  end

  @impl WebSockex
  def handle_pong(pong_frame, state) do
    IO.puts("Got ponged: #{inspect(pong_frame)}")
    {:ok, state}
  end

  @impl WebSockex
  def terminate(close_reason, _state) do
    IO.inspect(close_reason, label: :terminate)
    # Exit with :normal whatever the close reason was. Nothing placed after
    # exit/1 would ever run, so there is no separate return value.
    exit(:normal)
  end

  @doc """
  Sends `frame` over the connection owned by `client_pid`.
  """
  def send_frame(client_pid, frame) do
    WebSockex.send_frame(client_pid, frame)
  end

  @doc """
  Sends `text` as a text frame.
  """
  def send_text(client_pid, text) do
    send_frame(client_pid, {:text, text})
  end

  @doc """
  Asks the client to close its connection by casting `:close` to it.
  """
  def close(client_pid) do
    WebSockex.cast(client_pid, :close)
  end
end
endpoint = "ws://localhost:#{port}/ws"

client_opts = [
  extra_headers: [
    {"Authorization", "Bearer ey..."},
    {"X-Foo", "Yeah"}
  ],
  debug: [:trace],
  # host: "localhost",
  # port: port,
  # conn_mod: :gen_tcp,
  # cacerts:
  # insecure: false
  socket_connect_timeout: 6_000,
  socket_recv_timeout: 5_000
  # ssl_options: %{
  #   verify: :verify_none
  # }
]

# client_pid is the connected client's pid, or nil when connecting failed.
client_pid =
  try do
    # Branch on the result instead of asserting {:ok, pid}: an {:error, reason}
    # return would otherwise surface as an unrescued MatchError.
    case TimeClient.start_link(endpoint, nil, client_opts) do
      {:ok, pid} ->
        pid

      {:error, reason} ->
        IO.puts("Connection failed: #{inspect(reason)}")
        nil
    end
  rescue
    # NOTE(review): kept from the original in case a ConnError is ever raised
    # rather than returned - confirm against the WebSockex version in use.
    e in WebSockex.ConnError ->
      IO.puts("Connection failed: #{inspect(e)}")
      nil
  catch
    :exit, reason ->
      IO.puts("Connection terminated: #{inspect(reason)}")
      nil
  end
# client_pid is nil when the connection attempt failed, so only talk to the
# client when there actually is one.
if is_pid(client_pid) do
  TimeClient.send_text(client_pid, "Hallo3")
  # WebSockex.send_frame(client_pid, {:text, "Hallo3"})
  # send(client_pid, "foo")
  TimeClient.close(client_pid)
else
  IO.puts("No client connection; nothing sent")
end