Logflare Websocket Client
Connect
Mix.install([
{:jason, "~> 1.2"},
{:castore, "~> 0.1.0"},
{:phoenix_client, "~> 0.11.1"},
{:kino, "~> 0.5.0"}
])
api_key = Kino.Input.text("Logflare ingest API key")
source_uuid = Kino.Input.text("Logflare source UUID")
defmodule Logflare.PhxClient do
use GenServer
alias PhoenixClient.{Socket, Channel, Message}
@url "wss://api.logflare.app/logs/websocket"
def start_link(opts) do
opts = [{:base_url, @url} | opts]
GenServer.start_link(__MODULE__, opts)
end
def push_batch(pid, batch) do
GenServer.cast(pid, {:push_batch, batch})
end
def ping(pid) do
GenServer.cast(pid, :ping)
end
def init(opts) do
api_key = opts[:api_key]
source_uuid = opts[:source_uuid]
base_url = opts[:base_url]
params = URI.encode_query(%{"api_key" => api_key, "vsn" => "2.0.0"})
url = base_url <> "?" <> params
socket_opts = [url: url]
{:ok, socket} = Socket.start_link(socket_opts)
join(0)
{:ok,
%{
socket: socket,
connected: false,
joined: false,
channel: nil,
source_uuid: source_uuid
}}
end
def handle_cast({:push_batch, batch}, state) do
if state.connected do
Channel.push_async(state.channel, "batch", batch)
{:ok, :pushed}
else
{:error, :not_connected}
end
{:noreply, state}
end
def handle_cast(:ping, state) do
if state.connected do
Channel.push_async(state.channel, "ping", "pinging")
{:ok, :pushed}
else
{:error, :not_connected}
end
{:noreply, state}
end
def handle_info(%Message{} = payload, state) do
IO.puts("Incoming Message: #{inspect(payload)}")
{:noreply, state}
end
def handle_info(:join, state) do
state =
if Socket.connected?(state.socket) do
IO.puts("Connected!")
{:ok, _response, channel} = Channel.join(state.socket, "logs:" <> state.source_uuid)
IO.puts("Joined!")
%{state | connected: true, joined: true, channel: channel}
else
join()
state
end
{:noreply, state}
end
defp join(every \\ 1_000) do
IO.puts("Connecting...")
Process.send_after(self(), :join, every)
end
end
opts = [api_key: Kino.Input.read(api_key), source_uuid: Kino.Input.read(source_uuid)]
{:ok, pid} = Logflare.PhxClient.start_link(opts)
Logflare.PhxClient.ping(pid)
batch = [%{message: "This is a log message", metadata: %{product: "hat", color: "blue"}}]
Logflare.PhxClient.push_batch(pid, %{"batch" => batch})