Powered by AppSignal & Oban Pro

Selecto Group-By and Aggregates Workbook

selecto_group_by_aggregates_workbook.livemd

Selecto Group-By and Aggregates Workbook

Setup 1) Install Dependencies

Run this cell first. It installs Selecto and related packages from Hex for this Livebook session.

# Bind the Selecto requirement once so the same tuple is both installed and echoed below.
selecto_dep = {:selecto, "~> 0.3.5"}

# Install Selecto together with the other packages this session depends on.
Mix.install([
  selecto_dep,
  {:postgrex, "~> 0.17"},
  {:ecto_sql, "~> 3.11"},
  {:jason, "~> 1.4"},
  {:kino, "~> 0.12"}
])

IO.puts("Using Selecto dependency: " <> inspect(selecto_dep))

Overview

This workbook focuses on aggregation patterns that are common in reporting and analytics:

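  1. Global totals (COUNT, SUM, AVG, MIN, MAX)
  2. Single-dimension grouping (for example, by status)
  3. Multi-dimension grouping (for example, country + status)
  4. Join-aware aggregates (for example, customer tier metrics)
  5. Composable aggregate query construction with multiple select/2 calls
  6. Distinct counts per group (for example, shipping countries reached per customer tier)
  7. Top-N product metrics from order_items
  8. Side-by-side comparison of aggregate results in Elixir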
flowchart LR
  base[Configure query]
  selects[Add aggregate selects]
  filters[Apply filters]
  groups[Group dimensions]
  order[Order by aggregate]
  run[Execute and inspect]

  base --> selects --> filters --> groups --> order --> run
graph LR
  orders[orders]
  customers[customers]
  order_items[order_items]
  products[products]

  orders -->|customer_id to id| customers
  orders -->|id to order_id| order_items
  order_items -->|product_id to id| products

Before running, initialize the sample database:

cd selecto_examples
mix setup

Setup 2) Connect to Repo and Build Aggregate Domains

This cell defines a Repo plus two focused domains:

  1. order_domain for order-level aggregates and customer join dimensions
  2. order_items_domain for product-level sales aggregates
# Ecto repository for this workbook; it uses the PostgreSQL adapter and is started
# with explicit connection options in the cell code that follows the module definitions.
defmodule AggregateLab.Repo do
  use Ecto.Repo, otp_app: :aggregate_lab_livebook, adapter: Ecto.Adapters.Postgres
end

# Builds the two plain-map domain configurations used in this workbook:
# one rooted at "orders" and one rooted at "order_items".
defmodule AggregateLab.Domains do
  @moduledoc false

  # Domain rooted at the "orders" table, with joins to "customers" and "order_items".
  # Returns a map with :name, :source, :schemas, :joins and default selection/ordering keys.
  def order_domain do
    %{
      name: "Orders",
      # Root table: its primary key, field list, and per-column types.
      source: %{
        source_table: "orders",
        primary_key: :id,
        fields: [:id, :order_number, :status, :total, :shipping_country, :inserted_at, :customer_id],
        columns: %{
          id: %{type: :integer},
          order_number: %{type: :string},
          status: %{type: :string},
          total: %{type: :decimal},
          shipping_country: %{type: :string},
          inserted_at: %{type: :utc_datetime},
          customer_id: %{type: :integer}
        },
        # Each association names a key in `schemas` below (via :queryable) and the key pair that links the tables.
        associations: %{
          customer: %{field: :customer, queryable: :customers, owner_key: :customer_id, related_key: :id},
          order_items: %{field: :order_items, queryable: :order_items, owner_key: :id, related_key: :order_id}
        }
      },
      # Table definitions for the associated tables referenced by `queryable` above.
      schemas: %{
        customers: %{
          source_table: "customers",
          primary_key: :id,
          fields: [:id, :name, :tier, :country, :active],
          redact_fields: [],
          columns: %{
            id: %{type: :integer},
            name: %{type: :string},
            tier: %{type: :string},
            country: %{type: :string},
            active: %{type: :boolean}
          }
        },
        order_items: %{
          source_table: "order_items",
          primary_key: :id,
          fields: [:id, :order_id, :product_id, :quantity, :line_total],
          redact_fields: [],
          columns: %{
            id: %{type: :integer},
            order_id: %{type: :integer},
            product_id: %{type: :integer},
            quantity: %{type: :integer},
            line_total: %{type: :decimal}
          }
        }
      },
      # Join definitions; both are declared as :left joins and list the joined columns with their types.
      joins: %{
        customer: %{
          name: "Customer",
          type: :left,
          source: "customers",
          on: [%{left: "customer_id", right: "id"}],
          fields: %{
            name: %{type: :string},
            tier: %{type: :string},
            country: %{type: :string},
            active: %{type: :boolean}
          }
        },
        order_items: %{
          name: "Order Items",
          type: :left,
          source: "order_items",
          on: [%{left: "id", right: "order_id"}],
          fields: %{
            product_id: %{type: :integer},
            quantity: %{type: :integer},
            line_total: %{type: :decimal}
          }
        }
      },
      # NOTE(review): presumably used when a query sets no selection/ordering of its own — confirm against Selecto docs.
      default_selected: ["order_number", "status", "total", "inserted_at"],
      default_order_by: [{"inserted_at", :desc}]
    }
  end

  # Domain rooted at the "order_items" table, with joins to "products" and "orders".
  # Unlike order_domain/0, this map declares no default_selected or default_order_by.
  def order_items_domain do
    %{
      name: "OrderItems",
      # Root table: its primary key, field list, and per-column types.
      source: %{
        source_table: "order_items",
        primary_key: :id,
        fields: [:id, :order_id, :product_id, :quantity, :unit_price, :line_total],
        columns: %{
          id: %{type: :integer},
          order_id: %{type: :integer},
          product_id: %{type: :integer},
          quantity: %{type: :integer},
          unit_price: %{type: :decimal},
          line_total: %{type: :decimal}
        },
        # Each association names a key in `schemas` below (via :queryable) and the key pair that links the tables.
        associations: %{
          product: %{field: :product, queryable: :products, owner_key: :product_id, related_key: :id},
          order: %{field: :order, queryable: :orders, owner_key: :order_id, related_key: :id}
        }
      },
      # Table definitions for the associated tables referenced by `queryable` above.
      schemas: %{
        products: %{
          source_table: "products",
          primary_key: :id,
          fields: [:id, :name, :sku, :price],
          redact_fields: [],
          columns: %{
            id: %{type: :integer},
            name: %{type: :string},
            sku: %{type: :string},
            price: %{type: :decimal}
          }
        },
        orders: %{
          source_table: "orders",
          primary_key: :id,
          fields: [:id, :status, :shipping_country, :inserted_at],
          redact_fields: [],
          columns: %{
            id: %{type: :integer},
            status: %{type: :string},
            shipping_country: %{type: :string},
            inserted_at: %{type: :utc_datetime}
          }
        }
      },
      # Join definitions; both are declared as :left joins and list the joined columns with their types.
      joins: %{
        product: %{
          name: "Product",
          type: :left,
          source: "products",
          on: [%{left: "product_id", right: "id"}],
          fields: %{
            name: %{type: :string},
            sku: %{type: :string}
          }
        },
        order: %{
          name: "Order",
          type: :left,
          source: "orders",
          on: [%{left: "order_id", right: "id"}],
          fields: %{
            status: %{type: :string},
            shipping_country: %{type: :string},
            inserted_at: %{type: :utc_datetime}
          }
        }
      }
    }
  end
end

# Connection options are read from SELECTO_EXAMPLES_DB* environment variables,
# falling back to the local defaults given here.
repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: "SELECTO_EXAMPLES_DB_PORT" |> System.get_env("5432") |> String.to_integer(),
  pool_size: 5
]

# Start the Repo only when no process is registered under its name yet,
# so re-evaluating this cell does not try to start it twice.
if is_nil(Process.whereis(AggregateLab.Repo)) do
  {:ok, _pid} = AggregateLab.Repo.start_link(repo_config)
end

# Shared handle passed to every query cell: the repo plus both domain maps.
config = %{
  repo: AggregateLab.Repo,
  order_domain: AggregateLab.Domains.order_domain(),
  order_items_domain: AggregateLab.Domains.order_items_domain()
}

# Sanity check: count rows in both tables to confirm the connection works.
{:ok, order_result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from orders", [])
[[order_count]] = order_result.rows

{:ok, item_result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from order_items", [])
[[item_count]] = item_result.rows

IO.puts("Connected. Orders: #{order_count}, Order items: #{item_count}")

config

Setup 3) Define Aggregate Helpers

This helper cell prints SQL/params and executes a query with a compact preview.

defmodule AggregateLab.Helpers do
  @moduledoc false

  # Prints the labelled SQL text and bound params for `query`, then returns
  # the `{sql, params}` tuple produced by `Selecto.to_sql/1`.
  def explain(label, query) do
    {sql, params} = compiled = Selecto.to_sql(query)

    IO.puts("\n=== #{label} ===")
    sql |> String.trim() |> IO.puts()
    IO.puts("Params: #{inspect(params)}")

    compiled
  end

  # Prints the SQL via `explain/2`, executes `query`, and prints either a summary
  # (row count, columns, aliases, and the first `preview_count` rows) or the error.
  # Returns the result of `Selecto.execute/1` unchanged.
  def run(label, query, preview_count \\ 10) do
    explain(label, query)

    outcome = Selecto.execute(query)

    case outcome do
      {:ok, {rows, columns, aliases}} ->
        IO.puts("Rows: #{length(rows)}")
        IO.puts("Columns: #{inspect(columns)}")
        IO.puts("Aliases: #{inspect(aliases)}")

        rows
        |> Enum.take(preview_count)
        |> IO.inspect(label: "Preview (up to #{preview_count} rows)")

      {:error, error} ->
        IO.puts("Error: #{inspect(error)}")
    end

    outcome
  end

  # Formats a value as a dollar amount with two decimal places.
  # `nil` renders as "$0.00"; anything that is neither a Decimal nor a number
  # is returned in its `inspect/1` form without a dollar sign.
  def money(value) do
    case value do
      nil ->
        "$0.00"

      %Decimal{} = amount ->
        "$" <> Decimal.to_string(Decimal.round(amount, 2))

      amount when is_number(amount) ->
        # Dividing by 1 turns integers into floats so float_to_binary accepts them.
        "$" <> :erlang.float_to_binary(amount / 1, decimals: 2)

      other ->
        inspect(other)
    end
  end
end

1) Global Totals (No Group By)

Start with a baseline aggregate query over all orders to get total count, distinct customers, and revenue metrics in one row.

# Aggregates only, with no group_by: the select list holds six aggregate selectors.
global_totals_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    {:count, "*"},
    {:count_distinct, "customer_id"},
    {:sum, "total"},
    {:avg, "total"},
    {:min, "total"},
    {:max, "total"}
  ])

AggregateLab.Helpers.run("Global totals", global_totals_query)

2) Orders by Status

Group by status to build a compact operational dashboard that shows volume and revenue by lifecycle stage.

# One dimension ("status") plus count/sum/avg, ordered by the count aggregate descending.
status_summary_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "status",
    {:count, "*"},
    {:sum, "total"},
    {:avg, "total"}
  ])
  |> Selecto.group_by(["status"])
  |> Selecto.order_by({{:count, "*"}, :desc})

AggregateLab.Helpers.run("Order summary by status", status_summary_query)

3) Country and Status Matrix (Two-Dimension Group By)

Group across two dimensions (shipping_country and status) to inspect where order outcomes are concentrated.

# Two grouping dimensions; the selected dimensions mirror the group_by list.
# Ordered by summed total descending and capped at 20 rows.
country_status_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "shipping_country",
    "status",
    {:count, "*"},
    {:sum, "total"}
  ])
  |> Selecto.group_by(["shipping_country", "status"])
  |> Selecto.order_by({{:sum, "total"}, :desc})
  |> Selecto.limit(20)

AggregateLab.Helpers.run("Country + status matrix", country_status_query)

4) Join-Aware Aggregates by Customer Tier

Use joined customer fields as grouping dimensions and filters while still aggregating order totals from the root table.

# Dimensions and the filter use "customer."-prefixed fields from the customer join,
# while the aggregates are taken over the orders "total" column.
tier_country_summary_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "customer.tier",
    "customer.country",
    {:count, "*"},
    {:sum, "total"},
    {:avg, "total"}
  ])
  |> Selecto.filter({"customer.active", true})
  |> Selecto.group_by(["customer.tier", "customer.country"])
  |> Selecto.order_by({{:sum, "total"}, :desc})
  |> Selecto.limit(15)

AggregateLab.Helpers.run("Join-aware tier + country summary", tier_country_summary_query)

5) Composable Aggregate Query with Multiple select/2 Calls

This cell intentionally builds the select list in stages to show composability: you can add dimensions and aggregates incrementally before execution.

# The select list is built with four separate select/2 calls:
# first the dimension, then one aggregate per call.
composable_aggregate_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["status"])
  |> Selecto.select([{:count, "*"}])
  |> Selecto.select([{:sum, "total"}])
  |> Selecto.select([{:avg, "total"}])
  |> Selecto.group_by(["status"])
  |> Selecto.order_by({{:sum, "total"}, :desc})

AggregateLab.Helpers.run("Composable multi-select aggregate", composable_aggregate_query)

6) Distinct Reach per Customer Tier

Pair COUNT(*) with COUNT DISTINCT to compare raw order volume against breadth (how many unique shipping countries each tier reaches).

# Per customer tier: order count, distinct shipping countries, and summed total,
# ordered by the summed total descending.
distinct_reach_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "customer.tier",
    {:count, "*"},
    {:count_distinct, "shipping_country"},
    {:sum, "total"}
  ])
  |> Selecto.group_by(["customer.tier"])
  |> Selecto.order_by({{:sum, "total"}, :desc})

AggregateLab.Helpers.run("Distinct reach by tier", distinct_reach_query)

7) Top Products by Revenue from Order Items

Switch to the order_items domain to aggregate at line-item granularity: restrict to shipped and delivered orders, then rank products by revenue, reporting units sold and distinct order counts alongside.

# Rooted at the order_items domain: grouped by product name and SKU, filtered on the
# joined order's status, ordered by summed line_total descending, capped at 10 rows.
top_products_query =
  config.order_items_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "product.name",
    "product.sku",
    {:count_distinct, "order_id"},
    {:sum, "quantity"},
    {:sum, "line_total"}
  ])
  |> Selecto.filter({"order.status", {:in, ["shipped", "delivered"]}})
  |> Selecto.group_by(["product.name", "product.sku"])
  |> Selecto.order_by({{:sum, "line_total"}, :desc})
  |> Selecto.limit(10)

AggregateLab.Helpers.run("Top products by shipped/delivered revenue", top_products_query)

8) Side-by-Side Aggregate Comparison in Elixir

Run two aggregate queries (delivered-only and pending-only) and compare the resulting totals in Elixir for quick ad hoc analysis.

# Two ungrouped aggregate queries that differ only in the status filter.
delivered_total_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([{:count, "*"}, {:sum, "total"}, {:avg, "total"}])
  |> Selecto.filter({"status", "delivered"})

pending_total_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([{:count, "*"}, {:sum, "total"}, {:avg, "total"}])
  |> Selecto.filter({"status", "pending"})

# Each match asserts that the query returned exactly one row.
{:ok, {[delivered_row], _cols1, _aliases1}} = Selecto.execute(delivered_total_query)
{:ok, {[pending_row], _cols2, _aliases2}} = Selecto.execute(pending_total_query)

# Row values follow the select order: count, sum, avg.
[delivered_count, delivered_sum, delivered_avg] = delivered_row
[pending_count, pending_sum, pending_avg] = pending_row

IO.puts(
  "Delivered: #{delivered_count} orders, total #{AggregateLab.Helpers.money(delivered_sum)}, avg #{AggregateLab.Helpers.money(delivered_avg)}"
)

IO.puts(
  "Pending:   #{pending_count} orders, total #{AggregateLab.Helpers.money(pending_sum)}, avg #{AggregateLab.Helpers.money(pending_avg)}"
)

Next Steps

To extend this workbook, add:

  1. Time-bucketed aggregates (for example month-level trends) using SQL function selectors
  2. Pivot + aggregate flows to move from order-level to product-level analysis in one query
  3. Export-focused cells that transform aggregate rows into maps for charts