Powered by AppSignal & Oban Pro

SelectoUpdato Complete Feature Tour

selecto_updato_feature_tour.livemd

SelectoUpdato Complete Feature Tour

find_local_package = fn package_name ->
  cwd = File.cwd!()
  ancestors = [cwd | Path.split(cwd) |> Enum.scan("/", &Path.join(&2, &1)) |> Enum.reverse()]

  candidate_paths =
    ancestors
    |> Enum.flat_map(fn base ->
      [
        Path.join(base, "vendor/#{package_name}"),
        Path.join(base, package_name)
      ]
    end)
    |> Enum.uniq()

  Enum.find(candidate_paths, &(File.exists?(Path.join(&1, "mix.exs"))))
end

local_selecto_path = find_local_package.("selecto")
local_updato_path = find_local_package.("selecto_updato")

# `selecto_updato` declares `{:selecto, path: "../selecto"}`; force one source of truth here.
selecto_dep =
  if local_selecto_path do
    {:selecto, path: local_selecto_path, override: true}
  else
    {:selecto, github: "seeken/selecto", branch: "main", override: true}
  end

updato_dep =
  if local_updato_path do
    {:selecto_updato, path: local_updato_path}
  else
    {:selecto_updato, github: "seeken/selecto_updato", branch: "main"}
  end

Mix.install([
  selecto_dep,
  updato_dep,
  {:postgrex, "~> 0.17"},
  {:ecto_sql, "~> 3.11"},
  {:jason, "~> 1.4"},
  {:decimal, "~> 2.0"},
  {:kino, "~> 0.12"}
])

IO.puts("Using Selecto dependency: #{inspect(selecto_dep)}")
IO.puts("Using SelectoUpdato dependency: #{inspect(updato_dep)}")

Overview

This notebook exercises the full public SelectoUpdato API against the selecto_examples PostgreSQL dataset.

Feature coverage checklist:

  • new/2, insert/2, insert_all/2, insert_from_query/2
  • update/2 with expression tuples and grouped filters
  • upsert/2, upsert_all/2, conflict_target/2, on_conflict/2
  • delete/1, soft_delete/1, confirm_bulk_delete/2, cascade/3
  • nested_strategy/3 for nested updates
  • filter/2, filters/2, filter_from_selecto/2
  • returning/2, with_changeset/2, audit/2
  • execute/3, execute_in_transaction/3, transaction/3
  • multi/0, multi_insert/4, multi_insert_all/4, multi_update/4, multi_delete/4, multi_run/3, run/3
  • dry_run/2, to_sql/1

Before running this notebook:

cd selecto_examples
mix setup
import Ecto.Query

defmodule SelectoUpdatoLivebook.Repo do
  use Ecto.Repo,
    otp_app: :selecto_updato_livebook,
    adapter: Ecto.Adapters.Postgres
end

repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: String.to_integer(System.get_env("SELECTO_EXAMPLES_DB_PORT", "5432")),
  pool_size: 5
]

case Process.whereis(SelectoUpdatoLivebook.Repo) do
  nil -> {:ok, _pid} = SelectoUpdatoLivebook.Repo.start_link(repo_config)
  _pid -> :ok
end

case Process.whereis(:selecto_updato_hook_log) do
  nil -> {:ok, _pid} = Agent.start_link(fn -> [] end, name: :selecto_updato_hook_log)
  _pid -> Agent.update(:selecto_updato_hook_log, fn _ -> [] end)
end

support_sql = [
  """
  CREATE TABLE IF NOT EXISTS updato_audit_entries (
    id bigserial PRIMARY KEY,
    operation text NOT NULL,
    source_table text,
    record_id bigint,
    changes jsonb,
    actor_id text,
    reason text,
    metadata jsonb DEFAULT '{}'::jsonb,
    inserted_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL
  )
  """,
  """
  CREATE TABLE IF NOT EXISTS updato_order_snapshots (
    id bigserial PRIMARY KEY,
    order_number text NOT NULL,
    status text NOT NULL,
    total numeric(12, 2) NOT NULL DEFAULT 0,
    inserted_at timestamp without time zone NOT NULL DEFAULT now(),
    updated_at timestamp without time zone NOT NULL DEFAULT now()
  )
  """,
  """
  CREATE TABLE IF NOT EXISTS updato_soft_delete_samples (
    id bigserial PRIMARY KEY,
    name text NOT NULL,
    status text NOT NULL DEFAULT 'active',
    deleted_at timestamp without time zone,
    inserted_at timestamp without time zone NOT NULL DEFAULT now(),
    updated_at timestamp without time zone NOT NULL DEFAULT now()
  )
  """
]

