SelectoUpdato Complete Feature Tour
# Locates a local checkout of `package_name` by walking from the current working
# directory up to the filesystem root.
#
# For each directory on that path (nearest first) two candidates are probed:
# `<dir>/vendor/<package_name>` and `<dir>/<package_name>`. The first candidate
# containing a `mix.exs` is returned; `nil` is returned when none does.
find_local_package = fn package_name ->
  # Enum.scan/2 seeds the accumulator with the first segment reported by
  # Path.split/1, so the real root component is reused instead of assuming "/".
  ancestors =
    File.cwd!()
    |> Path.split()
    |> Enum.scan(&Path.join(&2, &1))
    |> Enum.reverse()

  ancestors
  |> Enum.flat_map(fn base ->
    [Path.join(base, "vendor/#{package_name}"), Path.join(base, package_name)]
  end)
  # `<dir>/vendor/<pkg>` can coincide with `<dir/vendor>/<pkg>`; probe it once.
  |> Enum.uniq()
  |> Enum.find(fn candidate -> File.exists?(Path.join(candidate, "mix.exs")) end)
end
# Prefer local checkouts of both packages; fall back to the GitHub `main` branch.
local_selecto_path = find_local_package.("selecto")
local_updato_path = find_local_package.("selecto_updato")

# `selecto_updato` declares `{:selecto, path: "../selecto"}`; the `override: true`
# flag on both variants forces a single source of truth for `:selecto`.
selecto_dep =
  case local_selecto_path do
    missing when missing in [nil, false] ->
      {:selecto, github: "seeken/selecto", branch: "main", override: true}

    path ->
      {:selecto, path: path, override: true}
  end

updato_dep =
  case local_updato_path do
    missing when missing in [nil, false] ->
      {:selecto_updato, github: "seeken/selecto_updato", branch: "main"}

    path ->
      {:selecto_updato, path: path}
  end
# Install the two Selecto packages chosen above plus the database, JSON,
# decimal and Kino dependencies.
Mix.install([
  selecto_dep,
  updato_dep,
  {:postgrex, "~> 0.17"},
  {:ecto_sql, "~> 3.11"},
  {:jason, "~> 1.4"},
  {:decimal, "~> 2.0"},
  {:kino, "~> 0.12"}
])

# Report which dependency tuples were actually used.
[
  "Using Selecto dependency: #{inspect(selecto_dep)}",
  "Using SelectoUpdato dependency: #{inspect(updato_dep)}"
]
|> Enum.each(&IO.puts/1)
Overview
This notebook exercises the full public SelectoUpdato API against the
selecto_examples PostgreSQL dataset.
Feature coverage checklist:
- new/2, insert/2, insert_all/2, insert_from_query/2
- update/2 with expression tuples and grouped filters
- upsert/2, upsert_all/2, conflict_target/2, on_conflict/2
- delete/1, soft_delete/1, confirm_bulk_delete/2, cascade/3
- nested_strategy/3 for nested updates
- filter/2, filters/2, filter_from_selecto/2
- returning/2, with_changeset/2, audit/2
- execute/3, execute_in_transaction/3, transaction/3
- multi/0, multi_insert/4, multi_insert_all/4, multi_update/4, multi_delete/4, multi_run/3, run/3
- dry_run/2, to_sql/1
Before running this notebook:
cd selecto_examples
mix setup
import Ecto.Query
defmodule SelectoUpdatoLivebook.Repo do
  @moduledoc """
  Ecto repository used by the notebook cells, backed by the Postgres adapter.
  """

  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :selecto_updato_livebook
end
# Connection settings; every value can be overridden through a
# SELECTO_EXAMPLES_DB* environment variable.
repo_config = [
  database: System.get_env("SELECTO_EXAMPLES_DB", "selecto_examples_dev"),
  username: System.get_env("SELECTO_EXAMPLES_DB_USER", "postgres"),
  password: System.get_env("SELECTO_EXAMPLES_DB_PASS", "postgres"),
  hostname: System.get_env("SELECTO_EXAMPLES_DB_HOST", "localhost"),
  port: "SELECTO_EXAMPLES_DB_PORT" |> System.get_env("5432") |> String.to_integer(),
  pool_size: 5
]

# Start the repo only when it is not registered yet, so the cell can be re-run.
if is_nil(Process.whereis(SelectoUpdatoLivebook.Repo)) do
  {:ok, _pid} = SelectoUpdatoLivebook.Repo.start_link(repo_config)
else
  :ok
end

# The hook log Agent is started on first use and emptied on every later run.
if Process.whereis(:selecto_updato_hook_log) do
  Agent.update(:selecto_updato_hook_log, fn _previous -> [] end)
else
  {:ok, _pid} = Agent.start_link(fn -> [] end, name: :selecto_updato_hook_log)
