Selecto CTEs Workbook
Setup 1) Install Dependencies
Run this cell first. It installs Selecto and related packages from Hex for this Livebook session.
# Selecto's requirement lives in its own binding so the exact version
# constraint can be echoed once installation has finished.
selecto_dep = {:selecto, "~> 0.3.5"}

# Packages installed next to Selecto for this session.
support_deps = [
  {:postgrex, "~> 0.17"},
  {:ecto_sql, "~> 3.11"},
  {:jason, "~> 1.4"},
  {:kino, "~> 0.12"}
]

Mix.install([selecto_dep | support_deps])

IO.puts("Using Selecto dependency: #{inspect(selecto_dep)}")
Overview
This workbook focuses on CTE workflows in Selecto:
- Non-recursive CTE (`with_cte/4`) for reusable aggregate subqueries
- CTE-enriched result sets via explicit CTE joins
- Recursive CTE (`with_recursive_cte/3`) for hierarchy traversal
- Multiple CTE composition (`with_ctes/2`)
- Shaped CTE output with `select_shape/2` and `execute_shape/2`
flowchart LR
base[Base query]
addcte[Attach CTE]
joincte[Join CTE into main query]
select[Select fields]
execute[Execute]
base --> addcte --> joincte --> select --> execute
graph LR
employees[employees]
manager[employees as manager]
tree[employee tree CTE]
employees -->|manager_id to id| manager
employees -->|id to id| tree
Before running, initialize the sample database:
cd selecto_examples
mix setup
Setup 2) Connect to Repo and Build Domains
This cell defines:
- `order_domain` for order-focused CTE examples
- `employee_domain` for recursive hierarchy CTE examples
defmodule CteLab.Repo do
  # Ecto repository used by every query in this notebook; it talks to
  # Postgres and reads its application config under :cte_lab_livebook.
  use Ecto.Repo, otp_app: :cte_lab_livebook, adapter: Ecto.Adapters.Postgres
end
defmodule CteLab.Domains do
  @moduledoc false

  # Field list of the employees table. It is used twice: for the root source
  # and for the `employees` schema that the self-referencing manager
  # association points at.
  @employee_fields [
    :id,
    :first_name,
    :last_name,
    :email,
    :title,
    :department,
    :salary,
    :manager_id,
    :active
  ]

  # Column name => column type for the employees table.
  @employee_types [
    id: :integer,
    first_name: :string,
    last_name: :string,
    email: :string,
    title: :string,
    department: :string,
    salary: :decimal,
    manager_id: :integer,
    active: :boolean
  ]

  # Domain map rooted at the `orders` table with a left join to `customers`.
  def order_domain do
    %{
      name: "Orders",
      source: %{
        source_table: "orders",
        primary_key: :id,
        fields: [:id, :order_number, :status, :total, :customer_id, :inserted_at],
        columns:
          typed(
            id: :integer,
            order_number: :string,
            status: :string,
            total: :decimal,
            customer_id: :integer,
            inserted_at: :utc_datetime
          ),
        associations: %{customer: association(:customer, :customers, :customer_id)}
      },
      schemas: %{
        customers: %{
          source_table: "customers",
          primary_key: :id,
          fields: [:id, :name, :tier, :active],
          redact_fields: [],
          columns: typed(id: :integer, name: :string, tier: :string, active: :boolean)
        }
      },
      joins: %{
        customer:
          left_join("Customer", "customers", "customer_id",
            name: :string,
            tier: :string,
            active: :boolean
          )
      },
      default_selected: ["order_number", "status", "total", "inserted_at"],
      default_order_by: [{"inserted_at", :desc}]
    }
  end

  # Domain map rooted at the `employees` table with a left self-join to the
  # same table through `manager_id`.
  def employee_domain do
    # Keys common to the root source and the `employees` schema entry.
    employees_table = %{
      source_table: "employees",
      primary_key: :id,
      fields: @employee_fields,
      columns: typed(@employee_types)
    }

    manager = association(:manager, :employees, :manager_id)

    %{
      name: "Employees",
      source: Map.put(employees_table, :associations, %{manager: manager}),
      schemas: %{employees: Map.put(employees_table, :redact_fields, [])},
      joins: %{
        manager:
          left_join("Manager", "employees", "manager_id",
            first_name: :string,
            last_name: :string,
            title: :string
          )
      },
      default_selected: ["first_name", "last_name", "title", "department"],
      default_order_by: [{"last_name", :asc}]
    }
  end

  # Turns `[column: type, ...]` into `%{column: %{type: type}, ...}`.
  defp typed(types) do
    Map.new(types, fn {column, type} -> {column, %{type: type}} end)
  end

  # Association entry whose related key is always `:id`.
  defp association(field, queryable, owner_key) do
    %{field: field, queryable: queryable, owner_key: owner_key, related_key: :id}
  end

  # Left-join entry matching `owner_column` on the joined table's "id".
  defp left_join(name, table, owner_column, field_types) do
    %{
      name: name,
      type: :left,
      source: table,
      on: [%{left: owner_column, right: "id"}],
      fields: typed(field_types)
    }
  end
