Powered by AppSignal & Oban Pro

Websockets

data_exploration/bluesky_firehose.livemd

Websockets

# Notebook dependencies, installed once per Livebook runtime.
Mix.install([
  # NOTE(review): :gun is not referenced by any cell visible in this notebook —
  # confirm whether it is still needed.
  {:gun, "~> 2.2"},
  # GenStage backs the Broadcaster stage; Flow builds the windowed counting pipeline on top of it.
  {:gen_stage, "~> 1.3.2"},
  # Jason decodes each text message received from the firehose (Jason.decode!/1 in the pipeline).
  {:jason, "~> 1.4.4"},
  {:flow, "~> 1.2.4"},
  # `hex: :websockex_wt` fetches the `websockex_wt` Hex package under the :websockex app name;
  # presumably a fork providing the `WebSockex` module used by BlueskyFirehoseClient — confirm.
  {:websockex, "~> 0.5.0", hex: :websockex_wt}
])

Section

defmodule BlueskyFirehoseClient do
  @moduledoc """
  WebSockex client that forwards firehose messages to the broadcaster stage.

  Every text frame received from the socket is sent, unchanged, to the process
  registered as `:broadcaster`. Frames of any other type are ignored.
  """
  use WebSockex

  @doc """
  Connects to `url` and starts the client process.

  `state` is the initial WebSockex state (defaults to `[]`).
  """
  def start_link(url, state \\ []) do
    WebSockex.start_link(url, __MODULE__, state)
  end

  # NOTE(review): WebSockex does not appear to declare an `init/1` callback, so this
  # may never be invoked. Kept for backward compatibility — confirm against the
  # websockex_wt package before removing.
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_frame({:text, message}, state) do
    # send/2 to a registered name raises ArgumentError when nothing is registered,
    # so the :broadcaster process must be started before this client connects.
    send(:broadcaster, message)
    {:ok, state}
  end

  # Without this clause any non-text frame (e.g. `{:binary, data}`) would raise
  # FunctionClauseError and take the client down.
  def handle_frame(_frame, state) do
    {:ok, state}
  end

  @impl true
  def handle_connect(_conn, state) do
    IO.puts("Connected!")
    {:ok, state}
  end

  # Returning {:ok, state} does not request a reconnect; the process stops on disconnect.
  @impl true
  def handle_disconnect(_reason, state) do
    IO.puts("Disconnected!")
    {:ok, state}
  end