end
# DDL for the three helper tables this notebook writes to. Each statement uses
# CREATE TABLE IF NOT EXISTS, so re-running the cell leaves existing tables alone.
support_sql = [
# Audit table; order_domain's `audit` setting names this table.
"""
CREATE TABLE IF NOT EXISTS updato_audit_entries (
id bigserial PRIMARY KEY,
operation text NOT NULL,
source_table text,
record_id bigint,
changes jsonb,
actor_id text,
reason text,
metadata jsonb DEFAULT '{}'::jsonb,
inserted_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL
)
""",
# Backing table for the OrderSnapshot schema.
"""
CREATE TABLE IF NOT EXISTS updato_order_snapshots (
id bigserial PRIMARY KEY,
order_number text NOT NULL,
status text NOT NULL,
total numeric(12, 2) NOT NULL DEFAULT 0,
inserted_at timestamp without time zone NOT NULL DEFAULT now(),
updated_at timestamp without time zone NOT NULL DEFAULT now()
)
""",
# Backing table for the SoftDeleteSample schema; `deleted_at` is nullable.
"""
CREATE TABLE IF NOT EXISTS updato_soft_delete_samples (
id bigserial PRIMARY KEY,
name text NOT NULL,
status text NOT NULL DEFAULT 'active',
deleted_at timestamp without time zone,
inserted_at timestamp without time zone NOT NULL DEFAULT now(),
updated_at timestamp without time zone NOT NULL DEFAULT now()
)
"""
]
# Run the statements one by one, in list order, against the notebook repo;
# query!/3 raises if a statement fails.
Enum.each(support_sql, fn sql ->
Ecto.Adapters.SQL.query!(SelectoUpdatoLivebook.Repo, sql, [])
end)
:ok
defmodule SelectoUpdatoLivebook.Customer do
  @moduledoc """
  Ecto schema for the `customers` table.

  `changeset/2` casts a subset of the fields, requires `name` and `email`, and
  checks that `email` contains an `@`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  # Fields accepted from external attrs; the remaining schema fields are not cast.
  @cast_fields ~w(name email tier city country preferences active verified_at)a
  @required_fields ~w(name email)a

  schema "customers" do
    field(:name, :string)
    field(:email, :string)
    field(:phone, :string)
    field(:tier, :string)
    field(:company_name, :string)
    field(:address, :string)
    field(:city, :string)
    field(:region, :string)
    field(:postal_code, :string)
    field(:country, :string)
    field(:notes, :string)
    field(:preferences, :map)
    field(:active, :boolean)
    field(:verified_at, :utc_datetime)
    has_many(:orders, SelectoUpdatoLivebook.Order)
    timestamps()
  end

  def changeset(customer, attrs) do
    casted = cast(customer, attrs, @cast_fields)
    with_required = validate_required(casted, @required_fields)
    validate_format(with_required, :email, ~r/@/)
  end
end
defmodule SelectoUpdatoLivebook.Product do
  @moduledoc """
  Ecto schema for the `products` table.

  `changeset/2` casts every non-timestamp field and requires `name`, `sku` and
  `price`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  @cast_fields ~w(
    name sku description price cost stock_quantity reorder_level
    active featured tags metadata category_id supplier_id
  )a
  @required_fields ~w(name sku price)a

  schema "products" do
    field(:name, :string)
    field(:sku, :string)
    field(:description, :string)
    field(:price, :decimal)
    field(:cost, :decimal)
    field(:stock_quantity, :integer)
    field(:reorder_level, :integer)
    field(:active, :boolean)
    field(:featured, :boolean)
    field(:tags, {:array, :string})
    field(:metadata, :map)
    field(:category_id, :integer)
    field(:supplier_id, :integer)
    timestamps()
  end

  def changeset(product, attrs) do
    casted = cast(product, attrs, @cast_fields)
    validate_required(casted, @required_fields)
  end
end
defmodule SelectoUpdatoLivebook.Order do
  @moduledoc """
  Ecto schema for the `orders` table, with its order items and owning customer.

  `changeset/2` casts the order columns and requires `order_number`, `total`
  and `customer_id`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  @cast_fields ~w(
    order_number status subtotal tax shipping discount total
    shipping_country notes customer_id
  )a
  @required_fields ~w(order_number total customer_id)a

  schema "orders" do
    field(:order_number, :string)
    field(:status, :string)
    field(:subtotal, :decimal)
    field(:tax, :decimal)
    field(:shipping, :decimal)
    field(:discount, :decimal)
    field(:total, :decimal)
    field(:shipping_country, :string)
    field(:notes, :string)
    field(:customer_id, :integer)
    has_many(:order_items, SelectoUpdatoLivebook.OrderItem)

    # `customer_id` is already declared as a plain field above, hence
    # `define_field: false` on the association.
    belongs_to(:customer, SelectoUpdatoLivebook.Customer,
      foreign_key: :customer_id,
      define_field: false
    )

    timestamps()
  end

  def changeset(order, attrs) do
    casted = cast(order, attrs, @cast_fields)
    validate_required(casted, @required_fields)
  end
end
defmodule SelectoUpdatoLivebook.OrderItem do
  @moduledoc """
  Ecto schema for the `order_items` table.

  `changeset/2` casts all item columns and requires `quantity`, `unit_price`,
  `line_total` and `product_id`; `order_id` is cast but not required.
  """

  use Ecto.Schema

  import Ecto.Changeset

  @cast_fields ~w(quantity unit_price discount line_total line_number order_id product_id)a
  @required_fields ~w(quantity unit_price line_total product_id)a

  schema "order_items" do
    field(:quantity, :integer)
    field(:unit_price, :decimal)
    field(:discount, :decimal)
    field(:line_total, :decimal)
    field(:line_number, :integer)
    field(:order_id, :integer)
    field(:product_id, :integer)
    timestamps()
  end

  def changeset(item, attrs) do
    casted = cast(item, attrs, @cast_fields)
    validate_required(casted, @required_fields)
  end
end
defmodule SelectoUpdatoLivebook.OrderSnapshot do
  @moduledoc """
  Ecto schema for the `updato_order_snapshots` helper table.

  `changeset/2` casts and requires `order_number`, `status` and `total`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  @cast_fields ~w(order_number status total)a
  @required_fields ~w(order_number status total)a

  schema "updato_order_snapshots" do
    field(:order_number, :string)
    field(:status, :string)
    field(:total, :decimal)
    timestamps()
  end

  def changeset(snapshot, attrs) do
    casted = cast(snapshot, attrs, @cast_fields)
    validate_required(casted, @required_fields)
  end
