Powered by AppSignal & Oban Pro

Selecto Set Operations Workbook

selecto_set_operations_workbook.livemd

Selecto Set Operations Workbook

Setup 1) Install Dependencies

Run this cell first. It installs Selecto and related packages from Hex for this Livebook session.

# The Selecto requirement is named separately so the version used by this
# session is visible in one place and can be echoed after installation.
selecto_dep = {:selecto, "~> 0.3.5"}

# Remaining Hex packages, written as a keyword list of {package, requirement}.
supporting_deps = [
  postgrex: "~> 0.17",
  ecto_sql: "~> 3.11",
  jason: "~> 1.4",
  kino: "~> 0.12"
]

Mix.install([selecto_dep | supporting_deps])

IO.puts("Using Selecto dependency: " <> inspect(selecto_dep))

Overview

Set operations combine the rows of two queries whose selected columns line up: the same number of columns, in the same order, with compatible types.

  1. union/3 for A ∪ B
  2. intersect/3 for A ∩ B
  3. except/3 for A − B
  4. all: true variants to keep duplicates
flowchart LR
  a[Query A]
  b[Query B]
  compat[Same selected columns]
  setop[UNION / INTERSECT / EXCEPT]
  out[Combined result set]

  a --> compat
  b --> compat
  compat --> setop --> out
graph TD
  cohortA[Employees with manager]
  cohortB[Employees with direct reports]
  union[UNION: A or B]
  inter[INTERSECT: A and B]
  exceptA[EXCEPT: A not B]

  cohortA --> union
  cohortB --> union
  cohortA --> inter
  cohortB --> inter
  cohortA --> exceptA
  cohortB -.->|removed| exceptA

Before running, initialize the sample database:

cd selecto_examples
mix setup

Setup 2) Connect to Repo and Build Employee Domain

This cell defines a local Repo and employee domain used to build compatible cohorts.

defmodule SetOpsLab.Repo do
  # Ecto repository for this notebook, using the Postgres adapter.
  # No connection settings live here; they are passed to start_link/1 by the caller.
  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :set_ops_lab_livebook
end

defmodule SetOpsLab.EmployeeDomain do
  @moduledoc false

  # Column name => type pairs for the employees table. Kept as a keyword list
  # because the order here is also the order of the domain's `fields` list.
  @column_types [
    id: :integer,
    first_name: :string,
    last_name: :string,
    title: :string,
    department: :string,
    manager_id: :integer,
    active: :boolean
  ]

  # Returns the domain map for the `employees` table: the source description
  # (table, primary key, fields, column types) plus the default selection and
  # ordering. No joins, schemas or associations are declared.
  def domain do
    source = %{
      source_table: "employees",
      primary_key: :id,
      fields: Keyword.keys(@column_types),
      columns: Map.new(@column_types, fn {column, type} -> {column, %{type: type}} end),
      associations: %{}
    }

    %{
      name: "Employees",
      source: source,
      schemas: %{},
      joins: %{},
      default_selected: ["id", "first_name", "last_name", "title"],
      default_order_by: [{"id", :asc}]
    }
  end
end

# Connection settings. Each SELECTO_EXAMPLES_DB* environment variable overrides
# the default written next to it; the port arrives as a string and is parsed.
repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: "SELECTO_EXAMPLES_DB_PORT" |> System.get_env("5432") |> String.to_integer(),
  pool_size: 5
]

# Start the Repo only when nothing is registered under its name yet, so the
# cell can be evaluated again without a failed second start.
if Process.whereis(SetOpsLab.Repo) == nil do
  {:ok, _pid} = SetOpsLab.Repo.start_link(repo_config)
end

# Shared handle that later cells read: the repo module and the employee domain map.
config = %{
  employee_domain: SetOpsLab.EmployeeDomain.domain(),
  repo: SetOpsLab.Repo
}