Enum.each(support_sql, fn sql ->
  Ecto.Adapters.SQL.query!(SelectoUpdatoLivebook.Repo, sql, [])
end)

:ok
defmodule SelectoUpdatoLivebook.Customer do
  use Ecto.Schema
  import Ecto.Changeset

  schema "customers" do
    field :name, :string
    field :email, :string
    field :phone, :string
    field :tier, :string
    field :company_name, :string
    field :address, :string
    field :city, :string
    field :region, :string
    field :postal_code, :string
    field :country, :string
    field :notes, :string
    field :preferences, :map
    field :active, :boolean
    field :verified_at, :utc_datetime

    has_many :orders, SelectoUpdatoLivebook.Order

    timestamps()
  end

  def changeset(customer, attrs) do
    customer
    |> cast(attrs, [
      :name,
      :email,
      :tier,
      :city,
      :country,
      :preferences,
      :active,
      :verified_at
    ])
    |> validate_required([:name, :email])
    |> validate_format(:email, ~r/@/)
  end
end

defmodule SelectoUpdatoLivebook.Product do
  use Ecto.Schema
  import Ecto.Changeset

  schema "products" do
    field :name, :string
    field :sku, :string
    field :description, :string
    field :price, :decimal
    field :cost, :decimal
    field :stock_quantity, :integer
    field :reorder_level, :integer
    field :active, :boolean
    field :featured, :boolean
    field :tags, {:array, :string}
    field :metadata, :map
    field :category_id, :integer
    field :supplier_id, :integer

    timestamps()
  end

  def changeset(product, attrs) do
    product
    |> cast(attrs, [
      :name,
      :sku,
      :description,
      :price,
      :cost,
      :stock_quantity,
      :reorder_level,
      :active,
      :featured,
      :tags,
      :metadata,
      :category_id,
      :supplier_id
    ])
    |> validate_required([:name, :sku, :price])
  end
end

defmodule SelectoUpdatoLivebook.Order do
  use Ecto.Schema
  import Ecto.Changeset

  schema "orders" do
    field :order_number, :string
    field :status, :string
    field :subtotal, :decimal
    field :tax, :decimal
    field :shipping, :decimal
    field :discount, :decimal
    field :total, :decimal
    field :shipping_country, :string
    field :notes, :string
    field :customer_id, :integer

    has_many :order_items, SelectoUpdatoLivebook.OrderItem
    belongs_to :customer, SelectoUpdatoLivebook.Customer,
      foreign_key: :customer_id,
      define_field: false

    timestamps()
  end

  def changeset(order, attrs) do
    order
    |> cast(attrs, [
      :order_number,
      :status,
      :subtotal,
      :tax,
      :shipping,
      :discount,
      :total,
      :shipping_country,
      :notes,
      :customer_id
    ])
    |> validate_required([:order_number, :total, :customer_id])
  end
end

defmodule SelectoUpdatoLivebook.OrderItem do
  use Ecto.Schema
  import Ecto.Changeset

  schema "order_items" do
    field :quantity, :integer
    field :unit_price, :decimal
    field :discount, :decimal
    field :line_total, :decimal
    field :line_number, :integer
    field :order_id, :integer
    field :product_id, :integer

    timestamps()
  end

  def changeset(item, attrs) do
    item
    |> cast(attrs, [:quantity, :unit_price, :discount, :line_total, :line_number, :order_id, :product_id])
    |> validate_required([:quantity, :unit_price, :line_total, :product_id])
  end
end

defmodule SelectoUpdatoLivebook.OrderSnapshot do
  use Ecto.Schema
  import Ecto.Changeset

  schema "updato_order_snapshots" do
    field :order_number, :string
    field :status, :string
    field :total, :decimal

    timestamps()
  end

  def changeset(snapshot, attrs) do
    snapshot
    |> cast(attrs, [:order_number, :status, :total])
    |> validate_required([:order_number, :status, :total])
  end
