Powered by AppSignal & Oban Pro

Selecto CTEs Workbook

selecto_ctes_workbook.livemd

Selecto CTEs Workbook

Setup 1) Install Dependencies

Run this cell first. It installs Selecto and related packages from Hex for this Livebook session.

# Selecto's requirement is bound on its own so it can be echoed after install.
selecto_dep = {:selecto, "~> 0.3.5"}

# Remaining packages installed alongside Selecto for this session.
supporting_deps = [
  postgrex: "~> 0.17",
  ecto_sql: "~> 3.11",
  jason: "~> 1.4",
  kino: "~> 0.12"
]

Mix.install([selecto_dep | supporting_deps])

IO.puts("Using Selecto dependency: " <> inspect(selecto_dep))

Overview

This workbook focuses on CTE workflows in Selecto:

  1. Non-recursive CTE (with_cte/4) for reusable aggregate subqueries
  2. CTE-enriched result sets via explicit CTE joins
  3. Recursive CTE (with_recursive_cte/3) for hierarchy traversal
  4. Multiple CTE composition (with_ctes/2)
  5. Shaped CTE output with select_shape/2 and execute_shape/2
flowchart LR
  base[Base query]
  addcte[Attach CTE]
  joincte[Join CTE into main query]
  select[Select fields]
  execute[Execute]

  base --> addcte --> joincte --> select --> execute
graph LR
  employees[employees]
  manager[employees as manager]
  tree[employee tree CTE]

  employees -->|manager_id to id| manager
  employees -->|id to id| tree

Before running, initialize the sample database:

cd selecto_examples
mix setup

Setup 2) Connect to Repo and Build Domains

This cell defines:

  1. order_domain for order-focused CTE examples
  2. employee_domain for recursive hierarchy CTE examples
defmodule CteLab.Repo do
  @moduledoc false

  # Postgres-backed Ecto repo used by every query in this notebook.
  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :cte_lab_livebook
end

defmodule CteLab.Domains do
  @moduledoc false

  # Column name => type pairs, in the order the tables list their fields.
  @order_column_types [
    id: :integer,
    order_number: :string,
    status: :string,
    total: :decimal,
    customer_id: :integer,
    inserted_at: :utc_datetime
  ]

  @customer_column_types [
    id: :integer,
    name: :string,
    tier: :string,
    active: :boolean
  ]

  @employee_column_types [
    id: :integer,
    first_name: :string,
    last_name: :string,
    email: :string,
    title: :string,
    department: :string,
    salary: :decimal,
    manager_id: :integer,
    active: :boolean
  ]

  # Domain map for the orders table with a left join to customers.
  def order_domain do
    customer_join_types = Keyword.take(@customer_column_types, [:name, :tier, :active])

    %{
      name: "Orders",
      source:
        "orders"
        |> table(@order_column_types)
        |> Map.put(:associations, %{customer: belongs_to(:customer, :customers, :customer_id)}),
      schemas: %{
        customers:
          "customers"
          |> table(@customer_column_types)
          |> Map.put(:redact_fields, [])
      },
      joins: %{
        customer: left_join("Customer", "customers", "customer_id", customer_join_types)
      },
      default_selected: ["order_number", "status", "total", "inserted_at"],
      default_order_by: [{"inserted_at", :desc}]
    }
  end

  # Domain map for the employees table with a self-referencing left join to the manager row.
  def employee_domain do
    manager_join_types = Keyword.take(@employee_column_types, [:first_name, :last_name, :title])

    %{
      name: "Employees",
      source:
        "employees"
        |> table(@employee_column_types)
        |> Map.put(:associations, %{manager: belongs_to(:manager, :employees, :manager_id)}),
      schemas: %{
        employees:
          "employees"
          |> table(@employee_column_types)
          |> Map.put(:redact_fields, [])
      },
      joins: %{
        manager: left_join("Manager", "employees", "manager_id", manager_join_types)
      },
      default_selected: ["first_name", "last_name", "title", "department"],
      default_order_by: [{"last_name", :asc}]
    }
  end

  # Builds the table description shared by `source` and `schemas` entries.
  defp table(name, column_types) do
    %{
      source_table: name,
      primary_key: :id,
      fields: Keyword.keys(column_types),
      columns: typed(column_types)
    }
  end

  # Turns `[name: :type]` pairs into the `%{name: %{type: :type}}` column map.
  defp typed(column_types) do
    Map.new(column_types, fn {column, type} -> {column, %{type: type}} end)
  end

  # Association entry whose related key is always :id.
  defp belongs_to(field, queryable, owner_key) do
    %{field: field, queryable: queryable, owner_key: owner_key, related_key: :id}
  end

  # Left-join entry matching `foreign_key` on the source to "id" on the joined table.
  defp left_join(label, table_name, foreign_key, column_types) do
    %{
      name: label,
      type: :left,
      source: table_name,
      on: [%{left: foreign_key, right: "id"}],
      fields: typed(column_types)
    }
  end
