Websockets
Mix.install([
{:gun, "~> 2.2"},
{:gen_stage, "~> 1.3.2"},
{:jason, "~> 1.4.4"},
{:flow, "~> 1.2.4"},
{:websockex, "~> 0.5.0", hex: :websockex_wt}
])
Section
defmodule BlueskyFirehoseClient do
  @moduledoc """
  WebSockex client that forwards every text frame it receives to the process
  registered under the name `:broadcaster`.
  """
  use WebSockex

  @doc """
  Opens a WebSocket connection to `url`.

  `state` becomes the initial callback state (defaults to `[]`). Returns whatever
  `WebSockex.start_link/3` returns.
  """
  def start_link(url, state \\ []) do
    WebSockex.start_link(url, __MODULE__, state)
  end

  @doc """
  Returns `{:ok, state}` unchanged.
  """
  # NOTE(review): nothing in this file calls init/1 and it does not appear to be a
  # WebSockex callback; kept for backward compatibility — confirm before removing.
  def init(state) do
    {:ok, state}
  end

  @doc """
  Forwards the payload of a text frame to `:broadcaster`; every other frame is ignored.
  """
  @impl true
  def handle_frame({:text, message}, state) do
    # IO.puts("Received message: #{message}")
    # `send/2` to a registered name raises when no process is registered under it,
    # so the `:broadcaster` process has to be running before frames arrive.
    send(:broadcaster, message)
    {:ok, state}
  end

  # Without this clause any non-text frame (e.g. `{:binary, _}`) would raise a
  # FunctionClauseError and take the connection process down.
  def handle_frame(_frame, state), do: {:ok, state}

  @doc """
  Logs that the connection was established and keeps the state as is.
  """
  @impl true
  def handle_connect(_conn, state) do
    IO.puts("Connected!")
    {:ok, state}
  end

  @doc """
  Logs the disconnect and keeps the state as is.
  """
  # NOTE(review): WebSockex documents `{:reconnect, state}` for automatic
  # reconnection; `{:ok, state}` does not request one — confirm this is intended.
  @impl true
  def handle_disconnect(_reason, state) do
    IO.puts("Disconnected!")
    {:ok, state}
  end
end
defmodule Broadcaster do
  @moduledoc """
  GenStage stage that re-emits messages sent to its mailbox to all subscribed
  consumers through `GenStage.BroadcastDispatcher`.

  The stage type depends on the map given to `start_link/1`:

    * without `:subscribe_options` it is a `:producer`; incoming messages are kept in
      an in-memory buffer and released as consumers signal demand;
    * with `:subscribe_options` it is a `:producer_consumer` subscribed to those
      stages; events are forwarded as soon as they arrive.
  """
  use GenStage

  @doc """
  Starts the stage and registers it under `initial.name`.

  `initial` must contain `:name` and may contain `:subscribe_options`, which is passed
  to GenStage as the `:subscribe_to` option.
  """
  def start_link(%{name: name} = initial) do
    GenStage.start_link(__MODULE__, initial, name: name)
  end

  @impl true
  def init(initial) do
    subscribe_options = Map.get(initial, :subscribe_options)

    if subscribe_options do
      {:producer_consumer, new_state(:producer_consumer),
       subscribe_to: subscribe_options, dispatcher: GenStage.BroadcastDispatcher}
    else
      {:producer, new_state(:producer), dispatcher: GenStage.BroadcastDispatcher}
    end
  end

  # Builds the initial state; `mode` records the stage type chosen in init/1.
  defp new_state(mode), do: %{mode: mode, pending_demand: 0, buffer: []}

  # Takes as many buffered events as the outstanding demand allows and returns them
  # together with the state updated to reflect what is left.
  defp get_events_to_emit_and_next_state(%{buffer: buffer, pending_demand: demand} = state) do
    {to_emit, remaining} = Enum.split(buffer, demand)
    next_state = %{state | buffer: remaining, pending_demand: demand - length(to_emit)}
    {to_emit, next_state}
  end

  @doc """
  Adds `new_demand` to the outstanding demand and emits whatever buffered events
  can now be delivered.
  """
  @impl true
  def handle_demand(new_demand, state) do
    state = %{state | pending_demand: state.pending_demand + new_demand}
    {to_emit, next_state} = get_events_to_emit_and_next_state(state)
    {:noreply, to_emit, next_state}
  end

  @doc """
  Treats any message delivered to the process as an event (or, when the message is a
  list, as a list of events) and hands it to `handle_events/3`.
  """
  @impl true
  def handle_info(events, state) when is_list(events), do: handle_events(events, self(), state)
  def handle_info(event, state), do: handle_events([event], self(), state)

  @doc """
  Accepts new events.

  As a `:producer_consumer` the events are emitted immediately. As a `:producer` they
  are appended to the buffer and only the part covered by outstanding demand is
  emitted.
  """
  @impl true
  def handle_events(new_events, _from, %{mode: :producer_consumer} = state) do
    # GenStage only invokes handle_demand/2 on :producer stages, so pending_demand
    # would stay at 0 here and buffered events would never be released. Emit them
    # directly and let GenStage hold them until downstream consumers ask.
    {:noreply, new_events, state}
  end

  def handle_events(new_events, _from, state) do
    # Appending keeps arrival order; the cost grows with the buffer length, which
    # stays small as long as consumers keep up.
    new_state = %{state | buffer: state.buffer ++ new_events}
    {to_emit, next_state} = get_events_to_emit_and_next_state(new_state)
    {:noreply, to_emit, next_state}
  end