end
# Connection settings. Every value falls back to a local default when the
# matching SELECTO_EXAMPLES_DB* environment variable is not set.
repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: "SELECTO_EXAMPLES_DB_PORT" |> System.get_env("5432") |> String.to_integer(),
  pool_size: 5
]

# Start the repo only when nothing is registered under its name yet, so the
# cell can be evaluated again without a second start_link call.
if is_nil(Process.whereis(CteLab.Repo)) do
  {:ok, _pid} = CteLab.Repo.start_link(repo_config)
end

# Shared handle used by the query cells: the repo plus both domain maps.
config = %{
  repo: CteLab.Repo,
  order_domain: CteLab.Domains.order_domain(),
  employee_domain: CteLab.Domains.employee_domain()
}

# Smoke test: count the rows of both sample tables through the new connection.
{:ok, order_result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from orders", [])

{:ok, employee_result} =
  Ecto.Adapters.SQL.query(config.repo, "select count(*) from employees", [])

[[order_count]] = order_result.rows
[[employee_count]] = employee_result.rows

IO.puts("Connected. Orders: #{order_count}, Employees: #{employee_count}")

config
Setup 3) Define CTE Helpers
This helper cell prints SQL/params and executes the query with a compact preview.
defmodule CteLab.Helpers do
  @moduledoc false

  @doc """
  Prints the SQL and parameters generated for `query` under a `label` banner.

  Also prints whether the SQL contains a `WITH` keyword. Returns the
  `{sql, params}` tuple produced by `Selecto.to_sql/1`.
  """
  def explain(label, query) do
    {sql, params} = Selecto.to_sql(query)

    IO.puts("\n=== #{label} ===")
    IO.puts(String.trim(sql))
    IO.puts("Params: #{inspect(params)}")
    IO.puts("Has WITH clause: #{with_clause?(sql)}")

    {sql, params}
  end

  @doc """
  Explains `query`, executes it with `Selecto.execute/1`, and prints the row
  count, columns, aliases, and the first `preview_count` rows.

  Returns the `{:ok, {rows, columns, aliases}}` or `{:error, error}` tuple
  from `Selecto.execute/1` unchanged.
  """
  def run(label, query, preview_count \\ 10) do
    explain(label, query)

    case Selecto.execute(query) do
      {:ok, {rows, columns, aliases}} = ok ->
        IO.puts("Rows: #{length(rows)}")
        IO.puts("Columns: #{inspect(columns)}")
        IO.puts("Aliases: #{inspect(aliases)}")
        IO.inspect(Enum.take(rows, preview_count), label: "Preview (up to #{preview_count} rows)")
        ok

      {:error, _error} = failure ->
        report_failure(failure)
    end
  end

  @doc """
  Explains `query`, prints its compiled selection shape, executes it with
  `Selecto.execute_shape/1`, and prints the first `preview_count` shaped rows.

  Returns the `{:ok, shaped_rows}` or `{:error, error}` tuple from
  `Selecto.execute_shape/1` unchanged.
  """
  def run_shape(label, query, preview_count \\ 10) do
    explain(label, query)

    IO.inspect(query.set.selection_shape,
      label: "Compiled shape plan",
      pretty: true,
      limit: :infinity
    )

    case Selecto.execute_shape(query) do
      {:ok, shaped_rows} = ok ->
        IO.puts("Shaped rows: #{length(shaped_rows)}")

        IO.inspect(Enum.take(shaped_rows, preview_count),
          label: "Shape preview (up to #{preview_count} rows)"
        )

        ok

      {:error, _error} = failure ->
        report_failure(failure)
    end
  end

  # True when the SQL contains WITH as a whole word, in any letter case,
  # followed by whitespace. A plain substring check for "with " misses
  # "WITH\n" and also fires on words that merely end in "with".
  defp with_clause?(sql), do: Regex.match?(~r/\bwith\s/i, sql)

  # Prints the error and hands the original tuple back to the caller.
  defp report_failure({:error, error} = failure) do
    IO.puts("Error: #{inspect(error)}")
    failure
  end
