Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Logflare Websocket Client

logflare_websockets_ingest.livemd

Logflare Websocket Client

Connect

# Notebook dependencies. `phoenix_client` provides the PhoenixClient.* modules
# and `kino` provides the Kino.Input widgets used below.
Mix.install(
  jason: "~> 1.2",
  castore: "~> 0.1.0",
  phoenix_client: "~> 0.11.1",
  kino: "~> 0.5.0"
)

# Text inputs for the credentials; their current values are read with
# Kino.Input.read/1 when the client is started.
api_key = Kino.Input.text("Logflare ingest API key")
source_uuid = Kino.Input.text("Logflare source UUID")
defmodule Logflare.PhxClient do
  @moduledoc """
  GenServer that pushes log events to Logflare over a Phoenix channel.

  On start it opens a `PhoenixClient.Socket`, then polls until the socket
  reports that it is connected and joins the `"logs:<source_uuid>"` topic.

  `push_batch/2` and `ping/1` are fire-and-forget casts. Anything cast before
  the channel has been joined is not sent; a line is printed to say so.
  """

  use GenServer

  alias PhoenixClient.{Socket, Channel, Message}

  # Default ingest endpoint, used when the caller gives no `:base_url` option.
  @url "wss://api.logflare.app/logs/websocket"

  # Delay between two connection checks while waiting for the socket.
  @retry_interval 1_000

  @doc """
  Starts the client process.

  ## Options

    * `:api_key` (required) - sent as the `api_key` query parameter of the
      socket URL.
    * `:source_uuid` (required) - the topic joined is `"logs:" <> source_uuid`.
    * `:base_url` - websocket endpoint; defaults to
      `wss://api.logflare.app/logs/websocket`.

  Returns whatever `GenServer.start_link/2` returns.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Asynchronously pushes `batch` as the payload of a `"batch"` event.

  Always returns `:ok` (it is a cast). The payload is only sent if the channel
  has already been joined.
  """
  def push_batch(pid, batch) do
    GenServer.cast(pid, {:push_batch, batch})
  end

  @doc """
  Asynchronously pushes a `"ping"` event with the payload `"pinging"`.

  Always returns `:ok` (it is a cast). The ping is only sent if the channel has
  already been joined.
  """
  def ping(pid) do
    GenServer.cast(pid, :ping)
  end

  @impl true
  def init(opts) do
    # Required options are fetched assertively so that a misconfigured client
    # fails at startup instead of later, while building the topic name.
    api_key = Keyword.fetch!(opts, :api_key)
    source_uuid = Keyword.fetch!(opts, :source_uuid)

    # A caller-supplied :base_url wins; @url is only the fallback.
    base_url = Keyword.get(opts, :base_url, @url)

    params = URI.encode_query(%{"api_key" => api_key, "vsn" => "2.0.0"})
    url = base_url <> "?" <> params

    {:ok, socket} = Socket.start_link(url: url)

    # First connection check is scheduled immediately; later ones use the
    # default retry interval.
    join(0)

    {:ok,
     %{
       socket: socket,
       connected: false,
       joined: false,
       channel: nil,
       source_uuid: source_uuid
     }}
  end

  @impl true
  def handle_cast({:push_batch, batch}, state) do
    push(state, "batch", batch)
    {:noreply, state}
  end

  def handle_cast(:ping, state) do
    push(state, "ping", "pinging")
    {:noreply, state}
  end

  @impl true
  def handle_info(%Message{} = payload, state) do
    IO.puts("Incoming Message: #{inspect(payload)}")
    {:noreply, state}
  end

  def handle_info(:join, state) do
    state =
      if Socket.connected?(state.socket) do
        IO.puts("Connected!")

        # Assertive match: a rejected join crashes the client rather than
        # leaving it running without a channel.
        {:ok, _response, channel} = Channel.join(state.socket, "logs:" <> state.source_uuid)
        IO.puts("Joined!")

        %{state | connected: true, joined: true, channel: channel}
      else
        # Socket not up yet: check again after the retry interval.
        join()

        state
      end

    {:noreply, state}
  end

  # Sends `event` with `payload` on the joined channel.
  defp push(%{connected: true, channel: channel}, event, payload) do
    Channel.push_async(channel, event, payload)
  end

  # No channel yet: the push is dropped, but not silently.
  defp push(_state, event, _payload) do
    IO.puts("Not connected, dropping #{inspect(event)} push")
  end

  # Schedules a `:join` message to this process after `every` milliseconds.
  defp join(every \\ @retry_interval) do
    IO.puts("Connecting...")
    Process.send_after(self(), :join, every)
  end
end
# Start the client with the values currently entered in the two inputs.
opts = [
  api_key: Kino.Input.read(api_key),
  source_uuid: Kino.Input.read(source_uuid)
]

{:ok, pid} = Logflare.PhxClient.start_link(opts)

# Both calls below are casts: they return immediately, and the client only
# pushes to the channel once it has joined it.
Logflare.PhxClient.ping(pid)

batch = [
  %{
    message: "This is a log message",
    metadata: %{product: "hat", color: "blue"}
  }
]

Logflare.PhxClient.push_batch(pid, %{"batch" => batch})