ExDatalog v0.5.0 — DSL Edition

A walkthrough of v0.5.0 features using the Schema DSL (use ExDatalog.Schema).

Mix.install([
  {:ex_datalog, path: Path.expand("..", __DIR__), env: :prod}
])
:ok

Introduction

ExDatalog v0.5.0 adds four major features:

  • Aggregates: count, sum, min, max over grouped bindings
  • BEAM callbacks: call deterministic Elixir functions from rule bodies
  • Magic sets: goal-directed evaluation that computes only relevant facts
  • Query planner: inspect and explain evaluation plans before running them

All examples below use the Schema DSL exclusively.


Setup

alias ExDatalog.{Knowledge, MagicSets, Planner}
[ExDatalog.Knowledge, ExDatalog.MagicSets, ExDatalog.Planner]

1. Aggregates

Aggregates compute a single value per group of bindings. In the DSL they appear in the rule body as count(X, N), sum(A, T), min(S, V), max(S, V). The result variable (N, T, V) must also appear in the rule head.

Restrictions (enforced at validation):

  • Only one aggregate per rule.
  • No aggregate over a self-recursive relation.
  • The aggregate input variable must be bound by a positive body atom.
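As a rough mental model, an aggregate rule behaves like a group-by over the body bindings followed by a fold. The sketch below uses plain Elixir only; `AggregateSketch` and its position-based arguments are hypothetical and not part of ExDatalog, and it assumes that every distinct binding contributes once to its group.

```elixir
defmodule AggregateSketch do
  # Hypothetical helper, not ExDatalog API.
  # rows: list of fact tuples; group_pos / input_pos: zero-based tuple positions.
  # Returns sorted {group, aggregate} pairs.
  def aggregate(rows, group_pos, input_pos, kind) do
    rows
    # Facts form a set, so duplicates are dropped first (assumption).
    |> Enum.uniq()
    |> Enum.group_by(&elem(&1, group_pos), &elem(&1, input_pos))
    |> Enum.map(fn {group, values} -> {group, fold(kind, values)} end)
    |> Enum.sort()
  end

  defp fold(:count, values), do: length(values)
  defp fold(:sum, values), do: Enum.sum(values)
  defp fold(:min, values), do: Enum.min(values)
  defp fold(:max, values), do: Enum.max(values)
end

emp = [{:alice, :eng}, {:bob, :eng}, {:carol, :ops}, {:dave, :eng}, {:eve, :ops}]
IO.inspect(AggregateSketch.aggregate(emp, 1, 0, :count))
# prints [eng: 3, ops: 2]

salary = [{:alice, :eng, 120}, {:bob, :eng, 80}, {:carol, :ops, 50}, {:dave, :eng, 100}]
IO.inspect(AggregateSketch.aggregate(salary, 1, 2, :sum))
# prints [eng: 300, ops: 50]
```

The grouping key plays the role of the non-aggregate head variable, and the folded value plays the role of the result variable.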

Count — employees per department

defmodule DeptCount do
  use ExDatalog.Schema

  relation :emp do
    field(:name, :atom)
    field(:dept, :atom)
  end

  relation :dept_count do
    field(:dept, :atom)
    field(:n, :integer)
  end

  fact(emp(:alice, :eng))
  fact(emp(:bob, :eng))
  fact(emp(:carol, :ops))
  fact(emp(:dave, :eng))
  fact(emp(:eve, :ops))

  rule dept_count(D, N) do
    emp(E, D)
    count(E, N)
  end
end

{:ok, knowledge} = DeptCount.materialize()
Knowledge.get(knowledge, "dept_count") |> MapSet.to_list() |> Enum.sort()
[eng: 3, ops: 2]

Sum — total salary per department

defmodule DeptTotal do
  use ExDatalog.Schema

  relation :salary do
    field(:name, :atom)
    field(:dept, :atom)
    field(:amount, :integer)
  end

  relation :dept_total do
    field(:dept, :atom)
    field(:total, :integer)
  end

  fact(salary(:alice, :eng, 120))
  fact(salary(:bob, :eng, 80))
  fact(salary(:carol, :ops, 50))
  fact(salary(:dave, :eng, 100))

  rule dept_total(D, T) do
    salary(E, D, A)
    sum(A, T)
  end
end

{:ok, knowledge} = DeptTotal.materialize()
Knowledge.get(knowledge, "dept_total") |> MapSet.to_list() |> Enum.sort()
[eng: 300, ops: 50]

Min and Max

defmodule DeptScores do
  use ExDatalog.Schema

  relation :score do
    field(:name, :atom)
    field(:dept, :atom)
    field(:value, :integer)
  end

  relation :lowest do
    field(:dept, :atom)
    field(:value, :integer)
  end

  relation :highest do
    field(:dept, :atom)
    field(:value, :integer)
  end

  fact(score(:alice, :eng, 90))
  fact(score(:bob, :eng, 70))
  fact(score(:carol, :ops, 60))
  fact(score(:dave, :eng, 85))
  fact(score(:eve, :ops, 55))

  rule lowest(D, V) do
    score(E, D, S)
    min(S, V)
  end

  rule highest(D, V) do
    score(E, D, S)
    max(S, V)
  end
end

{:ok, knowledge} = DeptScores.materialize()

IO.puts("Lowest per dept:")
Knowledge.get(knowledge, "lowest") |> MapSet.to_list() |> Enum.sort() |> IO.inspect()

IO.puts("\nHighest per dept:")
Knowledge.get(knowledge, "highest") |> MapSet.to_list() |> Enum.sort() |> IO.inspect()
Lowest per dept:
[eng: 70, ops: 55]

Highest per dept:
[eng: 90, ops: 60]
[eng: 90, ops: 60]

Aggregate with a filter constraint

Constraints are evaluated before grouping. Count only passing scores (≥ 60):

defmodule PassingCount do
  use ExDatalog.Schema

  relation :score do
    field(:name, :atom)
    field(:dept, :atom)
    field(:value, :integer)
  end

  relation :passing_count do
    field(:dept, :atom)
    field(:n, :integer)
  end

  fact(score(:alice, :eng, 90))
  fact(score(:bob, :eng, 40))
  fact(score(:carol, :eng, 75))
  fact(score(:dave, :ops, 30))
  fact(score(:eve, :ops, 80))

  rule passing_count(D, N) do
    score(E, D, S)
    gte(S, 60)
    count(E, N)
  end
end

{:ok, knowledge} = PassingCount.materialize()
Knowledge.get(knowledge, "passing_count") |> MapSet.to_list() |> Enum.sort()
[eng: 2, ops: 1]
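The ordering "filter first, then group" can be sketched in plain Elixir. `FilterThenCountSketch` is a hypothetical name used only for illustration; it mirrors the rule body above but says nothing about how the engine evaluates it internally.

```elixir
defmodule FilterThenCountSketch do
  # Hypothetical helper, not ExDatalog API.
  # Drops rows below the threshold, then counts the survivors per department.
  def passing_count(scores, threshold) do
    scores
    |> Enum.filter(fn {_name, _dept, value} -> value >= threshold end)
    |> Enum.frequencies_by(fn {_name, dept, _value} -> dept end)
    |> Enum.sort()
  end
end

scores = [
  {:alice, :eng, 90},
  {:bob, :eng, 40},
  {:carol, :eng, 75},
  {:dave, :ops, 30},
  {:eve, :ops, 80}
]

IO.inspect(FilterThenCountSketch.passing_count(scores, 60))
# prints [eng: 2, ops: 1]
```

In this sketch a department whose rows are all filtered out produces no group at all, rather than a group with count 0.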

Aggregate over a derived relation (stratification)

When an aggregate reads from a derived relation, the engine automatically places it in a higher stratum — the derived facts must be fully computed first.

defmodule StratifiedAggregate do
  use ExDatalog.Schema

  relation :base do
    field(:key, :atom)
    field(:val, :atom)
  end

  relation :mid do
    field(:key, :atom)
    field(:val, :atom)
  end

  relation :mid_count do
    field(:key, :atom)
    field(:n, :integer)
  end

  fact(base(:a, :x))
  fact(base(:a, :y))
  fact(base(:b, :z))

  rule mid(K, V) do
    base(K, V)
  end

  rule mid_count(K, N) do
    mid(K, V)
    count(V, N)
  end
end

{:ok, knowledge} = StratifiedAggregate.materialize()
Knowledge.get(knowledge, "mid_count") |> MapSet.to_list() |> Enum.sort()
[a: 2, b: 1]

mid is derived first (stratum 0), then the aggregate counts over it (stratum 1).
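One common way to assign strata, shown here as a plain-Elixir sketch, is to start every relation at stratum 0 and raise a head relation whenever it depends on a body relation, by one extra level if the dependency goes through an aggregate. `StrataSketch` and the `{head, body, kind}` dependency triples are hypothetical; the source does not describe ExDatalog's actual algorithm.

```elixir
defmodule StrataSketch do
  # Hypothetical helper, not ExDatalog API.
  # deps: list of {head, body_relation, kind} with kind in [:positive, :aggregate].
  def strata(relations, deps) do
    iterate(Map.new(relations, &{&1, 0}), deps, length(relations))
  end

  defp iterate(strata, deps, limit) do
    next =
      Enum.reduce(deps, strata, fn {head, body, kind}, acc ->
        # An aggregate needs its input fully computed, so it sits one level higher.
        bump = if kind == :aggregate, do: 1, else: 0
        Map.update!(acc, head, &max(&1, acc[body] + bump))
      end)

    cond do
      next == strata -> {:ok, strata}
      # A stratum above the relation count means an aggregate sits on a cycle.
      Enum.any?(next, fn {_rel, s} -> s > limit end) -> {:error, :not_stratifiable}
      true -> iterate(next, deps, limit)
    end
  end
end

deps = [{:mid, :base, :positive}, {:mid_count, :mid, :aggregate}]
IO.inspect(StrataSketch.strata([:base, :mid, :mid_count], deps))
# prints {:ok, %{base: 0, mid: 0, mid_count: 1}}
```

The error branch corresponds to the restriction that an aggregate may not range over a self-recursive relation: such a dependency can never settle on a finite stratum.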


2. BEAM Callbacks

BEAM callbacks let rule bodies invoke deterministic, side-effect-free Elixir functions. Declare a predicate with the predicate macro, then use its name in a rule body.

Two kinds:

  • Boolean (:boolean) — returns true to keep the binding, false to drop it.
  • Value (:value) — the last argument in the rule-body call is the result variable; the function’s return value is bound to it.

The engine enforces a per-call timeout (default 100 ms, configurable via :callback_timeout_ms) and isolates exceptions: a callback that times out or raises filters out the current binding instead of crashing the evaluation.

Boolean filter predicate

First define a helper module with the predicate function:

defmodule MyPredicates do
  def adult?(age), do: age >= 18
  def valid_email?(email), do: String.contains?(email, "@")
end
{:module, MyPredicates, <<70, 79, 82, 49, 0, 0, 7, ...>>, ...}

Now use predicate in the schema:

defmodule AdultOnly do
  use ExDatalog.Schema

  relation :person do
    field(:name, :atom)
    field(:age, :integer)
  end

  relation :adult do
    field(:name, :atom)
  end

  predicate(:adult?, MyPredicates, :adult?, [:integer], :boolean)

  fact(person(:alice, 25))
  fact(person(:bob, 12))
  fact(person(:carol, 19))

  rule adult(Name) do
    person(Name, Age)
    adult?(Age)
  end
end

{:ok, knowledge} = AdultOnly.materialize()
Knowledge.get(knowledge, "adult") |> MapSet.to_list() |> Enum.sort()
[{:alice}, {:carol}]

Only :alice and :carol pass — :bob is 12, so adult? returns false.

Value-returning predicate

The last argument in the rule-body call is the result variable:

defmodule MyMath do
  def double(x), do: x * 2
end

defmodule DoubledNumbers do
  use ExDatalog.Schema

  relation :num do
    field(:x, :integer)
  end

  relation :doubled do
    field(:x, :integer)
    field(:y, :integer)
  end

  predicate(:double, MyMath, :double, [:integer], :value)

  fact(num(3))
  fact(num(5))
  fact(num(7))

  rule doubled(X, Y) do
    num(X)
    double(X, Y)
  end
end

{:ok, knowledge} = DoubledNumbers.materialize()
Knowledge.get(knowledge, "doubled") |> MapSet.to_list() |> Enum.sort()
[{3, 6}, {5, 10}, {7, 14}]
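The difference between the two callback kinds can be pictured as two operations on a list of variable bindings: a boolean callback filters bindings, a value callback extends each binding with a new variable. The sketch below is plain Elixir with bindings modelled as maps; `CallbackSketch` is a hypothetical name, and the case where the result variable is already bound is deliberately left out.

```elixir
defmodule CallbackSketch do
  # Hypothetical helper, not ExDatalog API. Bindings are maps of variable => value.

  # Boolean callback: keep a binding only when the function returns true.
  def apply_boolean(bindings, fun, arg_var) do
    Enum.filter(bindings, fn binding -> fun.(Map.fetch!(binding, arg_var)) == true end)
  end

  # Value callback: bind the function's return value to result_var.
  def apply_value(bindings, fun, arg_var, result_var) do
    Enum.map(bindings, fn binding ->
      Map.put(binding, result_var, fun.(Map.fetch!(binding, arg_var)))
    end)
  end
end

people = [%{name: :alice, age: 25}, %{name: :bob, age: 12}, %{name: :carol, age: 19}]

people
|> CallbackSketch.apply_boolean(fn age -> age >= 18 end, :age)
|> Enum.map(& &1.name)
|> IO.inspect()
# prints [:alice, :carol]

[%{x: 3}, %{x: 5}, %{x: 7}]
|> CallbackSketch.apply_value(fn x -> x * 2 end, :x, :y)
|> Enum.map(&{&1.x, &1.y})
|> IO.inspect()
# prints [{3, 6}, {5, 10}, {7, 14}]
```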

Exception and timeout isolation

Callbacks that raise or exceed the timeout are silently filtered — the evaluator never crashes.

defmodule UnsafePredicates do
  def boom(_x), do: raise("exploded")
  def slow(_x), do: Process.sleep(500) && true
end

defmodule BoomTest do
  use ExDatalog.Schema

  relation :num do
    field(:x, :integer)
  end

  relation :ok do
    field(:x, :integer)
  end

  predicate(:boom, UnsafePredicates, :boom, [:integer], :boolean)

  fact(num(1))

  rule ok(X) do
    num(X)
    boom(X)
  end
end

# Raising callback — all bindings filtered
{:ok, knowledge} = BoomTest.materialize()
Knowledge.size(knowledge, "ok")
0

defmodule SlowTest do
  use ExDatalog.Schema

  relation :num do
    field(:x, :integer)
  end

  relation :ok do
    field(:x, :integer)
  end

  predicate(:slow, UnsafePredicates, :slow, [:integer], :boolean)

  fact(num(1))

  rule ok(X) do
    num(X)
    slow(X)
  end
end

# Slow callback — filtered by 50 ms timeout
{:ok, knowledge} = SlowTest.materialize(callback_timeout_ms: 50)
Knowledge.size(knowledge, "ok")
0
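A standard way to get this kind of isolation on the BEAM is to run the callback in a separate task, catch anything it raises, and kill it once the deadline passes. The sketch below shows that pattern with `Task`; `SafeCallSketch` is hypothetical, and the source does not say that ExDatalog is implemented this way.

```elixir
defmodule SafeCallSketch do
  # Hypothetical helper, not ExDatalog API.
  # Returns {:ok, result} on success, or :filtered on a raise, throw, exit, or timeout.
  def call(fun, arg, timeout_ms \\ 100) do
    task =
      Task.async(fn ->
        try do
          {:ok, fun.(arg)}
        rescue
          _exception -> :filtered
        catch
          _kind, _reason -> :filtered
        end
      end)

    # yield waits up to timeout_ms; if nothing arrived, shutdown kills the task.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      {:exit, _reason} -> :filtered
      nil -> :filtered
    end
  end
end

IO.inspect(SafeCallSketch.call(fn age -> age >= 18 end, 25))
# prints {:ok, true}

IO.inspect(SafeCallSketch.call(fn _x -> raise "exploded" end, 1))
# prints :filtered

IO.inspect(SafeCallSketch.call(fn _x -> Process.sleep(500) && true end, 1, 50))
# prints :filtered
```

Returning a tagged `:filtered` value rather than re-raising is what lets the surrounding evaluation treat a failed callback the same as a predicate that returned false.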

3. Magic Sets — Goal-Directed Evaluation

Magic sets is a program transformation that rewrites rules so bottom-up evaluation computes only the facts relevant to a specific query goal, instead of the full least fixpoint.

Scope (v0.5.0, experimental):

  • Positive recursive programs only (no negation, no aggregates in recursive rules).
  • A single goal with at least one bound position.
  • Programs outside scope fall back to full semi-naive evaluation — never incorrect results.

Full fixpoint vs goal-restricted

defmodule AncestorChain do
  use ExDatalog.Schema

  relation :parent do
    field(:from, :atom)
    field(:to, :atom)
  end

  relation :ancestor do
    field(:from, :atom)
    field(:to, :atom)
  end

  fact(parent(:a, :b))
  fact(parent(:b, :c))
  fact(parent(:c, :d))
  fact(parent(:d, :e))

  rule ancestor(X, Y) do
    parent(X, Y)
  end

  rule ancestor(X, Z) do
    parent(X, Y)
    ancestor(Y, Z)
  end
end

# Full semi-naive: all ancestor pairs
{:ok, full} = AncestorChain.materialize()
full_ancestors = Knowledge.get(full, "ancestor") |> MapSet.to_list() |> Enum.sort()

IO.puts("Full semi-naive (#{length(full_ancestors)} facts):")
IO.inspect(full_ancestors)
Full semi-naive (10 facts):
[a: :b, a: :c, a: :d, a: :e, b: :c, b: :d, b: :e, c: :d, c: :e, d: :e]
[a: :b, a: :c, a: :d, a: :e, b: :c, b: :d, b: :e, c: :d, c: :e, d: :e]

Now with magic sets. The goal ancestor(:a, _) asks: which nodes is :a an ancestor of?

{:ok, magic} =
  AncestorChain.materialize(strategy: :magic_sets, goal: {"ancestor", [:a, :_]})

magic_ancestors =
  Knowledge.match(magic, "ancestor", [:a, :_]) |> MapSet.to_list() |> Enum.sort()

IO.puts("Magic sets (#{length(magic_ancestors)} facts for goal {ancestor, [:a, :_]}):")
IO.inspect(magic_ancestors)

# Verify: magic result is exactly the subset of full results where first arg is :a
expected = full_ancestors |> Enum.filter(fn {x, _y} -> x == :a end)
magic_ancestors == expected
Magic sets (4 facts for goal {ancestor, [:a, :_]}):
[a: :b, a: :c, a: :d, a: :e]
true
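To see why a goal can shrink the work, the idea can be hand-written in plain Elixir: first compute the set of first arguments that the goal can reach (the "magic" set), then evaluate the ancestor rules only for those. `MagicSketch` is a hypothetical illustration using naive iteration for brevity, and the extra `x`/`y`/`z` chain is added here only to show facts that the goal never touches.

```elixir
defmodule MagicSketch do
  # Hypothetical helper, not ExDatalog API.

  # magic(Y) :- magic(X), parent(X, Y).   Seeded with the bound goal argument.
  def magic_set(parents, seed), do: grow(parents, MapSet.new([seed]), [seed])

  defp grow(_parents, seen, []), do: seen

  defp grow(parents, seen, frontier) do
    next =
      for x <- frontier, {^x, y} <- parents, not MapSet.member?(seen, y), uniq: true, do: y

    grow(parents, MapSet.union(seen, MapSet.new(next)), next)
  end

  # ancestor(X, Y) :- magic(X), parent(X, Y).
  # ancestor(X, Z) :- magic(X), parent(X, Y), ancestor(Y, Z).
  def ancestors(parents, magic) do
    relevant = Enum.filter(parents, fn {x, _y} -> MapSet.member?(magic, x) end)
    fixpoint(relevant, MapSet.new(relevant))
  end

  defp fixpoint(relevant, known) do
    derived = for {x, y} <- relevant, {^y, z} <- known, into: known, do: {x, z}
    # derived is a superset of known, so equal sizes mean nothing new was found.
    if MapSet.size(derived) == MapSet.size(known), do: known, else: fixpoint(relevant, derived)
  end

  # Answers to the goal ancestor(seed, _).
  def answer(parents, seed) do
    parents
    |> ancestors(magic_set(parents, seed))
    |> Enum.filter(fn {x, _y} -> x == seed end)
    |> Enum.sort()
  end
end

parents = [a: :b, b: :c, c: :d, d: :e, x: :y, y: :z]
IO.inspect(MagicSketch.answer(parents, :a))
# prints [a: :b, a: :c, a: :d, a: :e]
```

Note that on a single chain the magic set for `:a` contains every node, so the restricted evaluation still derives all pairs of that chain; the saving comes from never touching the unrelated `x`, `y`, `z` facts. This is also why the cell above filters the result with a match on `[:a, :_]`.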

Inspecting the transformation directly

Call MagicSets.transform/2 directly on compiled IR:

{:ok, ir} = ExDatalog.compile(AncestorChain.program())
{:ok, transformed} = MagicSets.transform(ir, {"ancestor", [:a, :_]})

# The magic relation
magic = Enum.find(transformed.relations, fn r -> r.name == "magic_ancestor_bf" end)
IO.puts("Magic relation: #{magic.name}, arity: #{magic.arity}")

# The seed fact
seed = Enum.find(transformed.facts, fn f -> f.relation == "magic_ancestor_bf" end)
IO.puts("Seed fact: #{inspect(seed.values)}")

# Rewritten rules: ancestor rules now start with the magic predicate
ancestor_rules = Enum.filter(transformed.rules, fn r -> r.head.relation == "ancestor" end)
Enum.each(ancestor_rules, fn r ->
  first_body = hd(r.body)
  IO.puts("Rule head: ancestor, first body atom: #{inspect(elem(first_body, 1).relation)}")
end)
Magic relation: magic_ancestor_bf, arity: 1
Seed fact: [atom: :a]
Rule head: ancestor, first body atom: "magic_ancestor_bf"
Rule head: ancestor, first body atom: "magic_ancestor_bf"
:ok

The adornment bf means the first position is bound (:a) and the second is free (:_). The magic relation magic_ancestor_bf has arity 1 (just the bound position).
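The naming scheme can be reproduced with a few lines of plain Elixir. `AdornmentSketch` is hypothetical: it derives the adornment string, the magic relation name, and the seed values from a goal's argument list, treating `:_` as free and anything else as bound. It yields raw seed values, whereas the IR output above shows them tagged with a type.

```elixir
defmodule AdornmentSketch do
  # Hypothetical helper, not ExDatalog API.

  # "b" for each bound position, "f" for each free (:_) position.
  def adornment(args) do
    Enum.map_join(args, fn
      :_ -> "f"
      _bound -> "b"
    end)
  end

  def magic_name(relation, args), do: "magic_#{relation}_#{adornment(args)}"

  # Only bound positions go into the seed fact, so its length is the magic arity.
  def seed_values(args), do: Enum.reject(args, &(&1 == :_))

  # With no bound position there is nothing to seed, hence the fallback.
  def strategy(args) do
    if seed_values(args) == [], do: :semi_naive, else: :magic_sets
  end
end

goal = [:a, :_]
IO.inspect(AdornmentSketch.magic_name("ancestor", goal))
# prints "magic_ancestor_bf"
IO.inspect(AdornmentSketch.seed_values(goal))
# prints [:a]
IO.inspect(AdornmentSketch.strategy([:_, :_]))
# prints :semi_naive
```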

Fallback cases

When the program is outside the supported scope, the engine falls back to semi-naive without error:

# All-free goal falls back
{:ok, ms_free} =
  AncestorChain.materialize(strategy: :magic_sets, goal: {"ancestor", [:_, :_]})

Knowledge.get(ms_free, "ancestor") == Knowledge.get(full, "ancestor")
true

4. Runtime Facts (Schema.new + Program.add_fact)

For data-driven workflows where facts come from external sources at runtime, call new/0 on the schema module to get a blank program (relations and rules, no facts), then pipe facts in via Program.add_fact/2:

defmodule DeptRuntimeCount do
  use ExDatalog.Schema

  relation :emp do
    field(:name, :atom)
    field(:dept, :atom)
  end

  relation :dept_count do
    field(:dept, :atom)
    field(:n, :integer)
  end

  rule dept_count(D, N) do
    emp(E, D)
    count(E, N)
  end
end

program =
  DeptRuntimeCount.new()
  |> ExDatalog.Program.add_fact(DeptRuntimeCount.emp(:alice, :eng))
  |> ExDatalog.Program.add_fact(DeptRuntimeCount.emp(:bob, :eng))
  |> ExDatalog.Program.add_fact(DeptRuntimeCount.emp(:carol, :ops))

{:ok, knowledge} = ExDatalog.Program.materialize(program)
Knowledge.get(knowledge, "dept_count") |> MapSet.to_list() |> Enum.sort()
[eng: 2, ops: 1]

Bulk add_facts

facts = [
  DeptRuntimeCount.emp(:dave, :eng),
  DeptRuntimeCount.emp(:eve, :ops),
  DeptRuntimeCount.emp(:frank, :eng)
]

program =
  DeptRuntimeCount.new()
  |> ExDatalog.Program.add_fact(DeptRuntimeCount.emp(:alice, :eng))
  |> ExDatalog.Program.add_facts(facts)

{:ok, knowledge} = ExDatalog.Program.materialize(program)
Knowledge.get(knowledge, "dept_count") |> MapSet.to_list() |> Enum.sort()
[eng: 3, ops: 1]

5. Query Planner

The planner sits between the compiled IR and the evaluation engine. It produces an ExDatalog.Planner.Plan describing the chosen strategy, strata, joins, and predicates.

Plan

defmodule TransitivePath do
  use ExDatalog.Schema

  relation :edge do
    field(:from, :atom)
    field(:to, :atom)
  end

  relation :path do
    field(:from, :atom)
    field(:to, :atom)
  end

  rule path(X, Y) do
    edge(X, Y)
  end

  rule path(X, Z) do
    edge(X, Y)
    path(Y, Z)
  end
end

IO.puts(Planner.explain_plan(TransitivePath.program()))
Strategy: semi_naive
  Stratum 0: 2 rule(s), relations: path, edge
Joins: 3
Predicates: 0
:ok

Plan with magic-sets strategy

IO.puts(Planner.explain_plan(TransitivePath.program(), strategy: :magic_sets, goal: {"path", [:a, :_]}))
Strategy: magic_sets
  Stratum 0: 2 rule(s), relations: path, edge
Joins: 3
Predicates: 0
:ok

Plan with aggregates

The planner classifies predicates by kind (:comparison, :arithmetic, :aggregate, :callback, etc.) and shows them in the explain output:

IO.puts(Planner.explain_plan(DeptCount.program()))
Strategy: semi_naive
  Stratum 0: 0 rule(s), relations: emp
  Stratum 1: 1 rule(s), relations: dept_count
Joins: 1
Predicates: 1 (count)
:ok

Plan with callbacks

IO.puts(Planner.explain_plan(AdultOnly.program()))
Strategy: semi_naive
  Stratum 0: 1 rule(s), relations: adult, person
Joins: 1
Predicates: 1 (callback)
:ok
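One reading that is consistent with all four outputs above is that "Joins" counts the relational atoms across rule bodies and "Predicates" counts the remaining body literals by kind. This is an inference from the printed numbers, not something the source states, and `PlanSummarySketch` with its `{:atom, rel}` / `{:predicate, kind}` encoding is hypothetical.

```elixir
defmodule PlanSummarySketch do
  # Hypothetical helper, not ExDatalog API.
  # rules: list of {head, body}; body items are {:atom, relation} or {:predicate, kind}.
  def summarize(rules) do
    body = Enum.flat_map(rules, fn {_head, items} -> items end)
    kinds = for {:predicate, kind} <- body, do: kind

    %{
      joins: Enum.count(body, &match?({:atom, _}, &1)),
      predicates: length(kinds),
      kinds: Enum.uniq(kinds)
    }
  end
end

transitive_path = [
  {:path, [{:atom, :edge}]},
  {:path, [{:atom, :edge}, {:atom, :path}]}
]

IO.inspect(PlanSummarySketch.summarize(transitive_path))
# prints %{joins: 3, kinds: [], predicates: 0}

dept_count = [{:dept_count, [{:atom, :emp}, {:predicate, :count}]}]
IO.inspect(PlanSummarySketch.summarize(dept_count))
# prints %{joins: 1, kinds: [:count], predicates: 1}
```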

Summary

| Feature | DSL Syntax |
| --- | --- |
| Count | count(E, N) in rule body, N in head |
| Sum | sum(A, T) in rule body, T in head |
| Min | min(S, V) in rule body, V in head |
| Max | max(S, V) in rule body, V in head |
| Filter agg | gte(S, 60) before count(E, N) in body |
| Boolean CB | predicate :name, Mod, :fun, [:type], :boolean then name(Arg) |
| Value CB | predicate :name, Mod, :fun, [:type], :value then name(Arg, Result) |
| CB timeout | materialize(callback_timeout_ms: 50) |
| Magic sets | materialize(strategy: :magic_sets, goal: {"rel", [:a, :_]}) |
| Plan | Planner.plan(program) or Planner.plan(program, strategy: ..., goal: ...) |
| Explain plan | Planner.explain_plan(program) or Planner.explain_plan(program, opts) |