end
1) Non-Recursive CTE: Manager Team Metrics
Create a CTE that aggregates direct reports per manager, then join it back to employees to retrieve manager identity fields.
# CTE body: groups employees by manager_id and selects the id, a row count,
# and the salary sum. Employees without a manager are filtered out first.
build_manager_team_metrics = fn ->
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["manager_id", {:count, "*"}, {:sum, "salary"}])
  |> Selecto.filter({"manager_id", :not_null})
  |> Selecto.group_by(["manager_id"])
end

# Fields declared on the CTE join so the outer query can reference them.
manager_team_metrics_fields = %{
  manager_id: %{type: :integer},
  direct_report_count: %{type: :integer},
  team_salary_total: %{type: :decimal}
}

manager_metrics_query =
  config.employee_domain
  |> Selecto.configure(config.repo)
  # The `columns:` list supplies the names that the join and the outer select
  # below refer to.
  |> Selecto.with_cte("manager_team_metrics", build_manager_team_metrics,
    columns: ["manager_id", "direct_report_count", "team_salary_total"]
  )
  # employees.id = manager_team_metrics.manager_id: each matched employee row
  # is the manager the metrics belong to.
  |> Selecto.join(:manager_team_metrics,
    source: "manager_team_metrics",
    type: :inner,
    owner_key: :id,
    related_key: :manager_id,
    fields: manager_team_metrics_fields
  )
  |> Selecto.select([
    "first_name",
    "last_name",
    "title",
    "manager_team_metrics.direct_report_count",
    "manager_team_metrics.team_salary_total"
  ])
  |> Selecto.filter({"manager_team_metrics.direct_report_count", {:>=, 2}})
  |> Selecto.order_by({"manager_team_metrics.direct_report_count", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Non-recursive CTE: manager metrics", manager_metrics_query)
2) Non-Recursive CTE: Customer Lifetime Spend Buckets
Build a customer-level CTE with each customer's order count and lifetime spend, join it to orders, keep customers with at least two orders, and sort by lifetime spend (highest first).
# CTE body: groups orders by customer_id and selects the id, a row count,
# and the sum of order totals.
build_customer_spend = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["customer_id", {:count, "*"}, {:sum, "total"}])
  |> Selecto.group_by(["customer_id"])
end

# Fields declared on the CTE join so the outer query can reference them.
customer_spend_fields = %{
  customer_id: %{type: :integer},
  order_count: %{type: :integer},
  lifetime_spend: %{type: :decimal}
}