end

# Connection settings are read from SELECTO_EXAMPLES_DB* environment variables,
# falling back to the literal defaults below when a variable is unset.
env_or = fn name, fallback -> System.get_env(name, fallback) end

repo_config = [
  database: env_or.("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: env_or.("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: env_or.("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: env_or.("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: String.to_integer(env_or.("SELECTO_EXAMPLES_DB_PORT", "5432")),
  pool_size: 5
]

# Only start the repo when no process is registered under its name yet.
if is_nil(Process.whereis(CteLab.Repo)) do
  {:ok, _pid} = CteLab.Repo.start_link(repo_config)
end

# Shared handle passed to every example below: the repo plus both domain maps.
config = %{
  repo: CteLab.Repo,
  order_domain: CteLab.Domains.order_domain(),
  employee_domain: CteLab.Domains.employee_domain()
}

# Sanity check: both sample tables must be reachable before running the examples.
{:ok, order_result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from orders", [])
{:ok, employee_result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from employees", [])
%{rows: [[order_count]]} = order_result
%{rows: [[employee_count]]} = employee_result

IO.puts("Connected. Orders: #{order_count}, Employees: #{employee_count}")

config

Setup 3) Define CTE Helpers

This cell defines helpers that print the generated SQL and its parameters, then execute the query and show a compact preview of the returned rows.

defmodule CteLab.Helpers do
  @moduledoc false

  # Prints the SQL and params generated for `query` under a `label` banner,
  # plus whether the SQL contains a WITH keyword. Returns `{sql, params}`.
  def explain(label, query) do
    {sql, params} = Selecto.to_sql(query)

    IO.puts("\n=== #{label} ===")
    IO.puts(String.trim(sql))
    IO.puts("Params: #{inspect(params)}")
    # Whole-word, case-insensitive match: also detects WITH followed by a newline
    # or tab, and does not fire on identifiers that merely end in "with".
    IO.puts("Has WITH clause: #{Regex.match?(~r/\bwith\b/i, sql)}")

    {sql, params}
  end

  # Explains `query`, executes it, and prints row count, columns, aliases and
  # the first `preview_count` rows. Returns the `Selecto.execute/1` result unchanged.
  def run(label, query, preview_count \\ 10) do
    explain(label, query)

    case Selecto.execute(query) do
      {:ok, {rows, columns, aliases}} = ok ->
        IO.puts("Rows: #{length(rows)}")
        IO.puts("Columns: #{inspect(columns)}")
        IO.puts("Aliases: #{inspect(aliases)}")
        IO.inspect(Enum.take(rows, preview_count), label: "Preview (up to #{preview_count} rows)")
        ok

      {:error, error} = failure ->
        report_error(error)
        failure
    end
  end

  # Explains `query`, prints its compiled shape plan, executes it with
  # `Selecto.execute_shape/1`, and prints the first `preview_count` shaped rows.
  # Returns the `Selecto.execute_shape/1` result unchanged.
  def run_shape(label, query, preview_count \\ 10) do
    explain(label, query)
    IO.inspect(query.set.selection_shape, label: "Compiled shape plan", pretty: true, limit: :infinity)

    case Selecto.execute_shape(query) do
      {:ok, shaped_rows} = ok ->
        IO.puts("Shaped rows: #{length(shaped_rows)}")
        IO.inspect(Enum.take(shaped_rows, preview_count), label: "Shape preview (up to #{preview_count} rows)")
        ok

      {:error, error} = failure ->
        report_error(error)
        failure
    end
  end

  # Shared error line for both runners.
  defp report_error(error), do: IO.puts("Error: #{inspect(error)}")
end

1) Non-Recursive CTE: Manager Team Metrics

Create a CTE that aggregates direct reports per manager, then join it back to employees to retrieve manager identity fields.

# CTE body: one row per non-null manager_id with a row count and summed salary.
team_metrics_cte = fn ->
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["manager_id", {:count, "*"}, {:sum, "salary"}])
  |> Selecto.filter({"manager_id", :not_null})
  |> Selecto.group_by(["manager_id"])
end

# Field types declared for the CTE join; names mirror the CTE's `columns:` list.
team_metrics_fields = %{
  manager_id: %{type: :integer},
  direct_report_count: %{type: :integer},
  team_salary_total: %{type: :decimal}
}

manager_metrics_query =
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_cte("manager_team_metrics", team_metrics_cte,
    columns: ["manager_id", "direct_report_count", "team_salary_total"]
  )
  |> Selecto.join(:manager_team_metrics,
    source: "manager_team_metrics",
    type: :inner,
    owner_key: :id,
    related_key: :manager_id,
    fields: team_metrics_fields
  )
  |> Selecto.select([
    "first_name",
    "last_name",
    "title",
    "manager_team_metrics.direct_report_count",
    "manager_team_metrics.team_salary_total"
  ])
  |> Selecto.filter({"manager_team_metrics.direct_report_count", {:>=, 2}})
  |> Selecto.order_by({"manager_team_metrics.direct_report_count", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Non-recursive CTE: manager metrics", manager_metrics_query)

2) Non-Recursive CTE: Customer Lifetime Spend Buckets

Build a customer-level revenue CTE, join it to orders, keep only customers with at least two orders, and sort the results by lifetime spend.

# CTE body: one row per customer_id with an order count and summed order total.
customer_spend_cte = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["customer_id", {:count, "*"}, {:sum, "total"}])
  |> Selecto.group_by(["customer_id"])
end

# Field types declared for the CTE join; names mirror the CTE's `columns:` list.
customer_spend_fields = %{
  customer_id: %{type: :integer},
  order_count: %{type: :integer},
  lifetime_spend: %{type: :decimal}
}

customer_spend_cte_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_cte("customer_spend", customer_spend_cte,
    columns: ["customer_id", "order_count", "lifetime_spend"]
  )
  |> Selecto.join(:customer_spend,
    source: "customer_spend",
    type: :inner,
    owner_key: :customer_id,
    related_key: :customer_id,
    fields: customer_spend_fields
  )
  |> Selecto.select([
    "order_number",
    "status",
    "customer.name",
    "customer.tier",
    "customer_spend.order_count",
    "customer_spend.lifetime_spend"
  ])
  |> Selecto.filter({"customer_spend.order_count", {:>=, 2}})
  |> Selecto.order_by({"customer_spend.lifetime_spend", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Non-recursive CTE: customer spend", customer_spend_cte_query)

3) Recursive CTE: Reporting Tree from an Anchor Employee

This example anchors on one email and expands the reporting tree recursively through manager_id.

# Email of the employee the tree is anchored on.
anchor_email = "sarah.connor@company.com"

# Anchor member of the recursive CTE: selects the employee whose email matches
# `anchor_email`. The literal 0 fills the fifth column, which the `columns:`
# list passed to with_recursive_cte names "depth".
base_query = fn ->
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.select([
    "id",
    "first_name",
    "last_name",
    "manager_id",
    {:literal, 0}
  ])
  |> Selecto.filter({"email", anchor_email})
end

# Recursive member: the CTE reference argument is ignored; the CTE is instead
# reached through an explicit join on source "employee_tree".
# NOTE(review): owner_key/related_key presumably pair employees.manager_id with
# employee_tree.id, i.e. each step adds the reports of rows already in the tree — confirm.
recursive_query = fn _cte_ref ->
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.join(:employee_tree,
    source: "employee_tree",
    type: :inner,
    owner_key: :manager_id,
    related_key: :id,
    fields: %{
      id: %{type: :integer},
      first_name: %{type: :string},
      last_name: %{type: :string},
      manager_id: %{type: :integer},
      depth: %{type: :integer}
    }
  )
  |> Selecto.select([
    "id",
    "first_name",
    "last_name",
    "manager_id",
    # Fifth column again: the depth of the joined tree row plus one.
    {:raw_sql, "employee_tree.depth + 1"}
  ])
  |> Selecto.filter({"employee_tree.id", :not_null})
end

# Outer query: attaches the recursive CTE, joins employees to it on id = id,
# and selects tree columns alongside the employee title and the domain's
# "manager" join fields. Results are ordered by depth, then last name,
# and capped at 20 rows.
recursive_tree_query =
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.with_recursive_cte("employee_tree",
    base_query: base_query,
    recursive_query: recursive_query,
    columns: ["id", "first_name", "last_name", "manager_id", "depth"]
  )
  |> Selecto.join(:employee_tree,
    source: "employee_tree",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: %{
      id: %{type: :integer},
      first_name: %{type: :string},
      last_name: %{type: :string},
      manager_id: %{type: :integer},
      depth: %{type: :integer}
    }
  )
  |> Selecto.select([
    "employee_tree.depth",
    "employee_tree.first_name",
    "employee_tree.last_name",
    "title",
    "manager.first_name",
    "manager.last_name"
  ])
  |> Selecto.order_by({"employee_tree.depth", :asc})
  |> Selecto.order_by({"employee_tree.last_name", :asc})
  |> Selecto.limit(20)

CteLab.Helpers.run("Recursive CTE: employee reporting tree", recursive_tree_query)

4) Multiple CTEs with with_ctes/2

Compose two CTEs, attach both to the same query, and consume both through explicit joins.

alias Selecto.Advanced.CTE

# CTE body: id, customer and total of every order whose total exceeds 500.
orders_over_500 = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["id", "customer_id", "total"])
  |> Selecto.filter({"total", {:>, 500}})
