Day 15
# Configure postgrex to use the built-in JSON library before it's compiled
# Install the notebook's dependencies.
#
# The postgrex JSON library is handed to Mix.install through `config:` rather
# than a separate Application.put_env call: Mix.install applies this
# configuration before the dependencies are compiled and loaded, so postgrex
# is built against Elixir's JSON module.
Mix.install(
  [
    {:ecto_sql, "~> 3.10"},
    {:postgrex, ">= 0.0.0"},
    {:kino, "~> 0.18.0"}
  ],
  config: [
    postgrex: [json_library: JSON]
  ]
)
# Inspect charlists as plain lists in the notebook output.
Kino.configure(inspect: [charlists: :as_lists])

# Connection URL for Repo, stored under the :myapp application environment
# (the otp_app that Repo is declared with).
Application.put_env(
  :myapp,
  Repo,
  [url: "ecto://postgres:postgres@localhost/advent_of_sql_2025"]
)
defmodule Repo do
  @moduledoc """
  Ecto repository backed by the Postgres adapter.

  Its configuration is read from the `:myapp` application environment.
  """

  use Ecto.Repo, otp_app: :myapp, adapter: Ecto.Adapters.Postgres
end

# Start the repo; the match fails loudly if it cannot be started.
{:ok, _repo_pid} = Repo.start_link()
Setup
import Ecto.Query
defmodule IncomingDispatch do
  @moduledoc """
  Ecto schema for rows of the `incoming_dispatches` table.

  The schema is declared without a primary key.
  """

  use Ecto.Schema

  @primary_key false
  schema "incoming_dispatches" do
    field(:system_id, :string)
    field(:dispatched_at, :naive_datetime)
    field(:payload, :map)
  end
end
defmodule SystemDispatch do
  @moduledoc """
  Ecto schema for rows of the `system_dispatches` table.

  Uses Ecto's default primary key and carries the same dispatch fields as
  `IncomingDispatch` plus a `marker_letter`.
  """

  use Ecto.Schema

  schema "system_dispatches" do
    field(:system_id, :string)
    field(:dispatched_at, :naive_datetime)
    field(:payload, :map)
    field(:marker_letter, :string)
  end
end
Process incoming dispatches
# Move the incoming dispatches into system_dispatches atomically: inside one
# transaction the rows are deleted from incoming_dispatches (returning their
# contents) and inserted into system_dispatches, skipping conflicting rows.
process_query =
  select(IncomingDispatch, [i], map(i, [:system_id, :dispatched_at, :payload]))

{:ok, {num_deleted, num_inserted}} =
  Repo.transact(fn ->
    # delete_all returns the deleted rows because the query has a select.
    {removed_count, removed_rows} = Repo.delete_all(process_query)

    {stored_count, _returned} =
      Repo.insert_all(SystemDispatch, removed_rows, on_conflict: :nothing)

    {:ok, {removed_count, stored_count}}
  end)

# Rows deleted but not inserted were skipped by `on_conflict: :nothing`.
"Processed #{num_inserted} dispatches (#{num_deleted - num_inserted} duplicate)"
Fetch confirmation phrase
# Dispatches whose payload "source" is "primary", each numbered within its
# system from the most recent dispatch (num == 1) to the oldest.
latest_systems =
  SystemDispatch
  |> where([s], json_extract_path(s.payload, ["source"]) == "primary")
  |> select([s], %{
    system_id: s.system_id,
    dispatched_at: s.dispatched_at,
    marker_letter: s.marker_letter,
    num: over(row_number(), partition_by: s.system_id, order_by: [desc: s.dispatched_at])
  })
# Keep only the most recent dispatch per system (num == 1) and concatenate
# their marker letters, ordered by dispatch time, into a single phrase.
from(l in subquery(latest_systems))
|> where([l], l.num == 1)
|> group_by([l], l.num)
|> select([l], %{
  phrase: fragment("STRING_AGG(?, '' order by ?)", l.marker_letter, l.dispatched_at)
})
|> Repo.one!()
|> Map.fetch!(:phrase)