Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Untitled notebook

screens/vehicles.livemd

Untitled notebook

Section

import Ecto.Query

defmodule Fetcher do
  @moduledoc """
  Downloads a GTFS-RT feed and returns its decoded entities.
  """

  @doc """
  Fetches `url` (following redirects) and returns the `entity` field of the
  decoded `TransitRealtime.FeedMessage`.

  The function is deliberately assertive and raises a `MatchError` when the
  response status is not 200, or when the feed header does not declare
  `incrementality: :FULL_DATASET`.
  """
  def fetch(url) do
    http_client = Transport.Shared.Wrapper.HTTPoison.impl()
    response = http_client.get!(url, [], follow_redirect: true)

    # Anything other than a plain 200 is treated as a failure.
    %{status_code: 200, body: payload} = response

    feed = TransitRealtime.FeedMessage.decode(payload)

    # Only full snapshots are accepted; the other header fields just have to be present.
    %{
      header: %{
        gtfs_realtime_version: _version,
        incrementality: :FULL_DATASET,
        timestamp: _timestamp
      },
      entity: entities
    } = feed

    entities
  end
end

# Counts the vehicles currently reported by each active GTFS-RT resource.
#
# Every resource whose format is "gtfs-rt" and whose dataset is active is
# fetched concurrently (at most 50 tasks at once, 10 000 ms per task). The
# cell evaluates to a list of `{resource_id, vehicle_count}` tuples; resources
# whose fetch raised, timed out, or returned no vehicle are left out.
DB.Resource
|> where(format: "gtfs-rt")
|> preload(:dataset)
|> DB.Repo.all(log: false)
|> Enum.filter(& &1.dataset.is_active)
# |> Enum.take(1)
|> Task.async_stream(
  fn resource ->
    try do
      vehicles =
        resource.url
        |> Fetcher.fetch()
        # Keep only the entities that carry a vehicle position payload.
        |> Enum.filter(& &1.vehicle)
        |> Enum.map(fn %{vehicle: v} ->
          %{
            vehicle_id: v.vehicle.id,
            latitude: v.position.latitude,
            longitude: v.position.longitude,
            bearing: v.position.bearing,
            timestamp: v.timestamp
          }
        end)

      %{
        resource_id: resource.id,
        vehicles: vehicles
      }
    rescue
      # Best effort: one broken feed must not abort the whole scan.
      e -> %{error: e}
    end
  end,
  max_concurrency: 50,
  on_timeout: :kill_task,
  timeout: 10_000
)
|> Enum.flat_map(fn
  # Successful fetch with at least one vehicle: keep its count.
  {:ok, %{resource_id: resource_id, vehicles: [_ | _] = vehicles}} ->
    [{resource_id, length(vehicles)}]

  # Fetch raised (`%{error: _}`) or the feed contained no vehicle.
  {:ok, _failed_or_empty} ->
    []

  # With `on_timeout: :kill_task` a slow feed shows up as `{:exit, :timeout}`;
  # skip it instead of crashing on an unmatched clause.
  {:exit, _reason} ->
    []
end)