end

# CTE body: number of orders per customer_id.
orders_per_customer = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["customer_id", {:count, "*"}])
  |> Selecto.group_by(["customer_id"])
end

high_value_orders_cte = CTE.create_cte("high_value_orders", orders_over_500)
customer_order_counts_cte = CTE.create_cte("customer_order_counts", orders_per_customer)

# Join options for each CTE, kept next to each other so the key pairing is easy to compare.
high_value_join = [
  source: "high_value_orders",
  type: :inner,
  owner_key: :id,
  related_key: :id,
  fields: %{
    total: %{type: :decimal},
    customer_id: %{type: :integer}
  }
]

order_counts_join = [
  source: "customer_order_counts",
  type: :inner,
  owner_key: :customer_id,
  related_key: :customer_id,
  fields: %{
    customer_id: %{type: :integer},
    count: %{type: :integer}
  }
]

multi_cte_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_ctes([high_value_orders_cte, customer_order_counts_cte])
  |> Selecto.join(:high_value_orders, high_value_join)
  |> Selecto.join(:customer_order_counts, order_counts_join)
  |> Selecto.select([
    "order_number",
    "status",
    "customer.name",
    "high_value_orders.total",
    "customer_order_counts.count"
  ])
  |> Selecto.filter({"customer_order_counts.count", {:>=, 2}})
  |> Selecto.order_by({"high_value_orders.total", :desc})
  |> Selecto.limit(10)