end
defmodule Broadcaster do
  @moduledoc """
  GenStage stage that re-emits every message it receives to all subscribers
  through `GenStage.BroadcastDispatcher`.

  Plain messages sent to the process (for example `send(:broadcaster, msg)`)
  become events: a list is treated as a batch of events, any other term as a
  single event.

  Started with only `%{name: name}` it runs as a `:producer` and buffers events
  until consumers ask for them. When `initial` also carries a truthy
  `:subscribe_options`, it runs as a `:producer_consumer` subscribed with those
  options and forwards events as soon as they arrive.
  """
  use GenStage

  @doc """
  Starts the stage and registers it under `initial.name`.

  `initial` must contain `:name`. The optional `:subscribe_options` is passed as
  GenStage's `:subscribe_to` option and switches the stage to `:producer_consumer`.
  """
  def start_link(%{name: name} = initial) do
    GenStage.start_link(__MODULE__, initial, name: name)
  end

  @impl true
  def init(initial) do
    # :buffer is an Erlang :queue (O(1) enqueue); :buffer_size caches its length
    # so we never walk the queue to measure it.
    state = %{pending_demand: 0, buffer: :queue.new(), buffer_size: 0, mode: :producer}
    subscribe_options = Map.get(initial, :subscribe_options, nil)

    if subscribe_options do
      {:producer_consumer, %{state | mode: :producer_consumer},
       subscribe_to: subscribe_options, dispatcher: GenStage.BroadcastDispatcher}
    else
      {:producer, state, dispatcher: GenStage.BroadcastDispatcher}
    end
  end

  @impl true
  def handle_demand(new_demand, state) do
    state = %{state | pending_demand: state.pending_demand + new_demand}
    {to_emit, next_state} = get_events_to_emit_and_next_state(state)

    {:noreply, to_emit, next_state}
  end

  # External messages are fed through handle_events/3 so both entry points share
  # the same buffering logic.
  @impl true
  def handle_info(events, state) when is_list(events), do: handle_events(events, self(), state)
  def handle_info(event, state), do: handle_events([event], self(), state)

  @impl true
  def handle_events(new_events, _from, %{mode: :producer_consumer} = state) do
    # GenStage never calls handle_demand/2 on :producer_consumer stages, so
    # pending_demand would stay 0 and events would sit in our buffer forever.
    # Emit immediately; GenStage buffers them until downstream demand arrives.
    {:noreply, new_events, state}
  end

  def handle_events(new_events, _from, state) do
    buffer = Enum.reduce(new_events, state.buffer, &:queue.in/2)
    state = %{state | buffer: buffer, buffer_size: state.buffer_size + length(new_events)}
    {to_emit, next_state} = get_events_to_emit_and_next_state(state)

    {:noreply, to_emit, next_state}
  end

  # Takes up to pending_demand events (oldest first) off the buffer and returns
  # {events, state} with the buffer and the outstanding demand reduced accordingly.
  defp get_events_to_emit_and_next_state(state) do
    # :queue.split/2 requires 0 <= count <= queue length.
    count = min(state.pending_demand, state.buffer_size)
    {to_emit, remaining} = :queue.split(count, state.buffer)

    next_state = %{
      state
      | buffer: remaining,
        buffer_size: state.buffer_size - count,
        pending_demand: state.pending_demand - count
    }

    {:queue.to_list(to_emit), next_state}
  end
end
# The broadcaster must exist before the socket client starts: every text frame
# is sent to the process registered as :broadcaster.
{:ok, br_pid} = Broadcaster.start_link(%{name: :broadcaster})

# 1-second fixed windows on event time. "time_us" is presumably microseconds
# (per its name); div/2 turns it into the milliseconds Flow windows use.
# Events up to 1 second late are still accepted into their window.
window =
  Flow.Window.allowed_lateness(
    Flow.Window.fixed(1, :second, fn %{"time_us" => time_us} -> div(time_us, 1000) end),
    1,
    :second
  )

{:ok, pid} = BlueskyFirehoseClient.start_link("wss://jetstream1.us-east.bsky.network/subscribe")

flow =
  [br_pid]
  |> Flow.from_stages()
  |> Flow.map(&Jason.decode!/1)
  |> Flow.partition(window: window)
  |> Flow.reduce(fn -> [] end, &[&1 | &2])
  # Per partition: once a window is :done, emit {window_start_ms, event_count}
  # and reset; any other trigger just keeps accumulating.
  |> Flow.on_trigger(fn
    acc, _partition_info, {_type, window_start, :done} ->
      {[{window_start, length(acc)}], []}

    acc, _partition_info, {_type, _window_start, _trigger} ->
      {[], acc}
  end)
  # Merge the per-partition counts on a single stage, windowed by window start.
  # NOTE(review): this window has no allowed_lateness, so counts from slower
  # partitions may arrive after their window has triggered — confirm totals.
  |> Flow.partition(
    stages: 1,
    window: Flow.Window.fixed(1, :second, fn {window_start, _count} -> window_start end)
  )
  |> Flow.group_by(fn {window_start, _count} -> window_start end)
  |> Flow.on_trigger(fn
    groups, _partition_info, {_type, _window, :done} ->
      Enum.each(groups, fn {window_start, partials} ->
        total = partials |> Enum.map(fn {_timestamp, count} -> count end) |> Enum.sum()
        IO.puts("Window: #{DateTime.from_unix!(window_start, :millisecond)}, Nº: #{total}")
      end)

      {[], []}

    groups, _partition_info, {_type, _window, _trigger} ->
      {[], groups}
  end)
  |> Flow.start_link()