# Smoke test: a single-row, single-column count proves the connection and table exist.
{:ok, result} = Ecto.Adapters.SQL.query(config.repo, "select count(*) from employees", [])
%{rows: [[employee_count]]} = result
IO.puts("Connected. Employees in dataset: #{employee_count}")

config

Setup 3) Define Set-Op Helpers

This helper cell prints SQL/params and executes a query with a compact preview.

defmodule SetOpsLab.Helpers do
  @moduledoc false

  # Prints the plain-language meaning of a query, then delegates to
  # Selecto.Livebook.run/3 with `preview_count` (default 10) as an option.
  # The return value is whatever Selecto.Livebook.run/3 returns.
  def run(label, meaning, query, preview_count \\ 10) do
    IO.puts("Meaning: " <> to_string(meaning))

    run_opts = [preview_count: preview_count]
    Selecto.Livebook.run(label, query, run_opts)
  end
end

1) Build Compatible Cohorts

Both cohorts select the same columns so they can participate in set operations.

# Both cohorts project exactly these columns so they can be combined later.
cohort_columns = ["id", "first_name", "last_name", "title"]

# Cohort A: employees whose manager_id is set.
employees_with_manager =
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.select(cohort_columns)
  |> Selecto.filter({"manager_id", :not_null})

# Cohort B: employees that other employees reference through manager_id.
# The self-join presumably yields one row per direct report, which is why the
# selected columns are also grouped — confirm against the generated SQL.
employees_with_direct_reports =
  config.employee_domain
  |> Selecto.configure(config.repo)
  |> Selecto.join(:direct_report,
    source: "employees",
    type: :inner,
    owner_key: :id,
    related_key: :manager_id,
    fields: %{id: %{type: :integer}}
  )
  |> Selecto.filter({"direct_report.id", :not_null})
  |> Selecto.select(cohort_columns)
  |> Selecto.group_by(cohort_columns)

:ok

2) UNION and UNION ALL

UNION de-duplicates; UNION ALL keeps duplicates.

# Name the operands A and B to match the notation used in the overview.
{cohort_a, cohort_b} = {employees_with_manager, employees_with_direct_reports}

# Same operands for both queries; only the `all: true` option differs.
union_query = Selecto.union(cohort_a, cohort_b)
union_all_query = Selecto.union(cohort_a, cohort_b, all: true)

SetOpsLab.Helpers.run(
  "UNION",
  "Employees who either report to someone or manage at least one direct report.",
  union_query
)

SetOpsLab.Helpers.run(
  "UNION ALL",
  "Same cohorts but keep duplicates (middle managers may appear twice).",
  union_all_query
)

3) INTERSECT

INTERSECT returns employees in both cohorts.

# Name the operands A and B to match the notation used in the overview.
{cohort_a, cohort_b} = {employees_with_manager, employees_with_direct_reports}

intersect_query = Selecto.intersect(cohort_a, cohort_b)

SetOpsLab.Helpers.run(
  "INTERSECT",
  "Middle managers: employees who report upward and also manage others.",
  intersect_query
)

4) EXCEPT (Directional)

Direction matters: A EXCEPT B is different from B EXCEPT A.

# Name the operands A and B; the two queries below differ only in operand order.
{cohort_a, cohort_b} = {employees_with_manager, employees_with_direct_reports}

except_a_minus_b = Selecto.except(cohort_a, cohort_b)
except_b_minus_a = Selecto.except(cohort_b, cohort_a)

SetOpsLab.Helpers.run(
  "EXCEPT (A - B)",
  "Employees with managers who do not manage anyone (individual contributors).",
  except_a_minus_b
)

SetOpsLab.Helpers.run(
  "EXCEPT (B - A)",
  "Employees who manage others but do not report upward (top-level leadership).",
  except_b_minus_a
)

5) Summary Runner

Run all five set operations in sequence so their SQL and row counts can be compared.

# Name the operands A and B to match the set notation in the descriptions.
{cohort_a, cohort_b} = {employees_with_manager, employees_with_direct_reports}

