IRVE evolution analysis
Mix.install([
  {:ecto_sql, "~> 3.10"},
  {:postgrex, ">= 0.0.0"},
  {:kino_db, "~> 0.2.3"},
  {:jason, "~> 1.4"},
  {:req, "~> 0.4.0"},
  {:nimble_csv, "~> 1.2"},
  {:kino_vega_lite, "~> 0.1.10"}
])
Section
opts = [
  hostname: "localhost",
  port: 5432,
  username: "postgres",
  password: "",
  database: "transport_repo"
]
{:ok, conn} = Kino.start_child({Postgrex, opts})
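If the connection options need checking first, a trivial query confirms the link before the real one; this is only a sketch and assumes a local transport_repo database is reachable with the options above.
# assumption: the local database is up; this only confirms the connection works
Postgrex.query!(conn, "select 1", [])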
result =
  Postgrex.query!(
    conn,
    """
    select id, payload ->> 'permanent_url' as url, inserted_at
    from resource_history rh
    where rh.resource_id = 81623
    order by inserted_at asc
    """,
    []
  )
# the query returns these three columns, in this order
columns = ["id", "url", "inserted_at"]
%{rows: rows} = result

# turn each row into a map keyed by column name
snapshots =
  rows
  |> Enum.map(fn row ->
    columns
    |> Enum.zip(row)
    |> Map.new()
  end)
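Each snapshot is now a map keyed by those column names ("id", "url", "inserted_at"); a purely illustrative peek at the first entries makes that shape visible before any download happens.
# illustrative: inspect a couple of the snapshot maps built above
snapshots |> Enum.take(2)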
# load the shared Req cache plugin (Transport.Shared.ReqCustomCache) from the repository
path = Path.join(__ENV__.file, "../../apps/shared/lib/req_custom_cache.ex") |> Path.expand()
Code.require_file(path)
defmodule Query do
  # cache downloaded files next to this notebook so re-runs do not re-fetch each CSV
  def cache_dir, do: Path.join(__ENV__.file, "../cache-dir") |> Path.expand()

  # GET a URL through Req with the custom on-disk cache attached
  def cached_get!(url) do
    req = Req.new() |> Transport.Shared.ReqCustomCache.attach()
    Req.get!(req, url: url, receive_timeout: 100_000, custom_cache_dir: cache_dir())
  end
end
:ok
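Before launching the concurrent run below, a quick smoke test can confirm the cache plugin is wired up; this is only a sketch and assumes the snapshots list built above is non-empty.
# hypothetical smoke test: fetch a single permanent_url through the cache and check the status
%{"url" => sample_url} = List.first(snapshots)
Query.cached_get!(sample_url).status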
task = fn row = %{"url" => url} ->
  IO.puts("getting #{url}")
  %{status: 200, body: body} = Query.cached_get!(url)
  # NOTE: the decoded body is a list of CSV lines and the header is one of them,
  # hence the - 1 to keep only data rows
  Map.put(row, "row_count", length(body) - 1)
end
data =
  snapshots
  |> Task.async_stream(
    task,
    max_concurrency: 25,
    on_timeout: :kill_task,
    timeout: 50_000
  )
  # NOTE: a task killed on timeout yields {:exit, :timeout}, which this clause will not match
  |> Stream.map(fn {:ok, result} -> result end)
  |> Stream.map(fn x -> Map.take(x, ["inserted_at", "row_count"]) end)
  |> Enum.into([])
data
|> Kino.DataTable.new()
VegaLite.new(width: 800)
|> VegaLite.data_from_values(data, only: ["inserted_at", "row_count"])
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "inserted_at", type: :temporal)
|> VegaLite.encode_field(:y, "row_count", type: :quantitative)
|> VegaLite.encode_field(:color, "row_count", type: :quantitative)