Powered by AppSignal & Oban Pro

Day 15

2025/day15/day15.livemd

Day 15

# Configure Postgrex to use the built-in JSON library before it is compiled.
# The `JSON` module ships with Elixir 1.18 and later.
Application.put_env(:postgrex, :json_library, JSON)

Mix.install([
  # Ecto 3.13 is the first release that provides `Repo.transact/2`,
  # which this notebook relies on when processing dispatches.
  {:ecto_sql, "~> 3.13"},
  {:postgrex, ">= 0.0.0"},
  {:kino, "~> 0.18.0"}
])

# Show lists of integers as lists instead of charlists in cell output.
Kino.configure(inspect: [charlists: :as_lists])

# Connection settings read by `Repo` through its `otp_app: :myapp` option.
Application.put_env(:myapp, Repo,
  url: "ecto://postgres:postgres@localhost/advent_of_sql_2025"
)

defmodule Repo do
  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :myapp
end

# Crash the cell immediately if the repository cannot be started.
{:ok, _pid} = Repo.start_link()

Setup

import Ecto.Query

defmodule IncomingDispatch do
  @moduledoc """
  Ecto schema for rows of the `incoming_dispatches` table.

  The schema is declared without a primary key.
  """

  use Ecto.Schema

  # No primary key column is mapped for this table.
  @primary_key false

  schema "incoming_dispatches" do
    field(:system_id, :string)
    field(:dispatched_at, :naive_datetime)
    field(:payload, :map)
  end
end

defmodule SystemDispatch do
  @moduledoc """
  Ecto schema for rows of the `system_dispatches` table.

  Carries the same dispatch fields as `IncomingDispatch` plus a
  `marker_letter` column, and uses Ecto's default primary key.
  """

  use Ecto.Schema

  schema "system_dispatches" do
    field(:system_id, :string)
    field(:dispatched_at, :naive_datetime)
    field(:payload, :map)
    field(:marker_letter, :string)
  end
end

Process incoming dispatches

# Move every incoming dispatch into `system_dispatches` as one atomic
# operation: the delete and the insert run inside a single transaction.

process_query =
  IncomingDispatch
  |> select([i], map(i, [:system_id, :dispatched_at, :payload]))

move_dispatches = fn ->
  # Because the query has a `select`, the deleted rows come back with the count.
  {deleted_count, rows} = Repo.delete_all(process_query)

  # Conflicting rows are skipped, so the inserted count can be lower than
  # the deleted count.
  {inserted_count, _returned} =
    Repo.insert_all(SystemDispatch, rows, on_conflict: :nothing)

  {:ok, {deleted_count, inserted_count}}
end

{:ok, {num_deleted, num_inserted}} = Repo.transact(move_dispatches)

"Processed #{num_inserted} dispatches (#{num_deleted - num_inserted} duplicate)"

Fetch confirmation phrase

# Number the dispatches whose payload "source" is "primary" within each
# system, newest first, so that `num == 1` marks a system's latest dispatch.
latest_systems =
  SystemDispatch
  |> where([s], json_extract_path(s.payload, ["source"]) == "primary")
  |> select([s], %{
    system_id: s.system_id,
    dispatched_at: s.dispatched_at,
    marker_letter: s.marker_letter,
    num: over(row_number(), partition_by: s.system_id, order_by: [desc: s.dispatched_at])
  })

# Keep only the latest dispatch per system and concatenate their marker
# letters in dispatch-time order.
phrase_query =
  from(l in subquery(latest_systems))
  |> where([l], l.num == 1)
  |> group_by([l], l.num)
  |> select([l], %{
    phrase: fragment("STRING_AGG(?, '' order by ?)", l.marker_letter, l.dispatched_at)
  })

phrase_query
|> Repo.one!()
|> Map.fetch!(:phrase)