Embrace Non-determinism with Evals
Mix.install([
{:text_chunker, "~> 0.3.1"},
{:explorer, "~> 0.5"},
{:instructor, "~> 0.1"},
{:req, "~> 0.5"},
{:postgrex, "~> 0.19.1"},
{:pgvector, "~> 0.3.0"},
{:ecto_sql, "~> 3.12"},
{:kino, "~> 0.15"},
{:ecto_psql_extras, "~> 0.8"}
])
Initialization and Setup
Setup pgvector extension and explorer
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
Postgrex.Types.define(
RAG.PostgrexTypes,
Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions(),
[]
)
Dataset
Load the dataset
df =
Kino.FS.file_path("movies.json")
|> DataFrame.from_ndjson!()
df = DataFrame.mutate(df,
release_date: Series.cast(release_date, :date)
)
df =
df
|> DataFrame.filter(year(release_date) >= 2020)
|> DataFrame.filter(not(adult))
|> DataFrame.filter(status == "Released")
|> DataFrame.filter(lengths(split(overview, " ")) > 5)
|> DataFrame.sort_by(desc: popularity)
stream =
DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)
Setup Ecto
defmodule RAG.Repo do
use Ecto.Repo,
otp_app: :rag,
adapter: Ecto.Adapters.Postgres
end
Application.put_env(:rag, RAG.Repo, types: RAG.PostgrexTypes)
Kino.start_child!({RAG.Repo, url: "postgresql://postgres:postgres@postgresql/rag_demo"})
RAG.Repo.query!("CREATE EXTENSION IF NOT EXISTS vector;")
Setup Models module
defmodule RAG.Models do
@client Req.new(base_url: "https://models.arrakis.upmaru.network")
@instructor_config [
adapter: Instructor.Adapters.Llamacpp,
api_url: "https://models.arrakis.upmaru.network"
]
defmodule Query do
use Ecto.Schema
use Instructor
@llm_doc """
Query classification
## Fields:
- reply: Should the reply be list of results or a recommendation?
- domain: Which domain should we search in? description or location?
- improved_query: An improved version of the query that will provide a better answer.
"""
@primary_key false
embedded_schema do
field :reply, Ecto.Enum, values: [:results, :recommendation]
field :domain, Ecto.Enum, values: [:description, :location]
field :improved_query, :string
end
def new(text, n \\ 1, config) do
prompt = """
Correctly identify what domain the query is related to and what type of reply should be made.
Also provide a better quality question that will give better results.
#{text}
"""
results =
Task.async_stream(1..n, fn _i ->
{:ok, output} = Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
response_model: __MODULE__,
messages: [
%{role: "user", content: prompt}
]
], config)
output
end, timeout: 15_000)
|> Enum.to_list()
if n == 1 do
{:ok, result} = List.first(results)
result
else
results
end
end
end
defmodule Location do
use Ecto.Schema
use Instructor
@llm_doc """
Output of the movie setting based on a given description.
## Fields:
- description: A description about the setting of the movie in a single paragraph.
- reason: A brief description of the reason why the model thinks the movie took place in those locations.
- locations: Potential locations the movie takes place.
"""
@primary_key false
embedded_schema do
field :description, :string
field :reason, :string
field :locations, {:array, :string}
end
def new(text, config) do
Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
response_model: __MODULE__,
messages: [
%{
role: "user",
content: """
Make a best estimate based on the description below on the type of setting the movie is likely to take place, whether it takes place in space, forest, jungle, tundra, desert, in the ocean, or futuristic city?
#{text}
"""
}
]
], config)
|> case do
{:ok, setting} -> setting
error -> error
end
end
end
def client, do: @client
def embedding(text) do
body = %{
input: text,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: response}} = Req.post(@client, url: "/v1/embeddings", json: body)
%{"data" => [%{"embedding" => embedding}]} = response
embedding
end
def completion(prompt) do
body = %{
model: "mistral-small-24b",
messages: [
%{role: "user", content: prompt}
]
}
{:ok, response} = Req.post(@client, url: "/v1/chat/completions", json: body)
%{body: %{"choices" => choices}} = response
%{"message" => %{"content" => content}} = List.first(choices)
content
end
def rerank(query, documents) do
body = %{
query: query,
documents: documents,
model: "bge-reranker-v2-m3"
}
{:ok, %{body: %{"results" => reranking_results}}} =
Req.post(@client, url: "/v1/rerank", json: body)
reranking_results
|> Enum.sort_by(fn result ->
result["index"]
end)
end
defdelegate to_location(text, config \\ @instructor_config), to: Location, as: :new
defdelegate to_query(text, n \\ 1, config \\ @instructor_config), to: Query, as: :new
end
Setup CItext
defmodule RAG.Migrations.EnableCIText do
use Ecto.Migration
def up do
execute """
CREATE EXTENSION IF NOT EXISTS citext
"""
end
def down do
execute """
DROP EXTENSION citext
"""
end
end
Ecto.Migrator.up(RAG.Repo, 0, RAG.Migrations.EnableCIText)
Setup Movie Schema
Kino.Mermaid.new("""
erDiagram
MOVIES {
int id
string title
string overview
date release_date
}
MOVIE_CHUNKS {
int id
int movie_id
string content
vector embedding
}
MOVIES ||--o{ MOVIE_CHUNKS : "has many"
""")
defmodule RAG.Migrations.CreateMovies do
use Ecto.Migration
def up do
create table(:movies) do
add :title, :string, null: false
add :status, :string, null: false
add :genres, {:array, :string}, default: [], null: false
add :overview, :text, null: false
add :collection, :string
add :release_date, :date
add :rating, :decimal, null: false
add :popularity, :decimal, null: false
add :votes_count, :integer, default: 0
add :production_companies, {:array, :string}, default: [], null: false
timestamps()
end
create index(:movies, [:title], unique: true)
end
def down do
drop(table(:movies))
end
end
defmodule RAG.Movie do
use Ecto.Schema
import Ecto.Changeset
schema "movies" do
field :title, :string
field :status, :string
field :genres, {:array, :string}
field :overview, :string
field :collection, :string
field :release_date, :date
field :rating, :decimal
field :popularity, :decimal
field :votes_count, :integer
field :production_companies, {:array, :string}
timestamps()
end
def changeset(movie, attrs) do
movie
|> cast(attrs, [
:title,
:genres,
:status,
:overview,
:collection,
:release_date,
:rating,
:popularity,
:votes_count,
:production_companies
])
|> validate_required([:title, :overview, :status, :rating, :popularity])
|> unique_constraint([:title])
end
end
Ecto.Migrator.up(RAG.Repo, 1, RAG.Migrations.CreateMovies)
Add Movies to Database
movies =
stream
|> Enum.take(1000)
movies
|> Enum.map(fn movie ->
attrs = %{
title: movie.title,
status: movie.status,
genres: Enum.map(movie.genres, fn g -> g["name"] end),
overview: movie.overview,
collection: movie.belongs_to_collection["name"],
release_date: movie.release_date,
rating: movie.vote_average,
popularity: movie.popularity,
votes_count: movie.vote_count,
production_companies: Enum.map(movie.production_companies, fn pc -> pc["name"] end)
}
changeset = RAG.Movie.changeset(%RAG.Movie{}, attrs)
RAG.Repo.insert(changeset)
end)
Add Chunk Schema
defmodule RAG.Migrations.CreateMovieChunks do
use Ecto.Migration
def up do
create table(:movie_chunks) do
add :relation, :string, null: false
add :content, :text, null: false
add :embedding, :vector, size: 1024, null: false
add :movie_id, references(:movies, on_delete: :restrict), null: false
timestamps()
end
create index(:movie_chunks, ["embedding vector_cosine_ops"], using: :hnsw, options: "m = 24, ef_construction = 100")
create unique_index(:movie_chunks, [:movie_id, :relation])
end
def down do
drop(table(:movie_chunks))
end
end
defmodule RAG.MovieChunk do
use Ecto.Schema
import Ecto.Changeset
@valid_attrs [
:relation,
:content,
:embedding,
:movie_id
]
schema "movie_chunks" do
field :relation, :string
field :content, :string
field :embedding, Pgvector.Ecto.Vector
belongs_to :movie, RAG.Movie
timestamps()
end
def changeset(movie_chunk, attrs) do
movie_chunk
|> cast(attrs, @valid_attrs)
|> validate_required(@valid_attrs)
|> unique_constraint([:movie_id, :relation])
end
end
Ecto.Migrator.up(RAG.Repo, 2, RAG.Migrations.CreateMovieChunks)
defmodule RAG.Movie.Chunk do
def generate_description(%RAG.Movie{} = movie) do
text =
~s"""
Name: #{movie.title}
Genres: #{Enum.join(movie.genres, ", ")}
Overview: #{movie.overview}
Collection: #{movie.collection}
Production companies: #{Enum.join(movie.production_companies, ", ")}
"""
prompt =
"""
Write an accurate description of the movie in a single paragraph, don't use quote marks or special characters in the response.
#{text}
"""
description = RAG.Models.completion(prompt)
attrs = %{
movie_id: movie.id,
relation: "description",
content: description,
embedding: RAG.Models.embedding(description)
}
changeset = RAG.MovieChunk.changeset(%RAG.MovieChunk{}, attrs)
RAG.Repo.insert(changeset)
end
def generate_location(%RAG.Movie{} = movie) do
with %RAG.MovieChunk{content: description_content} <-
RAG.Repo.get_by(RAG.MovieChunk, movie_id: movie.id, relation: "description"),
%RAG.Models.Location{description: location_description} <- RAG.Models.to_location(description_content) do
attrs = %{
movie_id: movie.id,
relation: "location",
content: location_description,
embedding: RAG.Models.embedding(location_description)
}
changeset = RAG.MovieChunk.changeset(%RAG.MovieChunk{}, attrs)
RAG.Repo.insert(changeset)
else
error ->
error
end
end
end
RAG.Movie
|> RAG.Repo.all()
|> Enum.map(fn movie ->
case RAG.Repo.get_by(RAG.MovieChunk, movie_id: movie.id, relation: "description") do
nil ->
RAG.Movie.Chunk.generate_description(movie)
chunk -> {:ok, chunk}
end
end)
RAG.Movie
|> RAG.Repo.all()
|> Enum.map(fn movie ->
case RAG.Repo.get_by(RAG.MovieChunk, movie_id: movie.id, relation: "location") do
nil ->
RAG.Movie.Chunk.generate_location(movie)
chunk -> {:ok, chunk}
end
end)
Querying the index
import Ecto.Query
import Pgvector.Ecto.Query
best_query = "Which movies are set primarily in the sea or the ocean?"
original_query = "Movies set in the sea or the ocean."
model_query = RAG.Models.to_query(original_query)
IO.inspect(model_query)
query_embedding = RAG.Models.embedding(model_query.improved_query)
query = from(
c in RAG.MovieChunk,
where: c.relation == ^"#{model_query.domain}",
order_by: cosine_distance(c.embedding, ^query_embedding),
limit: 20,
preload: [:movie]
)
results = RAG.Repo.all(query)
rerank_results = RAG.Models.rerank(
model_query.improved_query,
Enum.map(results, fn r -> r.content end)
)
Enum.zip(results, rerank_results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
|> Enum.map(fn {chunk, relevance} ->
%{
id: chunk.movie.id,
title: chunk.movie.title,
matching_content: chunk.content,
relevance: relevance["relevance_score"]
}
end)
|> Kino.DataTable.new(keys: [:id, :title, :matching_content, :relevance])
Embracing Non-Determinism
Non-determinism may be seen as a challenge, however if well managed could be a clear advantage. We can use the LLM to generate queries in advance and we can evaluate each output and find out which provides the best quality search.
Kino.Mermaid.new("""
erDiagram
QUERY_GROUP {
int id
string identifier
}
QUERY {
int id
int query_group_id
string content
}
QUERY_EVALUATION {
int id
int query_id
decimal amplitude
map distribution
decimal bias
}
QUERY_GROUP ||--o{ QUERY : "has many"
QUERY ||--o{ QUERY_EVALUATION : "has many"
""")
defmodule RAG.Migrations.CreateQueryGroup do
use Ecto.Migration
def up do
create table(:query_groups) do
add :identifier, :text, null: false
timestamps()
end
create unique_index(:query_groups, [:identifier])
end
end
defmodule RAG.Migrations.CreateQueries do
use Ecto.Migration
def up do
create table(:queries) do
add :expectation, :text, null: false
add :content, :citext, null: false
add :domain, :text, null: false
add :embedding, :vector, size: 1024, null: false
add :query_group_id, references(:query_groups, on_delete: :restrict), null: false
timestamps()
end
create index(:queries, [:domain])
create index(:queries, [:query_group_id])
create unique_index(:queries, [:content, :domain])
create index(:queries, ["embedding vector_cosine_ops"], using: :hnsw, options: "m = 24, ef_construction = 100")
end
def down do
drop(table(:queries))
end
end
defmodule RAG.Migrations.CreateQueryEvaluations do
use Ecto.Migration
def up do
create table(:query_evaluations) do
add :amplitude, :decimal, default: 0.0, null: false
add :distribution, :map, default: %{}, null: false
add :bias, :decimal, default: 0.0, null: false
add :query_id, references(:queries, on_delete: :restrict), null: false
timestamps()
end
create index(:query_evaluations, [:query_id])
end
end
defmodule RAG.QueryGroup do
use Ecto.Schema
import Ecto.Changeset
schema "query_groups" do
field :identifier, :string
timestamps()
end
def changeset(group, attrs) do
group
|> cast(attrs, [:identifier])
|> validate_required([:identifier])
end
end
defmodule RAG.Query do
use Ecto.Schema
import Ecto.Changeset
schema "queries" do
field :expectation, :string
field :content, :string
field :domain, :string
field :embedding, Pgvector.Ecto.Vector
belongs_to :query_group, RAG.Query.Group
timestamps()
end
def changeset(query, attrs) do
query
|> cast(attrs, [:expectation, :content, :domain, :embedding])
|> validate_required([:expectation, :content, :domain, :embedding])
|> unique_constraint([:content, :domain], name: :queries_content_domain_index)
end
end
defmodule RAG.QueryEvaluation do
use Ecto.Schema
import Ecto.Changeset
schema "query_evaluations" do
field :amplitude, :decimal
field :distribution, :map
field :bias, :decimal
belongs_to :query, RAG.Query
timestamps()
end
def changeset(query_eval, attrs) do
query_eval
|> cast(attrs, [:amplitude, :distribution, :query_id, :bias])
|> validate_required([:amplitude, :distribution, :query_id])
end
end
Ecto.Migrator.up(RAG.Repo, 3, RAG.Migrations.CreateQueryGroup)
Ecto.Migrator.up(RAG.Repo, 4, RAG.Migrations.CreateQueries)
Ecto.Migrator.up(RAG.Repo, 5, RAG.Migrations.CreateQueryEvaluations)
Querying Pipeline
Kino.Mermaid.new("""
flowchart LR
id1(Query) --> id2(Match Query in DB via vector search)
subgraph MatchFound[Match Found?]
direction TB
id2 -->|Yes| id3a(Find highest performer in the group and use to do vector search)
id2 -->|No| id3b(Create query group and query variations)
end
id3a --> id5(Return the result)
id3b --> id4b(Kick off a Generate Variation / Eval Loop)
id3b --> id4a(Fallback to direct query)
id4a --> id5
""")
defmodule RAG.Query.Manager do
alias Ecto.Multi
alias RAG.Query
alias RAG.QueryGroup
import Ecto.Query, only: [from: 2]
import Pgvector.Ecto.Query
def create_group(query, embedding, from_model) do
Multi.new()
|> Multi.insert(:group,
RAG.QueryGroup.changeset(%RAG.QueryGroup{}, %{identifier: query})
)
|> Multi.insert(:query, fn %{group: group} ->
%RAG.Query{query_group_id: group.id}
|> RAG.Query.changeset(%{
content: query,
embedding: embedding,
expectation: "#{from_model.reply}",
domain: "#{from_model.domain}"
})
end)
|> RAG.Repo.transaction()
end
def generate_variation(%QueryGroup{} = query_group) do
user_query_from_model = RAG.Models.to_query(query_group.identifier)
new_embedding = RAG.Models.embedding(user_query_from_model.improved_query)
changeset =
%RAG.Query{query_group_id: query_group.id}
|> RAG.Query.changeset(%{
content: user_query_from_model.improved_query,
embedding: new_embedding,
domain: "#{user_query_from_model.domain}",
expectation: "#{user_query_from_model.reply}"
})
RAG.Repo.insert(changeset)
end
def evaluate(%Query{embedding: embedding, domain: domain} = query) do
chunks =
from(
mc in RAG.MovieChunk,
where: mc.relation == ^domain,
order_by: cosine_distance(mc.embedding, ^embedding),
limit: 20
)
|> RAG.Repo.all()
reranking_results =
RAG.Models.rerank(
query.content,
Enum.map(chunks, fn c -> c.content end)
)
joined_results =
Enum.zip(chunks, reranking_results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
amplitude =
Enum.sum_by(joined_results, fn {_chunk, relevance} ->
relevance["relevance_score"]
end)
distribution =
Enum.reduce(joined_results, %{}, fn {chunk, relevance}, acc ->
Map.put(acc, chunk.id, relevance["relevance_score"])
end)
changeset =
%RAG.QueryEvaluation{}
|> RAG.QueryEvaluation.changeset(%{
query_id: query.id,
amplitude: amplitude,
distribution: distribution
})
RAG.Repo.insert(changeset)
end
def update_group_identifier(%QueryGroup{} = query_group) do
q = from(
q in RAG.Query,
join: eval in RAG.QueryEvaluation, on: eval.query_id == q.id,
where: q.query_group_id == ^query_group.id,
order_by: [desc: eval.amplitude],
limit: 1
)
best_query = RAG.Repo.one(q)
changeset = RAG.QueryGroup.changeset(query_group, %{identifier: best_query.content})
RAG.Repo.update(changeset)
end
end
input = Kino.Input.textarea("Query")
import Ecto.Query
import Pgvector.Ecto.Query
user_query = Kino.Input.read(input)
user_query_embedding = RAG.Models.embedding(user_query)
user_query_from_model = RAG.Models.to_query(user_query)
query = from(
q in RAG.QueryGroup,
where: q.identifier == ^user_query
)
RAG.Repo.all(query)
|> case do
[] ->
RAG.Query.Manager.create_group(user_query, user_query_embedding, user_query_from_model)
queries ->
queries
end
Recursive Eval Loop
Run generation / eval until a sucess condition or stop condition is achieved.
Kino.Mermaid.new("""
flowchart LR
id1(Generate Variation) --> id2(Evaluate)
id2 --> id3{Higher amplitude?}
id3 -->|Yes| id4(Mark as winner\nand reuse for next variation)
id4 --> id1
id3 -->|No| id1
""")
import Ecto.Query, only: [from: 2]
query_groups =
RAG.Repo.all(RAG.QueryGroup)
Enum.map(query_groups, fn qg ->
RAG.Query.Manager.generate_variation(qg)
end)
Run Evaluation
queries = RAG.Repo.all(RAG.Query)
Enum.map(queries, fn q ->
case RAG.Repo.get_by(RAG.QueryEvaluation, query_id: q.id) do
nil ->
{:ok, query_evaluation} = RAG.Query.Manager.evaluate(q)
query_evaluation
%RAG.QueryEvaluation{} = query_evaluation ->
query_evaluation
end
end)
RAG.Repo.all(RAG.QueryGroup)
|> Enum.map(fn query_group ->
RAG.Query.Manager.update_group_identifier(query_group)
end)
Run Query
user_input = Kino.Input.textarea("User Query")
import Ecto.Query, only: [from: 2]
import Pgvector.Ecto.Query
user_query = Kino.Input.read(user_input)
user_query_embedding = RAG.Models.embedding(user_query)
db_query = from(
q in RAG.Query,
order_by: cosine_distance(q.embedding, ^user_query_embedding),
limit: 5
)
matches = RAG.Repo.all(db_query)
rerank_results = RAG.Models.rerank(
user_query,
Enum.map(matches, fn r -> r.content end)
)
Enum.zip(matches, rerank_results)
|> Enum.reject(fn {_q, r} -> r["relevance_score"] < 0.5 end)
|> Enum.sort_by(fn {_q, r} ->
r["relevance_score"]
end, :desc)
|> List.first()
|> case do
nil ->
[]
{query, _} ->
best_db_query =
from(
q in RAG.Query,
join: eval in RAG.QueryEvaluation, on: eval.query_id == q.id,
where: q.query_group_id == ^query.query_group_id,
order_by: [desc: eval.amplitude],
limit: 1
)
best_query = RAG.Repo.one(best_db_query)
movie_query = from(
c in RAG.MovieChunk,
where: c.relation == ^"#{best_query.domain}",
order_by: cosine_distance(c.embedding, ^best_query.embedding),
limit: 20,
preload: [:movie]
)
results = RAG.Repo.all(movie_query)
rerank_results = RAG.Models.rerank(
best_query.content,
Enum.map(results, fn r -> r.content end)
)
Enum.zip(results, rerank_results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
|> Enum.map(fn {chunk, relevance} ->
%{
id: chunk.movie.id,
title: chunk.movie.title,
matching_content: chunk.content,
relevance: relevance["relevance_score"]
}
end)
|> Kino.DataTable.new(keys: [:id, :title, :matching_content, :relevance])
end
Anticipating Questions and finding Gaps
defmodule MovieQuestion do
use Ecto.Schema
use Instructor
@llm_doc """
In general what do people ask about movies.
## Fields:
- questions: 5 questions that people ask about movies.
- queries: 5 queries people would type into a movie database.
"""
@primary_key false
embedded_schema do
field :questions, {:array, :string}
field :queries, {:array, :string}
end
end
instructor_config = [
adapter: Instructor.Adapters.Llamacpp,
api_url: "https://models.arrakis.upmaru.network"
]
{:ok, output} = Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
response_model: MovieQuestion,
messages: [
%{
role: "user",
content:
"""
You are a curator of a movie database, prepare yourself by anticipating questions and queries that would be asked by the user who wants to find movies or know more about the movie setting, genre, rating, plot, cast and crew.
"""
}
]
], instructor_config)
user_queries =
Enum.concat(output.questions, output.queries)
|> Enum.map(fn user_query ->
user_query_embedding = RAG.Models.embedding(user_query)
user_query_from_model = RAG.Models.to_query(user_query)
db_query = from(
q in RAG.QueryGroup,
where: q.identifier == ^user_query
)
RAG.Repo.all(db_query)
|> case do
[] ->
RAG.Query.Manager.create_group(user_query, user_query_embedding, user_query_from_model)
queries ->
queries
end
end)
Citation
@article{embrace_non_determinism_with_evals,
title={Embracing Non-Determinism with Evals},
url={https://github.com/zacksiri/notebooks/blob/main/embrace-non-determinism-with-evals.livemd}
publisher={Upmaru}
author={Zack Siri}
year={2025}
}