Selecto Group-By and Aggregates Workbook
Setup 1) Install Dependencies
Run this cell first. It installs Selecto and related packages from Hex for this Livebook session.
# Selecto is bound on its own so the exact requirement can be echoed after install.
selecto_dep = {:selecto, "~> 0.3.5"}

# Supporting packages installed alongside Selecto for this Livebook session.
supporting_deps = [
  postgrex: "~> 0.17",
  ecto_sql: "~> 3.11",
  jason: "~> 1.4",
  kino: "~> 0.12"
]

Mix.install([selecto_dep | supporting_deps])

IO.puts("Using Selecto dependency: #{inspect(selecto_dep)}")
Overview
This workbook focuses on aggregation patterns that are common in reporting and analytics:
- Global totals (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
- Single-dimension grouping (for example, by status)
- Multi-dimension grouping (for example, country + status)
- Join-aware aggregates (for example, customer tier metrics)
- Composable multi-`select/2` aggregate query construction
- Top-N product metrics from `order_items`
flowchart LR
base[Configure query]
selects[Add aggregate selects]
filters[Apply filters]
groups[Group dimensions]
order[Order by aggregate]
run[Execute and inspect]
base --> selects --> filters --> groups --> order --> run
graph LR
orders[orders]
customers[customers]
order_items[order_items]
products[products]
orders -->|customer_id to id| customers
orders -->|id to order_id| order_items
order_items -->|product_id to id| products
Before running, initialize the sample database:
cd selecto_examples
mix setup
Setup 2) Connect to Repo and Build Aggregate Domains
This cell defines a Repo plus two focused domains:
- `order_domain` for order-level aggregates and customer join dimensions
- `order_items_domain` for product-level sales aggregates
defmodule AggregateLab.Repo do
  # Ecto repo for this workbook, configured with the Postgres adapter.
  use Ecto.Repo, otp_app: :aggregate_lab_livebook, adapter: Ecto.Adapters.Postgres
end
defmodule AggregateLab.Domains do
  @moduledoc false

  # Both domain maps are assembled from compact `field: type` keyword lists.
  # The private builders at the bottom expand those lists into the `fields`,
  # `columns`, and join `fields` entries, so each field name is written once.

  # Domain rooted at the `orders` table, with left joins to `customers` and
  # `order_items`, plus a default selection and a default ordering.
  def order_domain do
    %{
      name: "Orders",
      source:
        root_table(
          "orders",
          [
            id: :integer,
            order_number: :string,
            status: :string,
            total: :decimal,
            shipping_country: :string,
            inserted_at: :utc_datetime,
            customer_id: :integer
          ],
          %{
            customer: association(:customer, :customers, :customer_id, :id),
            order_items: association(:order_items, :order_items, :id, :order_id)
          }
        ),
      schemas: %{
        customers:
          joined_schema("customers",
            id: :integer,
            name: :string,
            tier: :string,
            country: :string,
            active: :boolean
          ),
        order_items:
          joined_schema("order_items",
            id: :integer,
            order_id: :integer,
            product_id: :integer,
            quantity: :integer,
            line_total: :decimal
          )
      },
      joins: %{
        customer:
          left_join("Customer", "customers", {"customer_id", "id"},
            name: :string,
            tier: :string,
            country: :string,
            active: :boolean
          ),
        order_items:
          left_join("Order Items", "order_items", {"id", "order_id"},
            product_id: :integer,
            quantity: :integer,
            line_total: :decimal
          )
      },
      default_selected: ["order_number", "status", "total", "inserted_at"],
      default_order_by: [{"inserted_at", :desc}]
    }
  end

  # Domain rooted at the `order_items` table, with left joins to `products`
  # and `orders`. Unlike `order_domain/0` it declares no default selection or
  # default ordering.
  def order_items_domain do
    %{
      name: "OrderItems",
      source:
        root_table(
          "order_items",
          [
            id: :integer,
            order_id: :integer,
            product_id: :integer,
            quantity: :integer,
            unit_price: :decimal,
            line_total: :decimal
          ],
          %{
            product: association(:product, :products, :product_id, :id),
            order: association(:order, :orders, :order_id, :id)
          }
        ),
      schemas: %{
        products:
          joined_schema("products",
            id: :integer,
            name: :string,
            sku: :string,
            price: :decimal
          ),
        orders:
          joined_schema("orders",
            id: :integer,
            status: :string,
            shipping_country: :string,
            inserted_at: :utc_datetime
          )
      },
      joins: %{
        product:
          left_join("Product", "products", {"product_id", "id"},
            name: :string,
            sku: :string
          ),
        order:
          left_join("Order", "orders", {"order_id", "id"},
            status: :string,
            shipping_country: :string,
            inserted_at: :utc_datetime
          )
      }
    }
  end

  # Keys shared by root sources and joined schemas. `fields` keeps the order
  # in which the typed fields were listed.
  defp base_table(table_name, typed_fields) do
    %{
      source_table: table_name,
      primary_key: :id,
      fields: Keyword.keys(typed_fields),
      columns: typed_columns(typed_fields)
    }
  end

  # Root source entry: the base table plus its associations (no `redact_fields`).
  defp root_table(table_name, typed_fields, associations) do
    table_name
    |> base_table(typed_fields)
    |> Map.put(:associations, associations)
  end

  # Entry under `schemas`: the base table plus an empty `redact_fields` list.
  defp joined_schema(table_name, typed_fields) do
    table_name
    |> base_table(typed_fields)
    |> Map.put(:redact_fields, [])
  end

  # Expands `[name: :string]` into `%{name: %{type: :string}}`.
  defp typed_columns(typed_fields) do
    Map.new(typed_fields, fn {field, type} -> {field, %{type: type}} end)
  end

  # Association entry linking `owner_key` on the root table to `related_key`
  # on the `queryable` schema.
  defp association(field, queryable, owner_key, related_key) do
    %{field: field, queryable: queryable, owner_key: owner_key, related_key: related_key}
  end

  # Left-join entry with a single `left = right` key pair in `on`.
  defp left_join(label, table_name, {left_key, right_key}, typed_fields) do
    %{
      name: label,
      type: :left,
      source: table_name,
      on: [%{left: left_key, right: right_key}],
      fields: typed_columns(typed_fields)
    }
  end