# One {label, meaning, query} tuple per set operation, in the order they are run.
set_queries = [
  {"UNION", "A ∪ B", Selecto.union(cohort_a, cohort_b)},
  {"UNION ALL", "A ∪ B (duplicates kept)", Selecto.union(cohort_a, cohort_b, all: true)},
  {"INTERSECT", "A ∩ B", Selecto.intersect(cohort_a, cohort_b)},
  {"EXCEPT (A - B)", "A minus B", Selecto.except(cohort_a, cohort_b)},
  {"EXCEPT (B - A)", "B minus A", Selecto.except(cohort_b, cohort_a)}
]

# Run each query with a preview_count of 5; the results of the calls are discarded.
for {label, meaning, query} <- set_queries do
  SetOpsLab.Helpers.run(label, meaning, query, 5)
end

:ok

6) Cross-Domain Set-Op with column_mapping

Use column_mapping to union columns that hold compatible value types but have different field names in each domain.

# Domain map for the `customers` table, declared inline for this example.
# It has the same structure as the employee domain but sets no default
# selection or ordering.
customer_domain_for_set_ops = %{
  name: "Customers",
  source: %{
    source_table: "customers",
    primary_key: :id,
    fields: [:id, :name, :email, :country, :active],
    columns: %{
      id: %{type: :integer},
      name: %{type: :string},
      email: %{type: :string},
      country: %{type: :string},
      active: %{type: :boolean}
    },
    associations: %{}
  },
  schemas: %{},
  joins: %{}
}

# Left operand: four employee columns, filtered on `active` being true, limit 25.
employees_identity_query =
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.select(["id", "first_name", "last_name", "department"])
  |> Selecto.filter({"active", true})
  |> Selecto.limit(25)

# Right operand: four customer columns with the same filter and limit, so both
# sides select the same number of columns.
customers_identity_query =
  Selecto.configure(customer_domain_for_set_ops, config.repo)
  |> Selecto.select(["id", "name", "email", "country"])
  |> Selecto.filter({"active", true})
  |> Selecto.limit(25)

# Each column_mapping pair is {employee_field, customer_field}; presumably the
# customer field is emitted under the employee field's position/name — confirm
# against Selecto's documentation. "id" is named identically on both sides and
# is not listed.
cross_domain_union_query =
  Selecto.union(
    employees_identity_query,
    customers_identity_query,
    all: true,
    column_mapping: [
      {"first_name", "name"},
      {"last_name", "email"},
      {"department", "country"}
    ]
  )

# Final argument is the helper's preview_count.
SetOpsLab.Helpers.run(
  "Cross-domain UNION ALL with column_mapping",
  "Employee identity rows and customer identity rows combined via mapped columns.",
  cross_domain_union_query,
  10
)

7) Deduplicating vs Non-Deduplicating Timing Snapshot

Compare execution timing for UNION and UNION ALL over repeated runs.

# Rebuilds both cohorts from scratch and returns {union_query, union_all_query},
# so the timing cell does not depend on bindings from earlier cells other than `config`.
build_base_cohorts = fn ->
  columns = ["id", "first_name", "last_name", "title"]

  # Employees whose manager_id is set.
  has_manager =
    config.employee_domain
    |> Selecto.configure(config.repo)
    |> Selecto.select(columns)
    |> Selecto.filter({"manager_id", :not_null})

  # Employees referenced by another employee's manager_id (self-join, then grouped).
  has_reports =
    config.employee_domain
    |> Selecto.configure(config.repo)
    |> Selecto.join(:direct_report,
      source: "employees",
      type: :inner,
      owner_key: :id,
      related_key: :manager_id,
      fields: %{id: %{type: :integer}}
    )
    |> Selecto.filter({"direct_report.id", :not_null})
    |> Selecto.select(columns)
    |> Selecto.group_by(columns)

  deduplicating = Selecto.union(has_manager, has_reports)
  non_deduplicating = Selecto.union(has_manager, has_reports, all: true)

  {deduplicating, non_deduplicating}