end
defmodule SelectoUpdatoLivebook.SoftDeleteSample do
  @moduledoc """
  Ecto schema for the `updato_soft_delete_samples` helper table.

  `changeset/2` casts `name`, `status` and `deleted_at`, and requires `name`
  and `status`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  @cast_fields ~w(name status deleted_at)a
  @required_fields ~w(name status)a

  schema "updato_soft_delete_samples" do
    field(:name, :string)
    field(:status, :string)
    field(:deleted_at, :utc_datetime)
    timestamps()
  end

  def changeset(sample, attrs) do
    casted = cast(sample, attrs, @cast_fields)
    validate_required(casted, @required_fields)
  end
end
defmodule SelectoUpdatoLivebook.Hooks do
  @moduledoc """
  Hook callbacks referenced by the order domain.

  The `before_insert`/`before_update` hooks stamp the `:notes` change on the
  changeset. The `after_*` and `before_delete` hooks push a `{tag, id}` event
  onto the `:selecto_updato_hook_log` Agent (when it is running) and return `:ok`.
  """

  import Ecto.Changeset

  @log_agent :selecto_updato_hook_log

  def before_insert_order(changeset),
    do: put_change(changeset, :notes, "created_by_selecto_updato")

  # Only fills in `:notes` when the changeset carries no truthy change for it.
  def before_update_order(changeset, _original) do
    case get_change(changeset, :notes) do
      absent when absent in [nil, false] ->
        put_change(changeset, :notes, "updated_by_selecto_updato")

      _present ->
        changeset
    end
  end

  def after_insert_order(order), do: record(:after_insert, order)

  def after_update_order(order), do: record(:after_update, order)

  def before_delete_order(record), do: record(:before_delete, record)

  def after_delete_order(record), do: record(:after_delete, record)

  # Logs `{tag, id}` for the given row and reports success to the caller.
  defp record(tag, row) do
    log({tag, Map.get(row, :id)})
    :ok
  end

  # Prepends the event to the Agent's list; a no-op when the Agent is not registered.
  defp log(event) do
    case Process.whereis(@log_agent) do
      nil -> nil
      _registered -> Agent.update(@log_agent, fn events -> [event | events] end)
    end
  end
end
:ok
alias SelectoUpdatoLivebook.{
Repo,
Customer,
Product,
Order,
OrderItem,
OrderSnapshot,
SoftDeleteSample,
Hooks
}
# Shorthand for building decimals from string literals.
d = &Decimal.new/1

# Reads `key` from a map or struct, matching keys by their string form so that
# `:id` and "id" are interchangeable. Returns `nil` when the key is absent or
# when the first argument is neither a map nor a struct.
read_key = fn
  %{} = container, key ->
    wanted = to_string(key)
    entries = if is_struct(container), do: Map.from_struct(container), else: container
    matches = for {k, v} <- entries, to_string(k) == wanted, do: v

    # The first truthy match wins. A stored `false` is still returned as `false`
    # rather than being confused with "no such key".
    case Enum.find(matches, & &1) do
      nil -> if false in matches, do: false, else: nil
      value -> value
    end

  _other, _key ->
    nil
end

# Identifier embedded in every row this notebook creates.
run_id = "updato-#{System.system_time(:millisecond)}"
# SelectoUpdato domain for customers: column types and validations, which
# columns may be written, and which are required on insert.
customer_domain = %{
  source: Customer,
  required_on_insert: ~w(name email),
  readonly: ~w(id inserted_at updated_at),
  writable: ~w(name email tier city country preferences active verified_at),
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "email" => %{type: :string, validations: [format: ~r/@/]},
    "tier" => %{type: :string, validations: [inclusion: ~w(standard premium vip)]},
    "city" => %{type: :string},
    "country" => %{type: :string},
    "preferences" => %{type: :jsonb},
    "active" => %{type: :boolean},
    "verified_at" => %{type: :utc_datetime},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  }
}
# SelectoUpdato domain for products: column types, writable and read-only
# columns, and the columns required on insert.
product_domain = %{
  source: Product,
  required_on_insert: ~w(name sku price),
  readonly: ~w(id inserted_at updated_at),
  writable: ~w(
    name sku description price cost stock_quantity reorder_level
    active featured tags metadata category_id supplier_id
  ),
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "sku" => %{type: :string},
    "description" => %{type: :string},
    "price" => %{type: :decimal},
    "cost" => %{type: :decimal},
    "stock_quantity" => %{type: :integer},
    "reorder_level" => %{type: :integer},
    "active" => %{type: :boolean},
    "featured" => %{type: :boolean},
    "tags" => %{type: {:array, :string}},
    "metadata" => %{type: :jsonb},
    "category_id" => %{type: :integer},
    "supplier_id" => %{type: :integer},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  }
}
# SelectoUpdato domain for orders. Besides the column metadata it declares the
# `order_items` (writable) and `customer` (non-writable) joins, the lifecycle
# hooks implemented in Hooks, and the audit table.
order_domain = %{
  source: Order,
  required_on_insert: ~w(order_number total customer_id),
  readonly: ~w(id inserted_at updated_at),
  writable: ~w(
    order_number status subtotal tax shipping discount total
    shipping_country notes customer_id
  ),
  columns: %{
    "id" => %{type: :integer},
    "order_number" => %{type: :string},
    "status" => %{type: :string},
    "subtotal" => %{type: :decimal},
    "tax" => %{type: :decimal},
    "shipping" => %{type: :decimal},
    "discount" => %{type: :decimal},
    "total" => %{type: :decimal},
    "shipping_country" => %{type: :string},
    "notes" => %{type: :string},
    "customer_id" => %{type: :integer},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  },
  schemas: %{order_items: OrderItem, customer: Customer},
  joins: %{
    order_items: %{
      source: "order_items",
      owner_key: :id,
      my_key: :order_id,
      cardinality: :many,
      config: %{writable: true}
    },
    customer: %{
      source: "customers",
      owner_key: :customer_id,
      my_key: :id,
      cardinality: :one,
      config: %{writable: false}
    }
  },
  hooks: %{
    before_insert: [&Hooks.before_insert_order/1],
    after_insert: [&Hooks.after_insert_order/1],
    before_update: [&Hooks.before_update_order/2],
    after_update: [&Hooks.after_update_order/1],
    before_delete: [&Hooks.before_delete_order/1],
    after_delete: [&Hooks.after_delete_order/1]
  },
  audit: %{enabled: true, table: "updato_audit_entries"}
}
# SelectoUpdato domain for the order snapshot helper table.
order_snapshot_domain = %{
  source: OrderSnapshot,
  required_on_insert: ~w(order_number status total),
  readonly: ~w(id inserted_at updated_at),
  writable: ~w(order_number status total),
  columns: %{
    "id" => %{type: :integer},
    "order_number" => %{type: :string},
    "status" => %{type: :string},
    "total" => %{type: :decimal},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  }
}
# SelectoUpdato domain for the soft-delete sample table; `deleted_at` is
# declared as the soft-delete field.
soft_delete_domain = %{
  source: SoftDeleteSample,
  soft_delete_field: "deleted_at",
  required_on_insert: ~w(name status),
  readonly: ~w(id inserted_at updated_at),
  writable: ~w(name status deleted_at),
  columns: %{
    "id" => %{type: :integer},
    "name" => %{type: :string},
    "status" => %{type: :string},
    "deleted_at" => %{type: :utc_datetime},
    "inserted_at" => %{type: :utc_datetime},
    "updated_at" => %{type: :utc_datetime}
  }
}
%{run_id: run_id}
Insert, Returning, and with_changeset
# Both e-mail addresses embed run_id; base_customer_email is reused later as
# the upsert conflict key.
base_customer_email = "#{run_id}-base@example.test"
vip_customer_email = "#{run_id}-vip@example.test"
# Plain insert through the customer domain; returning/2 asks for the id, name,
# email and tier columns. The match asserts an {:ok, _} result.
{:ok, base_customer} =
SelectoUpdato.new(customer_domain)
|> SelectoUpdato.insert(%{
"name" => "Base #{run_id}",
"email" => base_customer_email,
"tier" => "standard",
"city" => "Boston",
"country" => "US",
"preferences" => %{"newsletter" => true},
"active" => true
})
|> SelectoUpdato.returning(["id", "name", "email", "tier"])
|> SelectoUpdato.execute(Repo)
# read_key looks the id up by name, so it works for atom- and string-keyed rows.
base_customer_id = read_key.(base_customer, :id)
# Same kind of insert, but with a custom changeset function: it builds
# Customer.changeset/2 and then forces the tier change to "vip", although the
# submitted attrs say "standard".
{:ok, vip_customer} =
SelectoUpdato.new(customer_domain)
|> SelectoUpdato.insert(%{
"name" => "Custom Changeset #{run_id}",
"email" => vip_customer_email,
"tier" => "standard",
"city" => "Denver",
"country" => "US",
"preferences" => %{},
"active" => true
})
|> SelectoUpdato.with_changeset(fn customer, attrs ->
customer
|> Customer.changeset(attrs)
|> Ecto.Changeset.put_change(:tier, "vip")
end)
|> SelectoUpdato.execute(Repo)
# Cell output: the two results plus the id used by later cells.
%{
base_customer: base_customer,
base_customer_id: base_customer_id,
vip_customer: vip_customer
}
insert_all
# Bulk-insert two products whose SKUs embed run_id ("-SKU-A" and "-SKU-B");
# returning/2 asks for id, sku, name and price. The match asserts {:ok, _}.
{:ok, inserted_products} =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.insert_all([
%{
"name" => "Notebook A #{run_id}",
"sku" => "#{run_id}-SKU-A",
"price" => d.("10.00"),
"cost" => d.("5.00"),
"stock_quantity" => 40,
"reorder_level" => 5,
"active" => true,
"featured" => false,
"tags" => ["seed", "a"],
"metadata" => %{"run_id" => run_id}
},
%{
"name" => "Notebook B #{run_id}",
"sku" => "#{run_id}-SKU-B",
"price" => d.("20.00"),
"cost" => d.("11.00"),
"stock_quantity" => 25,
"reorder_level" => 4,
"active" => true,
"featured" => false,
"tags" => ["seed", "b"],
"metadata" => %{"run_id" => run_id}
}
])
|> SelectoUpdato.returning(["id", "sku", "name", "price"])
|> SelectoUpdato.execute(Repo)
# Normalise the insert_all result into a list of rows. The exact result shape
# is not fixed here, so accept a bare list, a map/struct carrying `:records`,
# or a single map; anything else yields no rows and the SKU lookup below is used.
product_rows =
  case inserted_products do
    rows when is_list(rows) -> rows
    %{records: records} -> List.wrap(records)
    %{} = single -> [single]
    _other -> []
  end

# Resolves a product id by SKU: first from the returned rows (matched on their
# `sku` value rather than on list position), then from the database whenever
# no returned row yields an id.
product_id_for = fn sku ->
  returned_id =
    product_rows
    |> Enum.find(fn row -> read_key.(row, :sku) == sku end)
    |> then(fn row -> read_key.(row, :id) end)

  returned_id || Repo.one(from p in Product, where: p.sku == ^sku, select: p.id)
end

product_one_id = product_id_for.("#{run_id}-SKU-A")
product_two_id = product_id_for.("#{run_id}-SKU-B")

# Cell output: the raw result plus the two ids used by later cells.
%{
  inserted_products: inserted_products,
  product_one_id: product_one_id,
  product_two_id: product_two_id
}
upsert, upsert_all, conflict_target, on_conflict
# Upsert keyed on "email": the payload reuses base_customer_email, and
# on_conflict lists name, tier, city and preferences as the columns to replace.
{:ok, upsert_customer} =
SelectoUpdato.new(customer_domain)
|> SelectoUpdato.upsert(%{
"name" => "Updated Base #{run_id}",
"email" => base_customer_email,
"tier" => "premium",
"city" => "Seattle",
"country" => "US",
"preferences" => %{"newsletter" => false},
"active" => true
})
|> SelectoUpdato.conflict_target(["email"])
|> SelectoUpdato.on_conflict({:replace, ["name", "tier", "city", "preferences"]})
|> SelectoUpdato.returning(["id", "name", "tier", "city"])
|> SelectoUpdato.execute(Repo)
# Bulk upsert keyed on "sku": the first entry reuses the "-SKU-A" SKU, the
# second introduces "-SKU-C".
{:ok, upsert_products} =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.upsert_all([
%{
"name" => "Notebook A updated #{run_id}",
"sku" => "#{run_id}-SKU-A",
"price" => d.("13.00"),
"stock_quantity" => 41,
"tags" => ["seed", "a", "upsert"],
"metadata" => %{"run_id" => run_id, "version" => 2}
},
%{
"name" => "Notebook C #{run_id}",
"sku" => "#{run_id}-SKU-C",
"price" => d.("12.34"),
"stock_quantity" => 15,
"tags" => ["seed", "c"],
"metadata" => %{"run_id" => run_id}
}
])
|> SelectoUpdato.conflict_target(["sku"])
|> SelectoUpdato.on_conflict({:replace, ["name", "price", "stock_quantity", "tags", "metadata"]})
|> SelectoUpdato.execute(Repo)
# No returning/2 was requested above, so the id of the "-SKU-C" product is read
# straight from the database.
product_three_id = Repo.one(from p in Product, where: p.sku == ^"#{run_id}-SKU-C", select: p.id)
# Cell output.
%{
upsert_customer: upsert_customer,
upsert_products: upsert_products,
product_three_id: product_three_id
}
Nested insert, returning(:all), hooks, and audit
# Insert an order together with two order items passed under "order_items",
# requesting all columns back and attaching audit metadata (actor and reason).
# order_domain also declares insert hooks, which log to the hook Agent.
{:ok, nested_insert_result} =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.insert(%{
"order_number" => "#{run_id}-ORD-1",
"status" => "pending",
"subtotal" => d.("30.00"),
"tax" => d.("3.00"),
"shipping" => d.("5.00"),
"discount" => d.("0.00"),
"total" => d.("38.00"),
"shipping_country" => "US",
"customer_id" => base_customer_id,
"order_items" => [
%{
"product_id" => product_one_id,
"quantity" => 2,
"unit_price" => d.("10.00"),
"discount" => d.("0.00"),
"line_total" => d.("20.00"),
"line_number" => 1
},
%{
"product_id" => product_two_id,
"quantity" => 1,
"unit_price" => d.("10.00"),
"discount" => d.("0.00"),
"line_total" => d.("10.00"),
"line_number" => 2
}
]
})
|> SelectoUpdato.returning(:all)
|> SelectoUpdato.audit(actor: "livebook", reason: "create nested order")
|> SelectoUpdato.execute(Repo)
# The result is read as a map exposing `order` and `order_items`.
order = nested_insert_result.order
first_order_item = List.first(nested_insert_result.order_items)
# Events are prepended by the hooks, so reverse them to get chronological order.
hook_events = Agent.get(:selecto_updato_hook_log, &Enum.reverse/1)
# Latest audit entry for this order id.
# NOTE(review): the lookup filters on record_id only, not on source_table, so a
# row logged for another table with the same id would also match - confirm.
audit_rows =
Ecto.Adapters.SQL.query!(
Repo,
"SELECT operation, source_table, reason FROM updato_audit_entries WHERE record_id = $1 ORDER BY id DESC LIMIT 1",
[order.id]
).rows
# Cell output.
%{
nested_insert_result: nested_insert_result,
hook_events: hook_events,
audit_rows: audit_rows
}
update with expression tuples, grouped filters, and nested_strategy(:sync)
# Update product one using a grouped filter (an :and group containing an :or
# group that accepts either value of "active") and one expression tuple per
# column: :increment, :decrement, :set, :array_append, :json_merge, :coalesce.
{:ok, updated_product_one} =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.filters([
{:and, [
{"id", product_one_id},
{:or, [{"active", true}, {"active", false}]}
]}
])
|> SelectoUpdato.update(%{
stock_quantity: {:increment, 5},
price: {:decrement, d.("1.00")},
reorder_level: {:set, 3},
tags: {:array_append, "promo"},
metadata: {:json_merge, %{"campaign" => run_id}},
description: {:coalesce, "description filled by coalesce"}
})
|> SelectoUpdato.execute(Repo)
# Two follow-up updates on the same product exercise :array_prepend and
# :array_remove; only success is asserted, the results are discarded.
{:ok, _prepend_tags} =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.filter({"id", product_one_id})
|> SelectoUpdato.update(%{tags: {:array_prepend, "first-tag"}})
|> SelectoUpdato.execute(Repo)
{:ok, _remove_tags} =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.filter({"id", product_one_id})
|> SelectoUpdato.update(%{tags: {:array_remove, "promo"}})
|> SelectoUpdato.execute(Repo)
# Nested update with the :sync strategy for order_items. The payload carries
# the first existing item (by id) with new quantities and one new item without
# an id; the originally inserted second item is not listed.
# NOTE(review): presumably :sync removes unlisted items - the count below is
# what shows the outcome; confirm against the SelectoUpdato docs.
{:ok, nested_update_result} =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.filter({"id", order.id})
|> SelectoUpdato.update(%{
"status" => "processing",
"order_items" => [
%{
"id" => first_order_item.id,
"product_id" => product_one_id,
"quantity" => 5,
"unit_price" => d.("10.00"),
"discount" => d.("0.00"),
"line_total" => d.("50.00"),
"line_number" => 1
},
%{
"product_id" => product_three_id,
"quantity" => 1,
"unit_price" => d.("12.34"),
"discount" => d.("0.00"),
"line_total" => d.("12.34"),
"line_number" => 2
}
],
"shipping" => Decimal.add(order.shipping || d.("0.00"), d.("2.00"))
})
|> SelectoUpdato.nested_strategy(:order_items, :sync)
|> SelectoUpdato.execute(Repo)
# Count the order's items directly in the database after the sync.
order_item_count = Repo.aggregate(from(i in OrderItem, where: i.order_id == ^order.id), :count)
# Cell output.
%{
updated_product_one: updated_product_one,
nested_update_result: nested_update_result,
order_item_count_after_sync: order_item_count
}
dry_run and to_sql
# None of the results in this cell are pattern matched, so whatever dry_run/2
# or to_sql/1 returns is shown as-is.
# Insert without "email", which customer_domain lists under required_on_insert;
# the variable name indicates an error result is expected.
dry_run_insert_error =
SelectoUpdato.new(customer_domain)
|> SelectoUpdato.insert(%{"name" => "Invalid #{run_id}"})
|> SelectoUpdato.dry_run(Repo)
# Filtered update with an :increment expression.
dry_run_update_ok =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.filter({"id", product_one_id})
|> SelectoUpdato.update(%{stock_quantity: {:increment, 2}})
|> SelectoUpdato.dry_run(Repo)
# Update with no filter at all.
dry_run_update_missing_filter =
SelectoUpdato.new(product_domain)
|> SelectoUpdato.update(%{"stock_quantity" => 1})
|> SelectoUpdato.dry_run(Repo)
# Preview for an update restricted to "processing" orders whose order_number
# starts with run_id; to_sql/1 takes no repo argument.
sql_preview =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.filter({:and, [
{"status", "processing"},
{"order_number", {:ilike, "#{run_id}%"}}
]})
|> SelectoUpdato.update(%{"status" => "shipped"})
|> SelectoUpdato.to_sql()
# Cell output.
%{
dry_run_insert_error: dry_run_insert_error,
dry_run_update_ok: dry_run_update_ok,
dry_run_update_missing_filter: dry_run_update_missing_filter,
sql_preview: sql_preview
}
insert_from_query and filter_from_selecto
# Selecto (read-side) domain over the `orders` table, limited to the columns
# that are copied into snapshots.
order_selecto_domain = %{
  joins: %{},
  schemas: %{},
  source: %{
    source_table: "orders",
    primary_key: :id,
    fields: ~w(id order_number status total)a,
    columns: %{
      id: %{type: :integer},
      order_number: %{type: :string},
      status: %{type: :string},
      total: %{type: :decimal}
    }
  }
}

# Selecto (read-side) domain over the snapshot table, with the same columns.
snapshot_selecto_domain = %{
  joins: %{},
  schemas: %{},
  source: %{
    source_table: "updato_order_snapshots",
    primary_key: :id,
    fields: ~w(id order_number status total)a,
    columns: %{
      id: %{type: :integer},
      order_number: %{type: :string},
      status: %{type: :string},
      total: %{type: :decimal}
    }
  }
}
# Selecto query selecting order_number, status and total for the order created
# earlier in the notebook.
source_query =
Selecto.configure(order_selecto_domain, Repo)
|> Selecto.select(["order_number", "status", "total"])
|> Selecto.filter({"order_number", order.order_number})
# Feed that query into insert_from_query/2 on the snapshot domain; the match
# asserts an {:ok, _} result.
{:ok, insert_from_query_result} =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.insert_from_query(source_query)
|> SelectoUpdato.execute(Repo)
# Selecto query over the snapshot table: order numbers starting with run_id
# and status "processing". It is only used as a carrier of filters below.
snapshot_filter_query =
Selecto.configure(snapshot_selecto_domain, Repo)
|> Selecto.filter({"order_number", {:ilike, "#{run_id}%"}})
|> Selecto.filter({"status", "processing"})
# Collects every filter list found on a Selecto query map into one flat list.
#
# Three places are consulted, in this order: the top-level `:filters` key, the
# `:filtered` (or, failing that, `:filters`) key under `:set`, and the
# `:post_pivot_filters` key under `:set`. Values that are not lists are skipped.
extract_selecto_filters = fn selecto_query ->
  top_level = Map.get(selecto_query, :filters, [])

  set_state =
    case Map.get(selecto_query, :set) do
      candidate when is_map(candidate) -> candidate
      _missing -> %{}
    end

  from_set = Map.get(set_state, :filtered) || Map.get(set_state, :filters) || []
  after_pivot = Map.get(set_state, :post_pivot_filters) || []

  for group <- [top_level, from_set, after_pivot],
      is_list(group),
      entry <- group,
      do: entry
end
# Filters pulled out of the Selecto query by hand, kept as a fallback.
snapshot_filters = extract_selecto_filters.(snapshot_filter_query)
# Preferred path: let filter_from_selecto/2 copy the filters onto the operation.
snapshot_update_op =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.filter_from_selecto(snapshot_filter_query)
# If that left the operation without filters, apply the manually extracted ones.
snapshot_update_op =
if snapshot_update_op.filters == [] do
SelectoUpdato.filters(snapshot_update_op, snapshot_filters)
else
snapshot_update_op
end
# Set status to "copied" on the rows selected by those filters; the match
# asserts an {:ok, _} result.
{:ok, filter_from_selecto_result} =
snapshot_update_op
|> SelectoUpdato.update(%{"status" => "copied"})
|> SelectoUpdato.execute(Repo)
# Read this run's snapshots back (newest id first) to show the outcome.
snapshots_after_copy =
Repo.all(
from s in OrderSnapshot,
where: ilike(s.order_number, ^"#{run_id}%"),
order_by: [desc: s.id],
select: %{id: s.id, order_number: s.order_number, status: s.status, total: s.total}
)
# Cell output.
%{
insert_from_query_result: insert_from_query_result,
filter_from_selecto_result: filter_from_selecto_result,
snapshots_after_copy: snapshots_after_copy
}
execute, execute_in_transaction, transaction, and envelope mode
# execute/3 with the `result: :envelope` option; the match asserts {:ok, _}.
{:ok, envelope_result} =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.insert(%{
"order_number" => "#{run_id}-ENV",
"status" => "envelope",
"total" => d.("1.23")
})
|> SelectoUpdato.execute(Repo, result: :envelope)
# Same kind of insert run through execute_in_transaction/2; the result is
# captured unmatched.
tx_execute_result =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.insert(%{
"order_number" => "#{run_id}-TX-EXEC",
"status" => "tx-exec",
"total" => d.("2.34")
})
|> SelectoUpdato.execute_in_transaction(Repo)
# transaction/2 wrapping two operations: an insert, then an update of the row
# just inserted. The function's value is the result of the final update; a
# failed insert raises a MatchError inside the function.
transaction_result =
SelectoUpdato.transaction(Repo, fn ->
{:ok, tx_row} =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.insert(%{
"order_number" => "#{run_id}-TX-WRAP",
"status" => "tx",
"total" => d.("3.45")
})
|> SelectoUpdato.execute(Repo)
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.filter({"id", tx_row.id})
|> SelectoUpdato.update(%{"status" => "tx-updated"})
|> SelectoUpdato.execute(Repo)
end)
# Cell output.
%{
envelope_result: envelope_result,
tx_execute_result: tx_execute_result,
transaction_result: transaction_result
}
delete, confirm_bulk_delete, cascade, and soft_delete
# Create a throw-away order with one order item to delete below.
{:ok, delete_target_result} =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.insert(%{
"order_number" => "#{run_id}-ORD-DEL",
"status" => "pending",
"subtotal" => d.("10.00"),
"tax" => d.("1.00"),
"shipping" => d.("1.00"),
"discount" => d.("0.00"),
"total" => d.("12.00"),
"shipping_country" => "US",
"customer_id" => base_customer_id,
"order_items" => [
%{
"product_id" => product_one_id,
"quantity" => 1,
"unit_price" => d.("10.00"),
"discount" => d.("0.00"),
"line_total" => d.("10.00"),
"line_number" => 1
}
]
})
|> SelectoUpdato.execute(Repo)
delete_order_id = delete_target_result.order.id
# Delete attempt with cascade :restrict on order_items. The result is captured
# unmatched, so an error tuple would be shown rather than raised.
restrict_delete_result =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.filter({"id", delete_order_id})
|> SelectoUpdato.delete()
|> SelectoUpdato.cascade(:order_items, :restrict)
|> SelectoUpdato.execute(Repo)
# Same delete with cascade :delete on order_items.
cascade_delete_result =
SelectoUpdato.new(order_domain)
|> SelectoUpdato.filter({"id", delete_order_id})
|> SelectoUpdato.delete()
|> SelectoUpdato.cascade(:order_items, :delete)
|> SelectoUpdato.execute(Repo)
# Seed two snapshot rows sharing the status "bulk-prune".
{:ok, _bulk_seed} =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.insert_all([
%{"order_number" => "#{run_id}-BULK-1", "status" => "bulk-prune", "total" => d.("1.00")},
%{"order_number" => "#{run_id}-BULK-2", "status" => "bulk-prune", "total" => d.("2.00")}
])
|> SelectoUpdato.execute(Repo)
# Delete by status without confirm_bulk_delete/2, then again with it set to
# true. Both results are captured unmatched for comparison.
# NOTE(review): the filter is on status only, not on run_id, so "bulk-prune"
# rows left behind by other runs would match as well - confirm this is intended.
bulk_unconfirmed_result =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.filter({"status", "bulk-prune"})
|> SelectoUpdato.delete()
|> SelectoUpdato.execute(Repo)
bulk_confirmed_result =
SelectoUpdato.new(order_snapshot_domain)
|> SelectoUpdato.filter({"status", "bulk-prune"})
|> SelectoUpdato.delete()
|> SelectoUpdato.confirm_bulk_delete(true)
|> SelectoUpdato.execute(Repo)
# Soft delete: insert a sample row, soft-delete it by id, then reload it with
# Repo.get!/2 to inspect the stored row (soft_delete_domain names "deleted_at"
# as its soft-delete field).
{:ok, soft_sample} =
SelectoUpdato.new(soft_delete_domain)
|> SelectoUpdato.insert(%{"name" => "Soft #{run_id}", "status" => "active"})
|> SelectoUpdato.execute(Repo)
{:ok, _soft_deleted} =
SelectoUpdato.new(soft_delete_domain)
|> SelectoUpdato.filter({"id", soft_sample.id})
|> SelectoUpdato.soft_delete()
|> SelectoUpdato.execute(Repo)
soft_sample_after = Repo.get!(SoftDeleteSample, soft_sample.id)
# Cell output.
%{
restrict_delete_result: restrict_delete_result,
cascade_delete_result: cascade_delete_result,
bulk_unconfirmed_result: bulk_unconfirmed_result,
bulk_confirmed_result: bulk_confirmed_result,
soft_sample_after: soft_sample_after
}
Ecto.Multi integration
# Build one multi out of five named steps and run it against the repo:
#   :multi_customer        - insert a customer
#   :multi_snapshots       - insert two snapshots; the attrs come from a function
#                            that receives the earlier changes (it pattern matches
#                            on :multi_customer but does not use the value)
#   :promote_base_customer - set the base customer's tier to "vip"
#   :delete_multi_cleanup  - delete snapshots with status "multi_cleanup"
#   :summary               - custom step returning the keys of the changes so far
# The result of run/2 is captured unmatched and shown as the cell output.
multi_result =
SelectoUpdato.multi()
|> SelectoUpdato.multi_insert(:multi_customer, customer_domain, %{
"name" => "Multi #{run_id}",
"email" => "#{run_id}-multi@example.test",
"tier" => "standard",
"city" => "Austin",
"country" => "US",
"preferences" => %{},
"active" => true
})
|> SelectoUpdato.multi_insert_all(:multi_snapshots, order_snapshot_domain, fn %{multi_customer: _customer} ->
[
%{"order_number" => "#{run_id}-MULTI-1", "status" => "multi_cleanup", "total" => d.("4.00")},
%{"order_number" => "#{run_id}-MULTI-2", "status" => "multi_keep", "total" => d.("5.00")}
]
end)
|> SelectoUpdato.multi_update(:promote_base_customer, customer_domain,
filter: [{"id", base_customer_id}],
changes: %{tier: "vip"}
)
|> SelectoUpdato.multi_delete(:delete_multi_cleanup, order_snapshot_domain,
filter: [{"status", "multi_cleanup"}]
)
|> SelectoUpdato.multi_run(:summary, fn _repo, changes ->
{:ok, Map.keys(changes)}
end)
|> SelectoUpdato.run(Repo)
multi_result
Optional cleanup
Run this cell if you want to remove rows created by this notebook run.
# Removes the rows created by this notebook run. Every such row embeds run_id
# in one of its text columns, which is what the ILIKE patterns match on.
run_prefix = "#{run_id}%"
run_anywhere = "%#{run_id}%"

Repo.delete_all(from s in SoftDeleteSample, where: ilike(s.name, ^run_anywhere))
Repo.delete_all(from s in OrderSnapshot, where: ilike(s.order_number, ^run_prefix))

# Order items carry order_id and product_id, so remove this run's items before
# their orders and products. This keeps the parent deletes from failing (or
# leaving orphans) when the database does not cascade on its own.
# NOTE(review): assumes order_items has no ON DELETE CASCADE - harmless if it does.
Repo.delete_all(
  from(i in OrderItem,
    join: o in Order,
    on: i.order_id == o.id,
    where: ilike(o.order_number, ^run_prefix)
  )
)

# Parents next: orders before the customers they point at.
Repo.delete_all(from o in Order, where: ilike(o.order_number, ^run_prefix))
Repo.delete_all(from p in Product, where: ilike(p.sku, ^run_prefix))
Repo.delete_all(from c in Customer, where: ilike(c.email, ^run_prefix))

# Audit entries are matched on run_id appearing in their changes or reason.
Ecto.Adapters.SQL.query!(
  Repo,
  "DELETE FROM updato_audit_entries WHERE changes::text ILIKE $1 OR reason ILIKE $2",
  [run_anywhere, run_anywhere]
)

:ok