end
# Connection settings are read from SELECTO_EXAMPLES_DB* environment
# variables; the second argument of each lookup is the fallback value.
repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: "SELECTO_EXAMPLES_DB_PORT" |> System.get_env("5432") |> String.to_integer(),
  pool_size: 5
]

# Start the Repo only when no process is registered under its name yet.
if is_nil(Process.whereis(AggregateLab.Repo)) do
  {:ok, _pid} = AggregateLab.Repo.start_link(repo_config)
end

# Shared handle for the query cells: the repo plus both domain maps.
config = %{
  repo: AggregateLab.Repo,
  order_domain: AggregateLab.Domains.order_domain(),
  order_items_domain: AggregateLab.Domains.order_items_domain()
}

# Check the connection by counting rows in the two tables the workbook reads.
{:ok, %{rows: [[order_count]]} = order_result} =
  Ecto.Adapters.SQL.query(config.repo, "select count(*) from orders", [])

{:ok, %{rows: [[item_count]]} = item_result} =
  Ecto.Adapters.SQL.query(config.repo, "select count(*) from order_items", [])

IO.puts("Connected. Orders: #{order_count}, Order items: #{item_count}")

config
Setup 3) Define Aggregate Helpers
This helper cell prints SQL/params and executes a query with a compact preview.
defmodule AggregateLab.Helpers do
  @moduledoc false

  # Prints the SQL and params that `Selecto.to_sql/1` produces for `query`
  # under a labelled banner, then returns the `{sql, params}` tuple.
  def explain(label, query) do
    {sql, params} = generated = Selecto.to_sql(query)

    IO.puts("\n=== #{label} ===")
    sql |> String.trim() |> IO.puts()
    IO.puts("Params: #{inspect(params)}")

    generated
  end

  # Explains `query`, executes it, and prints either a summary (row count,
  # columns, aliases, and the first `preview_count` rows) or the error.
  # The result of `Selecto.execute/1` is returned untouched in both cases.
  def run(label, query, preview_count \\ 10) do
    explain(label, query)

    case Selecto.execute(query) do
      {:error, error} = failure ->
        IO.puts("Error: #{inspect(error)}")
        failure

      {:ok, {rows, columns, aliases}} = ok ->
        IO.puts("Rows: #{length(rows)}")
        IO.puts("Columns: #{inspect(columns)}")
        IO.puts("Aliases: #{inspect(aliases)}")

        rows
        |> Enum.take(preview_count)
        |> IO.inspect(label: "Preview (up to #{preview_count} rows)")

        ok
    end
  end

  # Formats an aggregate value for display:
  #   * `nil` is shown as "$0.00"
  #   * a `Decimal` is rounded to two places
  #   * any other number is printed with two decimals
  #   * anything else falls back to `inspect/1`
  def money(nil), do: "$0.00"

  def money(%Decimal{} = value) do
    rounded = Decimal.round(value, 2)
    "$" <> Decimal.to_string(rounded)
  end

  def money(value) when is_number(value) do
    # `/ 1` turns integers into floats, which `float_to_binary/2` requires.
    "$" <> :erlang.float_to_binary(value / 1, decimals: 2)
  end

  def money(value), do: inspect(value)
end
1) Global Totals (No Group By)
Start with a baseline aggregate query over all orders to get total count, distinct customers, and revenue metrics in one row.
# No group_by here: the aggregates are selected on their own.
# The four `total` aggregates are generated from their function names.
total_aggregates = Enum.map([:sum, :avg, :min, :max], &{&1, "total"})

global_totals_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([{:count, "*"}, {:count_distinct, "customer_id"} | total_aggregates])

AggregateLab.Helpers.run("Global totals", global_totals_query)
2) Orders by Status
Group by status to build a compact operational dashboard that shows volume and revenue by lifecycle stage.
# One row per status, ordered by the order count, descending.
status_summary_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["status", {:count, "*"}, {:sum, "total"}, {:avg, "total"}])
  |> Selecto.group_by(["status"])
  |> Selecto.order_by({{:count, "*"}, :desc})