customer_spend_cte_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  # The `columns:` list supplies the names that the join and the outer select
  # below refer to.
  |> Selecto.with_cte("customer_spend", build_customer_spend,
    columns: ["customer_id", "order_count", "lifetime_spend"]
  )
  # orders.customer_id = customer_spend.customer_id
  |> Selecto.join(:customer_spend,
    source: "customer_spend",
    type: :inner,
    owner_key: :customer_id,
    related_key: :customer_id,
    fields: customer_spend_fields
  )
  |> Selecto.select([
    "order_number",
    "status",
    "customer.name",
    "customer.tier",
    "customer_spend.order_count",
    "customer_spend.lifetime_spend"
  ])
  |> Selecto.filter({"customer_spend.order_count", {:>=, 2}})
  |> Selecto.order_by({"customer_spend.lifetime_spend", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Non-recursive CTE: customer spend", customer_spend_cte_query)
3) Recursive CTE: Reporting Tree from an Anchor Employee
This example anchors on one email and expands the reporting tree recursively through manager_id.
# Employee whose row seeds the recursion.
anchor_email = "sarah.connor@company.com"

# Fields exposed by the employee_tree CTE. The same map is declared on the
# join inside the recursive step and on the join in the outer query.
employee_tree_fields = %{
  id: %{type: :integer},
  first_name: %{type: :string},
  last_name: %{type: :string},
  manager_id: %{type: :integer},
  depth: %{type: :integer}
}

# Anchor step: the single employee matching anchor_email, with a literal 0 in
# the position the CTE column list names "depth".
base_query = fn ->
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["id", "first_name", "last_name", "manager_id", {:literal, 0}])
  |> Selecto.filter({"email", anchor_email})
end

# Recursive step: employees whose manager_id equals an id already in
# employee_tree, selected with that row's depth plus one. The CTE reference
# passed in by Selecto is not used; the CTE is addressed by name instead.
recursive_query = fn _cte_ref ->
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.join(:employee_tree,
    source: "employee_tree",
    type: :inner,
    owner_key: :manager_id,
    related_key: :id,
    fields: employee_tree_fields
  )
  |> Selecto.select([
    "id",
    "first_name",
    "last_name",
    "manager_id",
    {:raw_sql, "employee_tree.depth + 1"}
  ])
  |> Selecto.filter({"employee_tree.id", :not_null})
end

recursive_tree_query =
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_recursive_cte("employee_tree",
    base_query: base_query,
    recursive_query: recursive_query,
    columns: ["id", "first_name", "last_name", "manager_id", "depth"]
  )
  # employees.id = employee_tree.id: keeps only employees the CTE contains.
  |> Selecto.join(:employee_tree,
    source: "employee_tree",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: employee_tree_fields
  )
  |> Selecto.select([
    "employee_tree.depth",
    "employee_tree.first_name",
    "employee_tree.last_name",
    "title",
    "manager.first_name",
    "manager.last_name"
  ])
  |> Selecto.order_by({"employee_tree.depth", :asc})
  |> Selecto.order_by({"employee_tree.last_name", :asc})
  |> Selecto.limit(20)

CteLab.Helpers.run("Recursive CTE: employee reporting tree", recursive_tree_query)
4) Multiple CTEs with with_ctes/2
Compose two CTEs, attach both to the same query, and consume both through explicit joins.
alias Selecto.Advanced.CTE

# CTE 1: orders whose total is above 500.
high_value_orders_cte =
  CTE.create_cte("high_value_orders", fn ->
    config.order_domain
    |> Selecto.configure(config.repo)
    |> Selecto.select(["id", "customer_id", "total"])
    |> Selecto.filter({"total", {:>, 500}})
  end)

# CTE 2: orders grouped by customer_id, selecting the id and a row count.
customer_order_counts_cte =
  CTE.create_cte("customer_order_counts", fn ->
    config.order_domain
    |> Selecto.configure(config.repo)
    |> Selecto.select(["customer_id", {:count, "*"}])
    |> Selecto.group_by(["customer_id"])
  end)

# Fields declared on each CTE join so the outer query can reference them.
high_value_orders_fields = %{
  total: %{type: :decimal},
  customer_id: %{type: :integer}
}