end

{bench_union_query, bench_union_all_query} = build_base_cohorts.()

# Times repeated executions of `query` and prints the average plus the first
# three samples.
#
#   label      - heading printed above the results
#   query      - passed unchanged to Selecto.execute/1 on every run
#   iterations - number of timed runs; zero or a negative number performs no runs
#
# Each sample is {elapsed_ms, row_count}. row_count is :error when the result
# does not match {:ok, {rows, _, _}}; that run's time still counts toward the
# average. Returns the value of the final IO.inspect/2 (the first three samples).
benchmark = fn label, query, iterations ->
  # The explicit `//1` step matters: a bare `1..iterations` counts downward when
  # iterations < 1, so a request for 0 runs would execute the query twice.
  samples =
    Enum.map(1..iterations//1, fn _ ->
      {microseconds, result} = :timer.tc(fn -> Selecto.execute(query) end)

      row_count =
        case result do
          {:ok, {rows, _, _}} -> length(rows)
          _ -> :error
        end

      {microseconds / 1000.0, row_count}
    end)

  ms_samples = Enum.map(samples, fn {ms, _} -> ms end)

  # max/2 keeps the division defined when no runs were performed (average 0.0).
  avg_ms = Enum.sum(ms_samples) / max(length(ms_samples), 1)

  IO.puts("\n#{label}")
  IO.puts("avg ms over #{iterations} runs: #{Float.round(avg_ms, 3)}")
  IO.inspect(Enum.take(samples, 3), label: "sample runs (ms, row_count)")
end

benchmark.("UNION (deduplicating)", bench_union_query, 6)
benchmark.("UNION ALL (non-deduplicating)", bench_union_all_query, 6)

8) Set Operations with CTE-Staged Cohorts

Build cohorts through CTEs first, then combine them with set operations.

# Cohort A, staged through a CTE: "has_manager_ids" selects the ids of employees
# whose manager_id is set, and the outer employee query inner-joins to it on id.
staged_has_manager_query =
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.with_cte("has_manager_ids", fn ->
    Selecto.configure(config.employee_domain, config.repo)
    |> Selecto.select(["id"])
    |> Selecto.filter({"manager_id", :not_null})
  end, columns: ["id"])
  |> Selecto.join(:has_manager_ids,
    source: "has_manager_ids",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: %{id: %{type: :integer}}
  )
  |> Selecto.select(["id", "first_name", "last_name", "title"])

# Cohort B, staged through a CTE: "manager_ids" selects the non-null manager_id
# values, grouped so each appears once. `columns: ["id"]` presumably exposes that
# single column under the name `id`, which the join below relies on — TODO confirm.
staged_is_manager_query =
  Selecto.configure(config.employee_domain, config.repo)
  |> Selecto.with_cte("manager_ids", fn ->
    Selecto.configure(config.employee_domain, config.repo)
    |> Selecto.select(["manager_id"])
    |> Selecto.filter({"manager_id", :not_null})
    |> Selecto.group_by(["manager_id"])
  end, columns: ["id"])
  |> Selecto.join(:manager_ids,
    source: "manager_ids",
    type: :inner,
    owner_key: :id,
    related_key: :id,
    fields: %{id: %{type: :integer}}
  )
  |> Selecto.select(["id", "first_name", "last_name", "title"])

# Both staged queries select the same four columns, so they can be intersected.
staged_intersection_query =
  Selecto.intersect(staged_has_manager_query, staged_is_manager_query)

# Final argument is the helper's preview_count.
SetOpsLab.Helpers.run(
  "CTE-staged INTERSECT",
  "Employees who both report upward and manage someone, with cohorts prepared by CTEs.",
  staged_intersection_query,
  10
)

Next Steps

To extend this workbook, add:

  1. Set-op examples with EXCEPT ALL and INTERSECT ALL where duplicates are meaningful
  2. A workbook cell that visualizes overlaps as a small Venn-style summary table
  3. A paging strategy section for large set-op outputs