AggregateLab.Helpers.run("Order summary by status", status_summary_query)
3) Country and Status Matrix (Two-Dimension Group By)
Group across two dimensions (shipping_country and status) to inspect where order outcomes are concentrated.
# One row per (shipping_country, status) pair, ordered by summed total and
# limited to 20 rows.
country_status_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["shipping_country", "status", {:count, "*"}, {:sum, "total"}])
  |> Selecto.group_by(["shipping_country", "status"])
  |> Selecto.order_by({{:sum, "total"}, :desc})
  |> Selecto.limit(20)

AggregateLab.Helpers.run("Country + status matrix", country_status_query)
4) Join-Aware Aggregates by Customer Tier
Use joined customer fields as grouping dimensions and filters while still aggregating order totals from the root table.
# Grouping dimensions and the filter come from the `customer` join; the
# aggregates are taken over the root table's `total`.
tier_country_summary_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "customer.tier",
    "customer.country",
    {:count, "*"},
    {:sum, "total"},
    {:avg, "total"}
  ])
  |> Selecto.filter({"customer.active", true})
  |> Selecto.group_by(["customer.tier", "customer.country"])
  |> Selecto.order_by({{:sum, "total"}, :desc})
  |> Selecto.limit(15)

AggregateLab.Helpers.run("Join-aware tier + country summary", tier_country_summary_query)
5) Composable Aggregate Query with Multiple select/2 Calls
This cell intentionally builds the select list in stages to show composability: you can add dimensions and aggregates incrementally before execution.
# The select list is deliberately built with four separate select/2 calls:
# first the grouping dimension, then one aggregate per call.
composable_aggregate_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["status"])
  |> Selecto.select([{:count, "*"}])
  |> Selecto.select([{:sum, "total"}])
  |> Selecto.select([{:avg, "total"}])
  |> Selecto.group_by(["status"])
  |> Selecto.order_by({{:sum, "total"}, :desc})

AggregateLab.Helpers.run("Composable multi-select aggregate", composable_aggregate_query)
6) Distinct Reach per Customer Tier
Pair COUNT(*) with COUNT DISTINCT to compare raw order volume against breadth (how many unique shipping countries each tier reaches).
# Per customer tier: order count, number of distinct shipping countries, and
# summed total, ordered by the summed total.
distinct_reach_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "customer.tier",
    {:count, "*"},
    {:count_distinct, "shipping_country"},
    {:sum, "total"}
  ])
  |> Selecto.group_by(["customer.tier"])
  |> Selecto.order_by({{:sum, "total"}, :desc})

AggregateLab.Helpers.run("Distinct reach by tier", distinct_reach_query)
7) Top Products by Revenue from Order Items
Switch to the order_items domain to aggregate at line-item granularity and rank products by revenue and units sold.
# Uses the order_items domain: per product, distinct orders, units, and line
# revenue, restricted to shipped/delivered orders and limited to 10 rows.
top_products_query =
  config.order_items_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([
    "product.name",
    "product.sku",
    {:count_distinct, "order_id"},
    {:sum, "quantity"},
    {:sum, "line_total"}
  ])
  |> Selecto.filter({"order.status", {:in, ["shipped", "delivered"]}})
  |> Selecto.group_by(["product.name", "product.sku"])
  |> Selecto.order_by({{:sum, "line_total"}, :desc})
  |> Selecto.limit(10)

AggregateLab.Helpers.run("Top products by shipped/delivered revenue", top_products_query)
8) Side-by-Side Aggregate Comparison in Elixir
Run two aggregate queries (delivered-only and pending-only) and compare the resulting totals in Elixir for quick ad hoc analysis.
# Both queries select the same three aggregates and differ only in the
# status they filter on.
totals_for_status = fn status ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select([{:count, "*"}, {:sum, "total"}, {:avg, "total"}])
  |> Selecto.filter({"status", status})
end

delivered_total_query = totals_for_status.("delivered")
pending_total_query = totals_for_status.("pending")

# Each result is matched as exactly one row of [count, sum, avg]; any other
# shape raises a MatchError.
{:ok, {[[delivered_count, delivered_sum, delivered_avg]], _cols1, _aliases1}} =
  Selecto.execute(delivered_total_query)

{:ok, {[[pending_count, pending_sum, pending_avg]], _cols2, _aliases2}} =
  Selecto.execute(pending_total_query)

money = &AggregateLab.Helpers.money/1

IO.puts("Delivered: #{delivered_count} orders, total #{money.(delivered_sum)}, avg #{money.(delivered_avg)}")
IO.puts("Pending: #{pending_count} orders, total #{money.(pending_sum)}, avg #{money.(pending_avg)}")
Next Steps
To extend this workbook, add:
- Time-bucketed aggregates (for example month-level trends) using SQL function selectors
- Pivot + aggregate flows to move from order-level to product-level analysis in one query
- Export-focused cells that transform aggregate rows into maps for charts