end

defmodule SelectoUpdatoLivebook.SoftDeleteSample do
  use Ecto.Schema
  import Ecto.Changeset

  schema "updato_soft_delete_samples" do
    field :name, :string
    field :status, :string
    field :deleted_at, :utc_datetime

    timestamps()
  end

  def changeset(sample, attrs) do
    sample
    |> cast(attrs, [:name, :status, :deleted_at])
    |> validate_required([:name, :status])
  end
end

defmodule SelectoUpdatoLivebook.Hooks do
  import Ecto.Changeset

  def before_insert_order(changeset) do
    put_change(changeset, :notes, "created_by_selecto_updato")
  end

  def before_update_order(changeset, _original) do
    if get_change(changeset, :notes) do
      changeset
    else
      put_change(changeset, :notes, "updated_by_selecto_updato")
    end
  end

  def after_insert_order(order) do
    log({:after_insert, Map.get(order, :id)})
    :ok
  end

  def after_update_order(order) do
    log({:after_update, Map.get(order, :id)})
    :ok
  end

  def before_delete_order(record) do
    log({:before_delete, Map.get(record, :id)})
    :ok
  end

  def after_delete_order(record) do
    log({:after_delete, Map.get(record, :id)})
    :ok
  end

  defp log(event) do
    if Process.whereis(:selecto_updato_hook_log) do
      Agent.update(:selecto_updato_hook_log, fn events -> [event | events] end)
    end
  end
end

:ok
alias SelectoUpdatoLivebook.{
  Repo,
  Customer,
  Product,
  Order,
  OrderItem,
  OrderSnapshot,
  SoftDeleteSample,
  Hooks
}

d = &Decimal.new/1

read_key = fn
  %_{} = struct, key ->
    wanted = to_string(key)

    struct
    |> Map.from_struct()
    |> Enum.find_value(fn {k, v} -> if to_string(k) == wanted, do: v end)

  %{} = map, key ->
    wanted = to_string(key)
    Enum.find_value(map, fn {k, v} -> if to_string(k) == wanted, do: v end)

  _other, _key ->
    nil
end

run_id = "updato-#{System.system_time(:millisecond)}"

customer_domain = %{
  source: Customer,
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "email" => %{type: :string, validations: [format: ~r/@/]},
    "tier" => %{type: :string, validations: [inclusion: ["standard", "premium", "vip"]]},
    "city" => %{type: :string},
    "country" => %{type: :string},
    "preferences" => %{type: :jsonb},
    "active" => %{type: :boolean},
    "verified_at" => %{type: :utc_datetime},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  writable: ["name", "email", "tier", "city", "country", "preferences", "active", "verified_at"],
  readonly: ["id", "inserted_at", "updated_at"],
  required_on_insert: ["name", "email"]
}

product_domain = %{
  source: Product,
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "sku" => %{type: :string},
    "description" => %{type: :string},
    "price" => %{type: :decimal},
    "cost" => %{type: :decimal},
    "stock_quantity" => %{type: :integer},
    "reorder_level" => %{type: :integer},
    "active" => %{type: :boolean},
    "featured" => %{type: :boolean},
    "tags" => %{type: {:array, :string}},
    "metadata" => %{type: :jsonb},
    "category_id" => %{type: :integer},
    "supplier_id" => %{type: :integer},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  writable: [
    "name",
    "sku",
    "description",
    "price",
    "cost",
    "stock_quantity",
    "reorder_level",
    "active",
    "featured",
    "tags",
    "metadata",
    "category_id",
    "supplier_id"
  ],
  readonly: ["id", "inserted_at", "updated_at"],
  required_on_insert: ["name", "sku", "price"]
}

