OTP Approach: DynamicSupervisor with Worker Pool
Setup
Mix.install(
[
{:ecto, "~> 3.13.2"},
{:ecto_sql, "~> 3.13.2"},
{:postgrex, "~> 0.19.3"},
{:kino, "~> 0.13.2"},
{:pgvector, "~> 0.3.1"},
{:bumblebee, "~> 0.6.3"},
{:nx, "~> 0.10.0"},
{:exla, "~> 0.10.0"}
],
config: [
nx: [
default_backend: EXLA.Backend,
default_defn_options: [compiler: EXLA]
]
]
)
Postgrex.Types.define(
SentenceTransformer.PostgrexTypes,
Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions()
)
defmodule SentenceTransformer.Repo do
use Ecto.Repo,
otp_app: :sentence_transformer_app,
adapter: Ecto.Adapters.Postgres,
types: SentenceTransformer.PostgrexTypes
end
Kino.start_child({SentenceTransformer.Repo, [
url: "postgres://postgres:postgres@db/vector_db",
types: SentenceTransformer.PostgrexTypes
]})
defmodule Migrations.CreateVectorExtension do
use Ecto.Migration
def up do
execute "CREATE EXTENSION IF NOT EXISTS vector"
end
def down do
execute "DROP EXTENSION IF EXISTS vector"
end
end
Ecto.Migrator.up(SentenceTransformer.Repo, 1, Migrations.CreateVectorExtension)
Solution - Database Schema
defmodule SentenceTransformer.Document do
use Ecto.Schema
import Ecto.Changeset
@title_max_length 255
@title_min_length 1
schema "documents" do
field :title, :string
field :content, :string
has_many :chunks, SentenceTransformer.DocumentChunk
timestamps()
end
def changeset(document, attrs) do
document
|> cast(attrs, [:title, :content])
|> validate_required([:title, :content], message: "is required")
|> validate_length(:title,
min: @title_min_length,
max: @title_max_length,
message: "must be between #{@title_min_length} and #{@title_max_length} characters")
|> validate_length(:content, min: 1, message: "cannot be empty")
|> validate_format(:title, ~r/^[^<>]*$/, message: "contains invalid characters")
end
end
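As a quick sanity check of these validations, a changeset built with a blank title should come back invalid (a minimal sketch; the attribute values are made up for illustration):
changeset =
  SentenceTransformer.Document.changeset(
    %SentenceTransformer.Document{},
    %{title: "", content: "Annual report text"}
  )
changeset.valid?
# => false (the blank title fails the required validation)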
defmodule SentenceTransformer.DocumentChunk do
use Ecto.Schema
import Ecto.Changeset
@content_min_length 1
@chunk_index_min 0
schema "document_chunks" do
field :content, :string
field :chunk_index, :integer
field :embedding, Pgvector.Ecto.Vector
belongs_to :document, SentenceTransformer.Document
timestamps()
end
def changeset(chunk, attrs) do
chunk
|> cast(attrs, [:content, :embedding, :chunk_index, :document_id])
|> validate_required([:content, :document_id], message: "is required")
|> validate_length(:content, min: @content_min_length, message: "cannot be empty")
|> validate_number(:chunk_index, greater_than_or_equal_to: @chunk_index_min,
message: "must be non-negative")
|> validate_embedding()
end
defp validate_embedding(changeset) do
case get_field(changeset, :embedding) do
nil -> add_error(changeset, :embedding, "is required")
embedding when is_list(embedding) ->
if length(embedding) > 0, do: changeset, else: add_error(changeset, :embedding, "cannot be empty")
_ -> changeset
end
end
end
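The chunk changeset behaves similarly; without an embedding it reports an error (a sketch; the document_id here is a placeholder):
chunk_changeset =
  SentenceTransformer.DocumentChunk.changeset(
    %SentenceTransformer.DocumentChunk{},
    %{content: "Some chunk text", chunk_index: 0, document_id: 1}
  )
chunk_changeset.errors[:embedding]
# => {"is required", []}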
defmodule Migrations.CreateDocuments do
use Ecto.Migration
def change do
create table(:documents) do
add :title, :string, null: false
add :content, :text
timestamps()
end
end
end
Ecto.Migrator.up(SentenceTransformer.Repo, 2, Migrations.CreateDocuments)
defmodule Migrations.CreateDocumentChunks do
use Ecto.Migration
def up do
create table(:document_chunks) do
add :content, :text, null: false
add :embedding, :vector, size: 384 # dimensions for all-MiniLM-L6-v2
add :chunk_index, :integer
add :document_id, references(:documents, on_delete: :delete_all)
timestamps()
end
create index(:document_chunks, [:document_id])
# Critical: index for fast similarity search using cosine distance
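# Note: pgvector recommends building ivfflat indexes after the table has data,
# and the lists parameter (default 100) can be tuned to trade recall for speed.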
execute("""
CREATE INDEX document_chunks_embedding_ivfflat_idx
ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
""")
end
def down do
drop index(:document_chunks, [:document_id])
execute("DROP INDEX IF EXISTS document_chunks_embedding_ivfflat_idx")
drop table(:document_chunks)
end
end
Ecto.Migrator.up(SentenceTransformer.Repo, 3, Migrations.CreateDocumentChunks)
Solution - Embedding Service (DynamicSupervisor Pool Approach)
defmodule SentenceTransformer.EmbeddingWorker do
use GenServer
@model_name "sentence-transformers/all-MiniLM-L6-v2"
@timeout 60_000
def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)
@doc """
Gets embeddings for a list of texts using this worker process.
"""
def get_embeddings(worker_pid, texts) when is_list(texts),
do: GenServer.call(worker_pid, {:get_embeddings, texts}, @timeout)
def init(_opts) do
with {:ok, model_info} <- Bumblebee.load_model({:hf, @model_name}),
{:ok, tokenizer} <- Bumblebee.load_tokenizer({:hf, @model_name}) do
{:ok, %{serving: Bumblebee.Text.text_embedding(model_info, tokenizer)}}
else
error -> {:stop, error}
end
end
def handle_call({:get_embeddings, texts}, _from, %{serving: serving} = state) do
embeddings = Enum.map(texts, &generate_embedding(&1, serving))
{:reply, {:ok, embeddings}, state}
end
defp generate_embedding(text, serving) do
serving
|> Nx.Serving.run(text)
|> Map.get(:embedding)
|> Nx.to_flat_list()
end
end
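A single worker can be exercised directly before the pool exists (a minimal sketch; the sample sentence is arbitrary, and start_link blocks while the model downloads and loads):
{:ok, worker} = SentenceTransformer.EmbeddingWorker.start_link()
{:ok, [embedding]} = SentenceTransformer.EmbeddingWorker.get_embeddings(worker, ["Hello from a single worker"])
length(embedding)
# => 384, the output dimensionality of all-MiniLM-L6-v2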
defmodule SentenceTransformer.EmbeddingPool do
use DynamicSupervisor
@pool_size 3
def init(_opts),
do: DynamicSupervisor.init(strategy: :one_for_one)
def start_link(opts \\ []),
do: DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
@doc """
Gets embeddings for a list of texts using the worker pool.
"""
def get_embeddings(texts) when is_list(texts),
do: SentenceTransformer.EmbeddingWorker.get_embeddings(get_available_worker(), texts)
defp get_available_worker do
ensure_pool_size(current_workers_count())
case DynamicSupervisor.which_children(__MODULE__) do
[] ->
raise "Failed to start workers"
children ->
{_id, pid, _type, _modules} = Enum.random(children)
pid
end
end
defp start_worker do
case DynamicSupervisor.start_child(__MODULE__, {SentenceTransformer.EmbeddingWorker, []}) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end
end
defp ensure_pool_size(current_workers)
when current_workers < @pool_size do
Enum.each(1..(@pool_size - current_workers), fn _ ->
start_worker()
end)
end
defp ensure_pool_size(_current_workers), do: :ok
defp current_workers_count do
__MODULE__
|> DynamicSupervisor.which_children()
|> length()
end
end
Kino.start_child({SentenceTransformer.EmbeddingPool, []})
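With the pool supervised by Kino, a quick smoke test can go through the pool API instead of a specific worker (a sketch; the input strings are placeholders, and the first call starts and warms all pool workers, so it can take a while):
{:ok, embeddings} = SentenceTransformer.EmbeddingPool.get_embeddings(["revenue growth", "risk factors"])
Enum.map(embeddings, &length/1)
# => [384, 384]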
Solution - Document Processing Service
defmodule SentenceTransformer.DocumentProcessor do
alias SentenceTransformer.{Repo, Document, DocumentChunk, EmbeddingPool}
@min_chunk_length 50
@chunk_size 5
@batch_size 3
@max_concurrent_batches 2
@processing_timeout 300_000
@doc """
Processes document from start to finish with worker pool:
1. Save document to DB
2. Split into chunks
3. Generate embeddings for all chunks (using worker pool)
4. Save chunks with embeddings to DB
"""
def process_document(title, content) do
with {:ok, document} <- create_document(title, content),
{:ok, _chunks} <- process_chunks(document, content) do
log_processing_completed(title, document)
{:ok, document}
end
end
defp create_document(title, content) do
%Document{}
|> Document.changeset(%{title: title, content: content})
|> Repo.insert()
end
defp process_chunks(document, content) do
chunks = content
|> split_into_chunks()
|> log_processing_started()
|> Enum.chunk_every(@batch_size)
|> Enum.with_index()
|> Task.async_stream(
fn {chunk_batch, batch_index} ->
process_batch(document, chunk_batch, batch_index)
end,
max_concurrency: @max_concurrent_batches,
timeout: @processing_timeout
)
|> Enum.to_list()
failed_results = Enum.filter(chunks, fn
# process_batch returns an {:error, _} value wrapped in {:ok, _} when a batch fails
{:ok, {:error, _reason}} -> true
{:ok, _} -> false
{:exit, _} -> true
end)
if Enum.empty?(failed_results) do
{:ok, chunks}
else
{:error, {:chunk_processing_failed, failed_results}}
end
end
defp process_batch(%{id: document_id}, chunk_batch, batch_index) do
log_processing_batch(chunk_batch, batch_index)
with {:ok, embeddings} <- EmbeddingPool.get_embeddings(chunk_batch),
{:ok, chunks} <- save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
chunks
end
end
defp save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
chunk_data =
chunk_batch
|> Enum.zip(embeddings)
|> Enum.with_index(batch_index * @batch_size)
|> Enum.map(fn {{chunk_text, embedding}, index} ->
now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
%{
content: chunk_text,
embedding: embedding,
chunk_index: index,
document_id: document_id,
inserted_at: now,
updated_at: now
}
end)
case Repo.insert_all(DocumentChunk, chunk_data, returning: [:id, :content, :chunk_index]) do
{count, chunks} when count > 0 -> {:ok, chunks}
{0, _} -> {:error, :no_chunks_inserted}
end
end
defp split_into_chunks(content) do
content
|> String.split(~r/\.\s+/)
|> Enum.chunk_every(@chunk_size)
|> Enum.map(&Enum.join(&1, ". "))
|> Enum.reject(&(String.length(&1) < @min_chunk_length))
end
defp log_processing_started(chunks) do
IO.puts("Processing #{length(chunks)} chunks using worker pool with batches of #{@batch_size}...")
chunks
end
defp log_processing_batch(chunk_batch, batch_index),
do: IO.puts("Processing batch #{batch_index + 1} (#{length(chunk_batch)} chunks)...")
defp log_processing_completed(title, document),
do: IO.puts("✓ Completed processing document: #{title} (ID: #{document.id})")
end
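Before pointing the processor at real filings, it can be smoke-tested with a short inline document (a sketch; the title and sentences are invented test data, and the call writes a throwaway document to the database):
{:ok, doc} =
  SentenceTransformer.DocumentProcessor.process_document(
    "Sample Filing",
    "The company reported strong revenue growth driven by advertising. " <>
      "Operating expenses increased due to infrastructure investments. " <>
      "Management expects continued headcount growth next year."
  )
doc.id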
Solution - Semantic Search Service
defmodule SentenceTransformer.SemanticSearch do
import Ecto.Query
alias SentenceTransformer.{Repo, DocumentChunk, EmbeddingPool}
@default_limit 5
@doc """
Finds semantically similar document chunks using worker pool.
"""
def search(query, limit \\ @default_limit) when is_binary(query) do
[query]
|> EmbeddingPool.get_embeddings()
|> case do
{:ok, [embedding]} -> {:ok, search_by_embedding(embedding, limit)}
{:ok, []} -> {:error, :no_embedding_generated}
error -> error
end
end
defp search_by_embedding(embedding, limit) do
embedding
|> build_search_query(limit)
|> Repo.all()
end
# Converts cosine distance to cosine similarity.
#
# Cosine distance (the <=> operator) ranges from 0 to 2:
# 0 = identical vectors (same direction)
# 1 = orthogonal vectors (no relationship)
# 2 = opposite vectors (completely different directions)
# Cosine similarity ranges from -1 to 1:
# 1 = identical vectors (same direction)
# 0 = orthogonal vectors (no relationship)
# -1 = opposite vectors (completely different directions)
# The query below therefore computes similarity as 1.0 - distance.
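# Example: a chunk at cosine distance 0.25 from the query has
# similarity 1.0 - 0.25 = 0.75, displayed as 75.0% in the demos below.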
defp build_search_query(embedding, limit) do
from(c in DocumentChunk,
join: d in assoc(c, :document),
select: %{
content: c.content,
chunk_index: c.chunk_index,
document_id: c.document_id,
similarity: fragment("1.0 - (? <=> ?)", c.embedding, ^embedding),
document_title: d.title
},
order_by: [asc: fragment("? <=> ?", c.embedding, ^embedding)],
limit: ^limit
)
end
end
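Once some chunks are stored, the search path can be verified with a single call (a sketch; the query string is arbitrary and results depend on what has been ingested):
{:ok, results} = SentenceTransformer.SemanticSearch.search("advertising revenue", 3)
Enum.map(results, &{&1.document_title, Float.round(&1.similarity, 3)})
# => up to three {title, similarity} pairs, most similar first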
Demo and Testing
defmodule SentenceTransformer.FileLoader do
@base_path "/data/data/markdown"
@markdown_extension ".md"
@doc """
Loads content from markdown files in the data/markdown directory
"""
def load_markdown_file(filename) do
filename
|> build_file_path()
|> File.read()
end
@doc """
Lists all markdown files in the base directory
"""
def list_markdown_files do
@base_path
|> File.ls()
|> case do
{:ok, files} -> {:ok, filter_markdown_files(files)}
{:error, reason} -> {:error, "Failed to list markdown files: #{reason}"}
end
end
defp build_file_path(filename),
do: Path.join(@base_path, filename)
defp filter_markdown_files(files),
do: Enum.filter(files, &String.ends_with?(&1, @markdown_extension))
end
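A quick check that the expected filings are visible from the notebook (a sketch; it assumes the /data/data/markdown directory from @base_path exists in your environment):
SentenceTransformer.FileLoader.list_markdown_files()
# => {:ok, ["goog-10-k.md", "meta-10-k.md"]} when those files are present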
defmodule SentenceTransformer.Demo do
@doc """
Loads and processes all available markdown documents
"""
def load_documents do
IO.puts("Loading actual 10-K documents...")
SentenceTransformer.FileLoader.list_markdown_files()
|> case do
{:ok, files} ->
files
|> build_document_specs()
|> process_documents()
{:error, reason} ->
log_processing_failed("Failed to list markdown files: #{reason}")
end
end
defp build_document_specs(files),
do: Enum.map(files, &build_document_spec/1)
defp build_document_spec(filename),
do: {filename, generate_title(filename)}
defp generate_title(filename) do
case filename do
"goog-10-k.md" -> "Alphabet Inc. (Google) 10-K - 2024"
"meta-10-k.md" -> "Meta Platforms Inc. 10-K - 2022"
_ ->
filename
|> String.replace_suffix(".md", "")
|> String.replace("-", " ")
|> String.upcase()
end
end
defp process_documents(documents),
do: Enum.each(documents, &process_document/1)
defp process_document({filename, title}) do
filename
|> SentenceTransformer.FileLoader.load_markdown_file()
|> case do
{:ok, content} ->
log_processing_started(title)
process_document_content(title, content)
{:error, reason} ->
log_processing_failed("Failed to load #{title}: #{reason}")
end
end
defp process_document_content(title, content) do
case SentenceTransformer.DocumentProcessor.process_document(title, content) do
{:ok, doc} -> log_processing_completed("Processed #{title}: #{doc.title}")
{:error, reason} -> log_processing_failed("Failed to process #{title}: #{reason}")
end
end
defp log_processing_started(title), do: IO.puts("Processing #{title}...")
defp log_processing_completed(message), do: IO.puts("✓ #{message}")
defp log_processing_failed(message), do: IO.puts("✗ #{message}")
end
SentenceTransformer.Demo.load_documents()
defmodule SentenceTransformer.SearchDemo do
@document_preview_length 100
@test_queries [
"What are the main risk factors mentioned?",
"How does the company generate revenue?",
"What are the competitive threats?",
"What regulatory challenges does the company face?",
"How is artificial intelligence being used?",
"What are the cybersecurity risks?",
"How does the company handle data privacy?",
"What are the financial performance metrics?",
"What strategic acquisitions or partnerships are mentioned?",
"How does the company address environmental sustainability?"
]
@doc """
Demonstrates semantic search functionality with predefined queries
"""
def run_search_demo do
IO.puts("\n=== SEMANTIC SEARCH DEMONSTRATIONS ===\n")
Enum.each(@test_queries, &demonstrate_query/1)
end
defp demonstrate_query(query) do
IO.puts("Query: \"#{query}\"")
query
|> SentenceTransformer.SemanticSearch.search(3)
|> case do
{:ok, results} -> display_results(results)
{:error, reason} -> IO.puts("Search failed: #{reason}")
end
IO.puts("")
end
defp display_results(results) do
results
|> Enum.with_index(1)
|> Enum.each(&display_result/1)
end
defp display_result({result, index}) do
similarity_percent = format_similarity(result.similarity)
content_preview = format_content_preview(result.content)
IO.puts("#{index}. #{similarity_percent}% - #{content_preview}...")
end
defp format_similarity(similarity) do
similarity
|> Kernel.*(100)
|> Float.round(1)
end
defp format_content_preview(content) do
content
|> String.slice(0, @document_preview_length)
|> String.replace("\n", " ")
end
end
SentenceTransformer.SearchDemo.run_search_demo()
defmodule SentenceTransformer.ConceptDemo do
@document_preview_length 100
@concept_groups [
{"Risk/Regulatory Concepts:", ["regulatory", "compliance", "legal", "litigation", "government"]},
{"Financial Performance:", ["revenue", "profit", "earnings", "cash flow", "margins"]},
{"Technology & Innovation:", ["artificial intelligence", "machine learning", "algorithms", "platform", "software"]},
{"Competition & Market:", ["competitors", "market share", "competitive", "pricing", "customers"]},
{"Data & Privacy:", ["data privacy", "user data", "personal information", "GDPR", "privacy policy"]}
]
@doc """
Demonstrates semantic understanding with concept groups
"""
def run_concept_demo do
IO.puts("SEMANTIC UNDERSTANDING DEMO")
IO.puts("Similar concepts return similar results:")
Enum.each(@concept_groups, &demonstrate_concept_group/1)
end
defp demonstrate_concept_group({title, terms}) do
IO.puts("\n#{title}")
Enum.each(terms, &demonstrate_concept/1)
end
defp demonstrate_concept(query) do
query
|> SentenceTransformer.SemanticSearch.search(1)
|> case do
{:ok, [result | _]} -> display_concept_result(query, result)
{:ok, []} -> display_no_results(query)
{:error, reason} -> display_search_error(query, reason)
end
end
defp display_concept_result(query, %{similarity: similarity, content: content}),
do: IO.puts("\"#{query}\" -> #{format_similarity(similarity)}% - #{format_content_preview(content)}...")
defp display_no_results(query),
do: IO.puts("\"#{query}\" -> No results found")
defp display_search_error(query, reason),
do: IO.puts("\"#{query}\" -> Search error: #{reason}")
defp format_similarity(similarity) do
similarity
|> Kernel.*(100)
|> Float.round(1)
end
defp format_content_preview(content, length \\ @document_preview_length) do
content
|> String.slice(0, length)
|> String.replace("\n", " ")
end
end
SentenceTransformer.ConceptDemo.run_concept_demo()
defmodule SentenceTransformer.SystemStats do
import Ecto.Query
@doc """
Displays system statistics including document and chunk counts
"""
def display_stats do
IO.puts("\nSYSTEM STATISTICS")
{document_count, chunk_count} = get_counts()
display_basic_stats(document_count, chunk_count)
if chunk_count > 0, do: display_vector_stats()
end
defp get_counts do
document_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.Document, :count)
chunk_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.DocumentChunk, :count)
{document_count, chunk_count}
end
defp display_basic_stats(document_count, chunk_count),
do: IO.puts("Documents: #{document_count} | Chunks: #{chunk_count}")
defp display_vector_stats do
from(c in SentenceTransformer.DocumentChunk, limit: 1)
|> SentenceTransformer.Repo.one()
|> get_embedding_info()
|> display_embedding_info()
end
defp get_embedding_info(%{embedding: embedding}) do
embedding
|> Pgvector.to_list()
|> then(&{length(&1), calculate_size_kb(&1)})
end
defp calculate_size_kb(embedding_list) do
embedding_list
|> length()
|> Kernel.*(4)
|> Kernel./(1024)
|> Float.round(1)
end
defp display_embedding_info({vector_size, size_kb}),
do: IO.puts("Vector dimensions: #{vector_size} (#{size_kb} KB per vector)")
end
SentenceTransformer.SystemStats.display_stats()