customer_order_counts_fields = %{
  customer_id: %{type: :integer},
  count: %{type: :integer}
}

multi_cte_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_ctes([high_value_orders_cte, customer_order_counts_cte])
  # orders.id = high_value_orders.id
  |> Selecto.join(:high_value_orders,
    source: "high_value_orders",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: high_value_orders_fields
  )
  # orders.customer_id = customer_order_counts.customer_id
  |> Selecto.join(:customer_order_counts,
    source: "customer_order_counts",
    type: :inner,
    owner_key: :customer_id,
    related_key: :customer_id,
    fields: customer_order_counts_fields
  )
  |> Selecto.select([
    "order_number",
    "status",
    "customer.name",
    "high_value_orders.total",
    "customer_order_counts.count"
  ])
  |> Selecto.filter({"customer_order_counts.count", {:>=, 2}})
  |> Selecto.order_by({"high_value_orders.total", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Multiple CTE composition", multi_cte_query)
5) Multiple CTEs with select_shape/2
Use the same multi-CTE pattern but return a structured, nested row shape so each result is easier to consume in Elixir.
alias Selecto.Advanced.CTE

# CTE 1: orders whose total is above 500.
high_value_orders_cte_shape =
  CTE.create_cte("high_value_orders", fn ->
    config.order_domain
    |> Selecto.configure(config.repo)
    |> Selecto.select(["id", "customer_id", "total"])
    |> Selecto.filter({"total", {:>, 500}})
  end)

# CTE 2: orders grouped by customer_id, selecting the id and a row count.
customer_order_counts_cte_shape =
  CTE.create_cte("customer_order_counts", fn ->
    config.order_domain
    |> Selecto.configure(config.repo)
    |> Selecto.select(["customer_id", {:count, "*"}])
    |> Selecto.group_by(["customer_id"])
  end)

# Fields declared on each CTE join so the outer query can reference them.
high_value_orders_shape_fields = %{
  total: %{type: :decimal},
  customer_id: %{type: :integer}
}

customer_order_counts_shape_fields = %{
  customer_id: %{type: :integer},
  count: %{type: :integer}
}

multi_cte_shape_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_ctes([high_value_orders_cte_shape, customer_order_counts_cte_shape])
  # orders.id = high_value_orders.id
  |> Selecto.join(:high_value_orders,
    source: "high_value_orders",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: high_value_orders_shape_fields
  )
  # orders.customer_id = customer_order_counts.customer_id
  |> Selecto.join(:customer_order_counts,
    source: "customer_order_counts",
    type: :inner,
    owner_key: :customer_id,
    related_key: :customer_id,
    fields: customer_order_counts_shape_fields
  )
  # Requested row shape: a list of one scalar followed by two pairs. The
  # function head in the printing loop below destructures exactly this shape.
  |> Selecto.select_shape([
    "order_number",
    {"status", "customer.name"},
    {"high_value_orders.total", "customer_order_counts.count"}
  ])
  |> Selecto.filter({"customer_order_counts.count", {:>=, 2}})
  |> Selecto.order_by({"high_value_orders.total", :desc})
  |> Selecto.limit(10)

# Assertive match: a failed execution stops the cell here with a MatchError.
{:ok, shaped_rows} =
  CteLab.Helpers.run_shape("Multiple CTE composition with select_shape", multi_cte_shape_query)

Enum.each(shaped_rows, fn [order_number, {status, customer_name}, {order_total, repeat_count}] ->
  summary =
    "order=#{order_number} status=#{status} customer=#{customer_name} " <>
      "cte_total=#{inspect(order_total)} repeat_orders=#{repeat_count}"

  IO.puts(summary)
end)

:ok
Next Steps
To extend this CTE workbook, add:
- A CTE feeding a pivot query for post-filter pivot analysis
- A CTE that pre-aggregates per month and then applies window functions
- A section comparing `with_cte/4` versus inline subqueries for readability and maintainability