CteLab.Helpers.run("Multiple CTE composition", multi_cte_query)

5) Multiple CTEs with select_shape/2

Use the same multi-CTE pattern but return a structured, nested row shape so each result is easier to consume in Elixir.

alias Selecto.Advanced.CTE

# CTE body: id, customer and total of every order whose total exceeds 500.
high_value_builder = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["id", "customer_id", "total"])
  |> Selecto.filter({"total", {:>, 500}})
end

# CTE body: number of orders per customer_id.
order_counts_builder = fn ->
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(["customer_id", {:count, "*"}])
  |> Selecto.group_by(["customer_id"])
end

high_value_orders_cte_shape = CTE.create_cte("high_value_orders", high_value_builder)
customer_order_counts_cte_shape = CTE.create_cte("customer_order_counts", order_counts_builder)

multi_cte_shape_query =
  config.order_domain
  |> Selecto.configure(config.repo)
  |> Selecto.with_ctes([high_value_orders_cte_shape, customer_order_counts_cte_shape])
  |> Selecto.join(:high_value_orders,
    source: "high_value_orders",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: %{total: %{type: :decimal}, customer_id: %{type: :integer}}
  )
  |> Selecto.join(:customer_order_counts,
    source: "customer_order_counts",
    type: :inner,
    owner_key: :customer_id,
    related_key: :customer_id,
    fields: %{customer_id: %{type: :integer}, count: %{type: :integer}}
  )
  # Shape: a bare order number, then two pairs grouping related fields.
  |> Selecto.select_shape([
    "order_number",
    {"status", "customer.name"},
    {"high_value_orders.total", "customer_order_counts.count"}
  ])
  |> Selecto.filter({"customer_order_counts.count", {:>=, 2}})
  |> Selecto.order_by({"high_value_orders.total", :desc})
  |> Selecto.limit(10)

{:ok, shaped_rows} =
  CteLab.Helpers.run_shape("Multiple CTE composition with select_shape", multi_cte_shape_query)

# Destructures one shaped row using the same nesting as the select_shape spec above.
print_shaped_row = fn [order_number, {status, customer_name}, {order_total, repeat_count}] ->
  summary =
    "order=#{order_number} status=#{status} customer=#{customer_name} " <>
      "cte_total=#{inspect(order_total)} repeat_orders=#{repeat_count}"

  IO.puts(summary)
end

Enum.each(shaped_rows, print_shaped_row)

:ok

Next Steps

To extend this CTE workbook, add:

  1. A CTE feeding a pivot query for post-filter pivot analysis
  2. A CTE that pre-aggregates per month and then applies window functions
  3. A section comparing with_cte/4 versus inline subqueries for readability and maintainability