order_domain = %{
  source: Order,
  columns: %{
    "id" => %{type: :integer},
    "order_number" => %{type: :string},
    "status" => %{type: :string},
    "subtotal" => %{type: :decimal},
    "tax" => %{type: :decimal},
    "shipping" => %{type: :decimal},
    "discount" => %{type: :decimal},
    "total" => %{type: :decimal},
    "shipping_country" => %{type: :string},
    "notes" => %{type: :string},
    "customer_id" => %{type: :integer},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  writable: [
    "order_number",
    "status",
    "subtotal",
    "tax",
    "shipping",
    "discount",
    "total",
    "shipping_country",
    "notes",
    "customer_id"
  ],
  readonly: ["id", "inserted_at", "updated_at"],
  required_on_insert: ["order_number", "total", "customer_id"],
  schemas: %{order_items: OrderItem, customer: Customer},
  joins: %{
    order_items: %{
      source: "order_items",
      owner_key: :id,
      my_key: :order_id,
      cardinality: :many,
      config: %{writable: true}
    },
    customer: %{
      source: "customers",
      owner_key: :customer_id,
      my_key: :id,
      cardinality: :one,
      config: %{writable: false}
    }
  },
  hooks: %{
    before_insert: [&Hooks.before_insert_order/1],
    after_insert: [&Hooks.after_insert_order/1],
    before_update: [&Hooks.before_update_order/2],
    after_update: [&Hooks.after_update_order/1],
    before_delete: [&Hooks.before_delete_order/1],
    after_delete: [&Hooks.after_delete_order/1]
  },
  audit: %{enabled: true, table: "updato_audit_entries"}
}

order_snapshot_domain = %{
  source: OrderSnapshot,
  columns: %{
    "id" => %{type: :integer},
    "order_number" => %{type: :string},
    "status" => %{type: :string},
    "total" => %{type: :decimal},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  writable: ["order_number", "status", "total"],
  readonly: ["id", "inserted_at", "updated_at"],
  required_on_insert: ["order_number", "status", "total"]
}

soft_delete_domain = %{
  source: SoftDeleteSample,
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "status" => %{type: :string},
    "deleted_at" => %{type: :utc_datetime},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  writable: ["name", "status", "deleted_at"],
  readonly: ["id", "inserted_at", "updated_at"],
  required_on_insert: ["name", "status"],
  soft_delete_field: "deleted_at"
}

%{run_id: run_id}

Insert, Returning, and with_changeset

base_customer_email = "#{run_id}-base@example.test"
vip_customer_email = "#{run_id}-vip@example.test"

{:ok, base_customer} =
  SelectoUpdato.new(customer_domain)
  |> SelectoUpdato.insert(%{
    "name" => "Base #{run_id}",
    "email" => base_customer_email,
    "tier" => "standard",
    "city" => "Boston",
    "country" => "US",
    "preferences" => %{"newsletter" => true},
    "active" => true
  })
  |> SelectoUpdato.returning(["id", "name", "email", "tier"])
  |> SelectoUpdato.execute(Repo)

base_customer_id = read_key.(base_customer, :id)

{:ok, vip_customer} =
  SelectoUpdato.new(customer_domain)
  |> SelectoUpdato.insert(%{
    "name" => "Custom Changeset #{run_id}",
    "email" => vip_customer_email,
    "tier" => "standard",
    "city" => "Denver",
    "country" => "US",
    "preferences" => %{},
    "active" => true
  })
  |> SelectoUpdato.with_changeset(fn customer, attrs ->
    customer
    |> Customer.changeset(attrs)
    |> Ecto.Changeset.put_change(:tier, "vip")
  end)
  |> SelectoUpdato.execute(Repo)

%{
  base_customer: base_customer,
  base_customer_id: base_customer_id,
  vip_customer: vip_customer
}

insert_all

{:ok, inserted_products} =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.insert_all([
    %{
      "name" => "Notebook A #{run_id}",
      "sku" => "#{run_id}-SKU-A",
      "price" => d.("10.00"),
      "cost" => d.("5.00"),
      "stock_quantity" => 40,
      "reorder_level" => 5,
      "active" => true,
      "featured" => false,
      "tags" => ["seed", "a"],
      "metadata" => %{"run_id" => run_id}
    },
    %{
      "name" => "Notebook B #{run_id}",
      "sku" => "#{run_id}-SKU-B",
      "price" => d.("20.00"),
      "cost" => d.("11.00"),
      "stock_quantity" => 25,
      "reorder_level" => 4,
      "active" => true,
      "featured" => false,
      "tags" => ["seed", "b"],
      "metadata" => %{"run_id" => run_id}
    }
  ])
  |> SelectoUpdato.returning(["id", "sku", "name", "price"])
  |> SelectoUpdato.execute(Repo)

product_rows =
  inserted_products
  |> Map.get(:records, inserted_products)
  |> List.wrap()

product_one_id =
  case Enum.at(product_rows, 0) do
    nil -> Repo.one(from p in Product, where: p.sku == ^"#{run_id}-SKU-A", select: p.id)
    row -> read_key.(row, :id)
  end

product_two_id =
  case Enum.at(product_rows, 1) do
    nil -> Repo.one(from p in Product, where: p.sku == ^"#{run_id}-SKU-B", select: p.id)
    row -> read_key.(row, :id)
  end

%{
  inserted_products: inserted_products,
  product_one_id: product_one_id,
  product_two_id: product_two_id
}

upsert, upsert_all, conflict_target, on_conflict

{:ok, upsert_customer} =
  SelectoUpdato.new(customer_domain)
  |> SelectoUpdato.upsert(%{
    "name" => "Updated Base #{run_id}",
    "email" => base_customer_email,
    "tier" => "premium",
    "city" => "Seattle",
    "country" => "US",
    "preferences" => %{"newsletter" => false},
    "active" => true
  })
  |> SelectoUpdato.conflict_target(["email"])
  |> SelectoUpdato.on_conflict({:replace, ["name", "tier", "city", "preferences"]})
  |> SelectoUpdato.returning(["id", "name", "tier", "city"])
  |> SelectoUpdato.execute(Repo)

{:ok, upsert_products} =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.upsert_all([
    %{
      "name" => "Notebook A updated #{run_id}",
      "sku" => "#{run_id}-SKU-A",
      "price" => d.("13.00"),
      "stock_quantity" => 41,
      "tags" => ["seed", "a", "upsert"],
      "metadata" => %{"run_id" => run_id, "version" => 2}
    },
    %{
      "name" => "Notebook C #{run_id}",
      "sku" => "#{run_id}-SKU-C",
      "price" => d.("12.34"),
      "stock_quantity" => 15,
      "tags" => ["seed", "c"],
      "metadata" => %{"run_id" => run_id}
    }
  ])
  |> SelectoUpdato.conflict_target(["sku"])
  |> SelectoUpdato.on_conflict({:replace, ["name", "price", "stock_quantity", "tags", "metadata"]})
  |> SelectoUpdato.execute(Repo)

product_three_id = Repo.one(from p in Product, where: p.sku == ^"#{run_id}-SKU-C", select: p.id)

%{
  upsert_customer: upsert_customer,
  upsert_products: upsert_products,
  product_three_id: product_three_id
}

Nested insert, returning(:all), hooks, and audit

{:ok, nested_insert_result} =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.insert(%{
    "order_number" => "#{run_id}-ORD-1",
    "status" => "pending",
    "subtotal" => d.("30.00"),
    "tax" => d.("3.00"),
    "shipping" => d.("5.00"),
    "discount" => d.("0.00"),
    "total" => d.("38.00"),
    "shipping_country" => "US",
    "customer_id" => base_customer_id,
    "order_items" => [
      %{
        "product_id" => product_one_id,
        "quantity" => 2,
        "unit_price" => d.("10.00"),
        "discount" => d.("0.00"),
        "line_total" => d.("20.00"),
        "line_number" => 1
      },
      %{
        "product_id" => product_two_id,
        "quantity" => 1,
        "unit_price" => d.("10.00"),
        "discount" => d.("0.00"),
        "line_total" => d.("10.00"),
        "line_number" => 2
      }
    ]
  })
  |> SelectoUpdato.returning(:all)
  |> SelectoUpdato.audit(actor: "livebook", reason: "create nested order")
  |> SelectoUpdato.execute(Repo)

order = nested_insert_result.order
first_order_item = List.first(nested_insert_result.order_items)

hook_events = Agent.get(:selecto_updato_hook_log, &Enum.reverse/1)

audit_rows =
  Ecto.Adapters.SQL.query!(
    Repo,
    "SELECT operation, source_table, reason FROM updato_audit_entries WHERE record_id = $1 ORDER BY id DESC LIMIT 1",
    [order.id]
  ).rows

%{
  nested_insert_result: nested_insert_result,
  hook_events: hook_events,
  audit_rows: audit_rows
}

update with expression tuples, grouped filters, and nested_strategy(:sync)

{:ok, updated_product_one} =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.filters([
    {:and, [
      {"id", product_one_id},
      {:or, [{"active", true}, {"active", false}]}
    ]}
  ])
  |> SelectoUpdato.update(%{
    stock_quantity: {:increment, 5},
    price: {:decrement, d.("1.00")},
    reorder_level: {:set, 3},
    tags: {:array_append, "promo"},
    metadata: {:json_merge, %{"campaign" => run_id}},
    description: {:coalesce, "description filled by coalesce"}
  })
  |> SelectoUpdato.execute(Repo)

{:ok, _prepend_tags} =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.filter({"id", product_one_id})
  |> SelectoUpdato.update(%{tags: {:array_prepend, "first-tag"}})
  |> SelectoUpdato.execute(Repo)

{:ok, _remove_tags} =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.filter({"id", product_one_id})
  |> SelectoUpdato.update(%{tags: {:array_remove, "promo"}})
  |> SelectoUpdato.execute(Repo)

{:ok, nested_update_result} =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.filter({"id", order.id})
  |> SelectoUpdato.update(%{
    "status" => "processing",
    "order_items" => [
      %{
        "id" => first_order_item.id,
        "product_id" => product_one_id,
        "quantity" => 5,
        "unit_price" => d.("10.00"),
        "discount" => d.("0.00"),
        "line_total" => d.("50.00"),
        "line_number" => 1
      },
      %{
        "product_id" => product_three_id,
        "quantity" => 1,
        "unit_price" => d.("12.34"),
        "discount" => d.("0.00"),
        "line_total" => d.("12.34"),
        "line_number" => 2
      }
    ],
    "shipping" => Decimal.add(order.shipping || d.("0.00"), d.("2.00"))
  })
  |> SelectoUpdato.nested_strategy(:order_items, :sync)
  |> SelectoUpdato.execute(Repo)

order_item_count = Repo.aggregate(from(i in OrderItem, where: i.order_id == ^order.id), :count)

%{
  updated_product_one: updated_product_one,
  nested_update_result: nested_update_result,
  order_item_count_after_sync: order_item_count
}

dry_run and to_sql

dry_run_insert_error =
  SelectoUpdato.new(customer_domain)
  |> SelectoUpdato.insert(%{"name" => "Invalid #{run_id}"})
  |> SelectoUpdato.dry_run(Repo)

dry_run_update_ok =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.filter({"id", product_one_id})
  |> SelectoUpdato.update(%{stock_quantity: {:increment, 2}})
  |> SelectoUpdato.dry_run(Repo)

dry_run_update_missing_filter =
  SelectoUpdato.new(product_domain)
  |> SelectoUpdato.update(%{"stock_quantity" => 1})
  |> SelectoUpdato.dry_run(Repo)

sql_preview =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.filter({:and, [
    {"status", "processing"},
    {"order_number", {:ilike, "#{run_id}%"}}
  ]})
  |> SelectoUpdato.update(%{"status" => "shipped"})
  |> SelectoUpdato.to_sql()

%{
  dry_run_insert_error: dry_run_insert_error,
  dry_run_update_ok: dry_run_update_ok,
  dry_run_update_missing_filter: dry_run_update_missing_filter,
  sql_preview: sql_preview
}

insert_from_query and filter_from_selecto

order_selecto_domain = %{
  source: %{
    source_table: "orders",
    primary_key: :id,
    fields: [:id, :order_number, :status, :total],
    columns: %{
      id: %{type: :integer},
      order_number: %{type: :string},
      status: %{type: :string},
      total: %{type: :decimal}
    }
  },
  schemas: %{},
  joins: %{}
}

snapshot_selecto_domain = %{
  source: %{
    source_table: "updato_order_snapshots",
    primary_key: :id,
    fields: [:id, :order_number, :status, :total],
    columns: %{
      id: %{type: :integer},
      order_number: %{type: :string},
      status: %{type: :string},
      total: %{type: :decimal}
    }
  },
  schemas: %{},
  joins: %{}
}

source_query =
  Selecto.configure(order_selecto_domain, Repo)
  |> Selecto.select(["order_number", "status", "total"])
  |> Selecto.filter({"order_number", order.order_number})

{:ok, insert_from_query_result} =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.insert_from_query(source_query)
  |> SelectoUpdato.execute(Repo)

snapshot_filter_query =
  Selecto.configure(snapshot_selecto_domain, Repo)
  |> Selecto.filter({"order_number", {:ilike, "#{run_id}%"}})
  |> Selecto.filter({"status", "processing"})

extract_selecto_filters = fn selecto_query ->
  legacy_filters = Map.get(selecto_query, :filters, [])

  selecto_set =
    case Map.get(selecto_query, :set) do
      %{} = set -> set
      _ -> %{}
    end

  set_filters = Map.get(selecto_set, :filtered) || Map.get(selecto_set, :filters) || []
  post_pivot_filters = Map.get(selecto_set, :post_pivot_filters) || []

  [legacy_filters, set_filters, post_pivot_filters]
  |> Enum.flat_map(fn
    filters when is_list(filters) -> filters
    _ -> []
  end)
end

snapshot_filters = extract_selecto_filters.(snapshot_filter_query)

snapshot_update_op =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.filter_from_selecto(snapshot_filter_query)

snapshot_update_op =
  if snapshot_update_op.filters == [] do
    SelectoUpdato.filters(snapshot_update_op, snapshot_filters)
  else
    snapshot_update_op
  end

{:ok, filter_from_selecto_result} =
  snapshot_update_op
  |> SelectoUpdato.update(%{"status" => "copied"})
  |> SelectoUpdato.execute(Repo)

snapshots_after_copy =
  Repo.all(
    from s in OrderSnapshot,
      where: ilike(s.order_number, ^"#{run_id}%"),
      order_by: [desc: s.id],
      select: %{id: s.id, order_number: s.order_number, status: s.status, total: s.total}
  )

%{
  insert_from_query_result: insert_from_query_result,
  filter_from_selecto_result: filter_from_selecto_result,
  snapshots_after_copy: snapshots_after_copy
}

execute, execute_in_transaction, transaction, and envelope mode

{:ok, envelope_result} =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.insert(%{
    "order_number" => "#{run_id}-ENV",
    "status" => "envelope",
    "total" => d.("1.23")
  })
  |> SelectoUpdato.execute(Repo, result: :envelope)

tx_execute_result =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.insert(%{
    "order_number" => "#{run_id}-TX-EXEC",
    "status" => "tx-exec",
    "total" => d.("2.34")
  })
  |> SelectoUpdato.execute_in_transaction(Repo)

transaction_result =
  SelectoUpdato.transaction(Repo, fn ->
    {:ok, tx_row} =
      SelectoUpdato.new(order_snapshot_domain)
      |> SelectoUpdato.insert(%{
        "order_number" => "#{run_id}-TX-WRAP",
        "status" => "tx",
        "total" => d.("3.45")
      })
      |> SelectoUpdato.execute(Repo)

    SelectoUpdato.new(order_snapshot_domain)
    |> SelectoUpdato.filter({"id", tx_row.id})
    |> SelectoUpdato.update(%{"status" => "tx-updated"})
    |> SelectoUpdato.execute(Repo)
  end)

%{
  envelope_result: envelope_result,
  tx_execute_result: tx_execute_result,
  transaction_result: transaction_result
}

delete, confirm_bulk_delete, cascade, and soft_delete

{:ok, delete_target_result} =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.insert(%{
    "order_number" => "#{run_id}-ORD-DEL",
    "status" => "pending",
    "subtotal" => d.("10.00"),
    "tax" => d.("1.00"),
    "shipping" => d.("1.00"),
    "discount" => d.("0.00"),
    "total" => d.("12.00"),
    "shipping_country" => "US",
    "customer_id" => base_customer_id,
    "order_items" => [
      %{
        "product_id" => product_one_id,
        "quantity" => 1,
        "unit_price" => d.("10.00"),
        "discount" => d.("0.00"),
        "line_total" => d.("10.00"),
        "line_number" => 1
      }
    ]
  })
  |> SelectoUpdato.execute(Repo)

delete_order_id = delete_target_result.order.id

restrict_delete_result =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.filter({"id", delete_order_id})
  |> SelectoUpdato.delete()
  |> SelectoUpdato.cascade(:order_items, :restrict)
  |> SelectoUpdato.execute(Repo)

cascade_delete_result =
  SelectoUpdato.new(order_domain)
  |> SelectoUpdato.filter({"id", delete_order_id})
  |> SelectoUpdato.delete()
  |> SelectoUpdato.cascade(:order_items, :delete)
  |> SelectoUpdato.execute(Repo)

{:ok, _bulk_seed} =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.insert_all([
    %{"order_number" => "#{run_id}-BULK-1", "status" => "bulk-prune", "total" => d.("1.00")},
    %{"order_number" => "#{run_id}-BULK-2", "status" => "bulk-prune", "total" => d.("2.00")}
  ])
  |> SelectoUpdato.execute(Repo)

bulk_unconfirmed_result =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.filter({"status", "bulk-prune"})
  |> SelectoUpdato.delete()
  |> SelectoUpdato.execute(Repo)

bulk_confirmed_result =
  SelectoUpdato.new(order_snapshot_domain)
  |> SelectoUpdato.filter({"status", "bulk-prune"})
  |> SelectoUpdato.delete()
  |> SelectoUpdato.confirm_bulk_delete(true)
  |> SelectoUpdato.execute(Repo)

{:ok, soft_sample} =
  SelectoUpdato.new(soft_delete_domain)
  |> SelectoUpdato.insert(%{"name" => "Soft #{run_id}", "status" => "active"})
  |> SelectoUpdato.execute(Repo)

{:ok, _soft_deleted} =
  SelectoUpdato.new(soft_delete_domain)
  |> SelectoUpdato.filter({"id", soft_sample.id})
  |> SelectoUpdato.soft_delete()
  |> SelectoUpdato.execute(Repo)

soft_sample_after = Repo.get!(SoftDeleteSample, soft_sample.id)

%{
  restrict_delete_result: restrict_delete_result,
  cascade_delete_result: cascade_delete_result,
  bulk_unconfirmed_result: bulk_unconfirmed_result,
  bulk_confirmed_result: bulk_confirmed_result,
  soft_sample_after: soft_sample_after
}

Ecto.Multi integration

multi_result =
  SelectoUpdato.multi()
  |> SelectoUpdato.multi_insert(:multi_customer, customer_domain, %{
    "name" => "Multi #{run_id}",
    "email" => "#{run_id}-multi@example.test",
    "tier" => "standard",
    "city" => "Austin",
    "country" => "US",
    "preferences" => %{},
    "active" => true
  })
  |> SelectoUpdato.multi_insert_all(:multi_snapshots, order_snapshot_domain, fn %{multi_customer: _customer} ->
    [
      %{"order_number" => "#{run_id}-MULTI-1", "status" => "multi_cleanup", "total" => d.("4.00")},
      %{"order_number" => "#{run_id}-MULTI-2", "status" => "multi_keep", "total" => d.("5.00")}
    ]
  end)
  |> SelectoUpdato.multi_update(:promote_base_customer, customer_domain,
    filter: [{"id", base_customer_id}],
    changes: %{tier: "vip"}
  )
  |> SelectoUpdato.multi_delete(:delete_multi_cleanup, order_snapshot_domain,
    filter: [{"status", "multi_cleanup"}]
  )
  |> SelectoUpdato.multi_run(:summary, fn _repo, changes ->
    {:ok, Map.keys(changes)}
  end)
  |> SelectoUpdato.run(Repo)

multi_result

Optional cleanup

Run this cell if you want to remove rows created by this notebook run.

Repo.delete_all(from s in SoftDeleteSample, where: ilike(s.name, ^"%#{run_id}%"))
Repo.delete_all(from s in OrderSnapshot, where: ilike(s.order_number, ^"#{run_id}%"))
Repo.delete_all(from o in Order, where: ilike(o.order_number, ^"#{run_id}%"))
Repo.delete_all(from p in Product, where: ilike(p.sku, ^"#{run_id}%"))
Repo.delete_all(from c in Customer, where: ilike(c.email, ^"#{run_id}%"))

Ecto.Adapters.SQL.query!(
  Repo,
  "DELETE FROM updato_audit_entries WHERE changes::text ILIKE $1 OR reason ILIKE $2",
  ["%#{run_id}%", "%#{run_id}%"]
)

:ok