end
# Start the broadcasting stage first so the name `:broadcaster` is registered before
# the WebSocket client begins forwarding frames to it.
{:ok, br_pid} = Broadcaster.start_link(%{name: :broadcaster})

# One-second fixed windows keyed on each event's "time_us" value divided by 1000,
# with one additional second of allowed lateness.
window =
  Flow.Window.allowed_lateness(
    Flow.Window.fixed(1, :second, fn %{"time_us" => time_us} -> div(time_us, 1000) end),
    1,
    :second
  )

{:ok, pid } = BlueskyFirehoseClient.start_link("wss://jetstream1.us-east.bsky.network/subscribe")

# Pipeline: decode each JSON message, collect events per window in every partition,
# emit `{window, count}` once a window is done, then gather those partial counts in a
# single stage and print one total per window.
flow =
  [br_pid]
  |> Flow.from_stages()
  |> Flow.map(&Jason.decode!/1)
  |> Flow.partition(window: window)
  |> Flow.reduce(fn -> [] end, fn event, events -> [event | events] end)
  |> Flow.on_trigger(fn
    events, _partition_info, {_type, window, :done} ->
      # IO.puts("Window: #{DateTime.from_unix!(window, :millisecond)}, Nº: #{length(events)}")
      {[{window, length(events)}], []}

    events, _partition_info, {_type, _window, _trigger} ->
      # Any other trigger: emit nothing and keep accumulating.
      {[], events}
  end)
  |> Flow.partition(
    stages: 1,
    window: Flow.Window.fixed(1, :second, fn {window, _count} -> window end)
  )
  |> Flow.group_by(fn {window, _count} -> window end)
  |> Flow.on_trigger(fn
    groups, _partition_info, {_type, _window, :done} ->
      Enum.each(groups, fn {window, grouped} ->
        # Sum the partial counts reported for this window.
        count =
          grouped
          |> Enum.map(fn {_timestamp, partial} -> partial end)
          |> Enum.sum()

        IO.puts("Window: #{DateTime.from_unix!(window, :millisecond)}, Nº: #{count}")
      end)

      {[], []}

    groups, _partition_info, {_type, _window, _trigger} ->
      {[], groups}
  end)
  |> Flow.start_link()