
OTP Approach: DynamicSupervisor with Worker Pool

Setup

Mix.install(
  [
    {:ecto, "~> 3.13.2"},
    {:ecto_sql, "~> 3.13.2"},
    {:postgrex, "~> 0.19.3"},
    {:kino, "~> 0.13.2"},
    {:pgvector, "~> 0.3.1"},
    {:bumblebee, "~> 0.6.3"},
    {:nx, "~> 0.10.0"},
    {:exla, "~> 0.10.0"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend,
      default_defn_options: [compiler: EXLA]
    ]
  ]
)
Postgrex.Types.define(
  SentenceTransformer.PostgrexTypes,
  Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions()
)

defmodule SentenceTransformer.Repo do
  use Ecto.Repo,
    otp_app: :sentence_transformer_app,
    adapter: Ecto.Adapters.Postgres,
    types: SentenceTransformer.PostgrexTypes
end
Kino.start_child({SentenceTransformer.Repo, [
  url: "postgres://postgres:postgres@db/vector_db",
  types: SentenceTransformer.PostgrexTypes
]})
defmodule Migrations.CreateVectorExtension do
  use Ecto.Migration

  def up do
    execute "CREATE EXTENSION IF NOT EXISTS vector"
  end

  def down do
    execute "DROP EXTENSION IF EXISTS vector"
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 1, Migrations.CreateVectorExtension)

Solution - Database Schema

defmodule SentenceTransformer.Document do
  use Ecto.Schema

  import Ecto.Changeset

  @title_max_length 255
  @title_min_length 1

  schema "documents" do
    field :title, :string
    field :content, :string

    has_many :chunks, SentenceTransformer.DocumentChunk

    timestamps()
  end

  def changeset(document, attrs) do
    document
    |> cast(attrs, [:title, :content])
    |> validate_required([:title, :content], message: "is required")
    |> validate_length(:title,
        min: @title_min_length,
        max: @title_max_length,
        message: "must be between #{@title_min_length} and #{@title_max_length} characters")
    |> validate_length(:content, min: 1, message: "cannot be empty")
    |> validate_format(:title, ~r/^[^<>]*$/, message: "contains invalid characters")
  end
end
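
A quick, illustrative check of the validations above (the sample attributes are made up, not data from this notebook):

# Illustrative only: sample attributes are assumptions.
SentenceTransformer.Document.changeset(%SentenceTransformer.Document{}, %{title: "Test", content: "Hello"}).valid?
# => true

SentenceTransformer.Document.changeset(%SentenceTransformer.Document{}, %{title: "<bad>", content: ""}).valid?
# => false (the title contains "<" and ">", and the empty content fails validate_required)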

defmodule SentenceTransformer.DocumentChunk do
  use Ecto.Schema

  import Ecto.Changeset

  @content_min_length 1
  @chunk_index_min 0

  schema "document_chunks" do
    field :content, :string
    field :chunk_index, :integer
    field :embedding, Pgvector.Ecto.Vector

    belongs_to :document, SentenceTransformer.Document

    timestamps()
  end

  def changeset(chunk, attrs) do
    chunk
    |> cast(attrs, [:content, :embedding, :chunk_index, :document_id])
    |> validate_required([:content, :document_id], message: "is required")
    |> validate_length(:content, min: @content_min_length, message: "cannot be empty")
    |> validate_number(:chunk_index, greater_than_or_equal_to: @chunk_index_min,
        message: "must be non-negative")
    |> validate_embedding()
  end

  defp validate_embedding(changeset) do
    case get_field(changeset, :embedding) do
      nil -> add_error(changeset, :embedding, "is required")
      embedding when is_list(embedding) ->
        if length(embedding) > 0, do: changeset, else: add_error(changeset, :embedding, "cannot be empty")
      _ -> changeset
    end
  end
end
defmodule Migrations.CreateDocuments do
  use Ecto.Migration

  def change do
    create table(:documents) do
      add :title, :string, null: false
      add :content, :text

      timestamps()
    end
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 2, Migrations.CreateDocuments)
defmodule Migrations.CreateDocumentChunks do
  use Ecto.Migration

  def up do
    create table(:document_chunks) do
      add :content, :text, null: false
      add :embedding, :vector, size: 384 # dimensions for all-MiniLM-L6-v2
      add :chunk_index, :integer

      add :document_id, references(:documents, on_delete: :delete_all)

      timestamps()
    end

    create index(:document_chunks, [:document_id])

    # Critical: index for fast similarity search using cosine distance
    execute("""
      CREATE INDEX document_chunks_embedding_ivfflat_idx
      ON document_chunks
      USING ivfflat (embedding vector_cosine_ops)
    """)
  end

  def down do
    drop index(:document_chunks, [:document_id])

    execute("DROP INDEX IF EXISTS document_chunks_embedding_ivfflat_idx")

    drop table(:document_chunks)
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 3, Migrations.CreateDocumentChunks)

Solution - Embedding Service (DynamicSupervisor Pool Approach)

defmodule SentenceTransformer.EmbeddingWorker do
  use GenServer

  @model_name "sentence-transformers/all-MiniLM-L6-v2"
  @timeout 60_000

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @doc """
  Gets embeddings for a list of texts using this worker process.
  """
  def get_embeddings(worker_pid, texts) when is_list(texts),
    do: GenServer.call(worker_pid, {:get_embeddings, texts}, @timeout)


  def init(_opts) do
    with {:ok, model_info} <- Bumblebee.load_model({:hf, @model_name}),
         {:ok, tokenizer} <- Bumblebee.load_tokenizer({:hf, @model_name}) do
      {:ok, %{serving: Bumblebee.Text.text_embedding(model_info, tokenizer)}}
    else
      error -> {:stop, error}
    end
  end

  def handle_call({:get_embeddings, texts}, _from, %{serving: serving} = state) do
    embeddings = Enum.map(texts, &generate_embedding(&1, serving))

    {:reply, {:ok, embeddings}, state}
  end

  defp generate_embedding(text, serving) do
    serving
    |> Nx.Serving.run(text)
    |> Map.get(:embedding)
    |> Nx.to_flat_list()
  end
end
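
For a one-off check outside the pool, a worker can be started directly; this is only a sketch (it loads the model in the new process, and the sample text is invented):

{:ok, worker} = SentenceTransformer.EmbeddingWorker.start_link()
{:ok, [embedding]} = SentenceTransformer.EmbeddingWorker.get_embeddings(worker, ["hello world"])
length(embedding)
# => 384, matching the vector size used in the document_chunks migration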

defmodule SentenceTransformer.EmbeddingPool do
  use DynamicSupervisor

  @pool_size 3

  def init(_opts),
    do: DynamicSupervisor.init(strategy: :one_for_one)

  def start_link(opts \\ []),
    do: DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @doc """
  Gets embeddings for a list of texts using the worker pool.
  """
  def get_embeddings(texts) when is_list(texts),
    do: SentenceTransformer.EmbeddingWorker.get_embeddings(get_available_worker(), texts)

  defp get_available_worker do
    ensure_pool_size(current_workers_count())

    case DynamicSupervisor.which_children(__MODULE__) do
      [] ->
        raise "Failed to start workers"
      children ->
        with {_id, pid, _type, _modules} <- Enum.random(children), do: pid
    end
  end

  defp start_worker do
    case DynamicSupervisor.start_child(__MODULE__, {SentenceTransformer.EmbeddingWorker, []}) do
      {:ok, pid} -> pid
      {:error, {:already_started, pid}} -> pid
    end
  end

  defp ensure_pool_size(current_workers)
  when current_workers < @pool_size do
    Enum.each(1..(@pool_size - current_workers), fn _ ->
      start_worker()
    end)
  end

  defp ensure_pool_size(_current_workers), do: :ok

  defp current_workers_count do
    __MODULE__
    |> DynamicSupervisor.which_children()
    |> length()
  end
end
Kino.start_child({SentenceTransformer.EmbeddingPool, []})
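
With the pool supervisor running, a quick smoke test (the text here is illustrative); the pool starts its workers lazily on the first call:

{:ok, [embedding]} = SentenceTransformer.EmbeddingPool.get_embeddings(["worker pools in Elixir"])
length(embedding)
# => 384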

Solution - Document Processing Service

defmodule SentenceTransformer.DocumentProcessor do
  alias SentenceTransformer.{Repo, Document, DocumentChunk, EmbeddingPool}

  @min_chunk_length 50
  @chunk_size 5
  @batch_size 3
  @max_concurrent_batches 2
  @processing_timeout 300_000

  @doc """
  Processes document from start to finish with worker pool:
  1. Save document to DB
  2. Split into chunks
  3. Generate embeddings for all chunks (using worker pool)
  4. Save chunks with embeddings to DB
  """
  def process_document(title, content) do
    with {:ok, document} <- create_document(title, content),
         {:ok, _chunks} <- process_chunks(document, content) do
      log_processing_completed(title, document)
      {:ok, document}
    end
  end

  defp create_document(title, content) do
    %Document{}
    |> Document.changeset(%{title: title, content: content})
    |> Repo.insert()
  end

  defp process_chunks(document, content) do
    chunks = content
    |> split_into_chunks()
    |> then(&log_processing_started(&1))
    |> Enum.chunk_every(@batch_size)
    |> Enum.with_index()
    |> Task.async_stream(
      fn {chunk_batch, batch_index} ->
        process_batch(document, chunk_batch, batch_index)
      end,
      max_concurrency: @max_concurrent_batches,
      timeout: @processing_timeout
    )
    |> Enum.to_list()

    failed_results = Enum.filter(chunks, fn
      # Task.async_stream wraps each result, so a failed batch arrives as {:ok, {:error, _}}
      {:ok, {:error, _}} -> true
      {:ok, _} -> false
      {:error, _} -> true
      {:exit, _} -> true
    end)

    if Enum.empty?(failed_results) do
      {:ok, chunks}
    else
      {:error, {:chunk_processing_failed, failed_results}}
    end
  end

  defp process_batch(%{id: document_id}, chunk_batch, batch_index) do
    log_processing_batch(chunk_batch, batch_index)

    with {:ok, embeddings} <- EmbeddingPool.get_embeddings(chunk_batch),
         {:ok, chunks} <- save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
      chunks
    end
  end

  defp save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
    chunk_data =
      chunk_batch
      |> Enum.zip(embeddings)
      |> Enum.with_index(batch_index * @batch_size)
      |> Enum.map(fn {{chunk_text, embedding}, index} ->
        now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
        %{
          content: chunk_text,
          embedding: embedding,
          chunk_index: index,
          document_id: document_id,
          inserted_at: now,
          updated_at: now
        }
      end)

    case Repo.insert_all(DocumentChunk, chunk_data, returning: [:id, :content, :chunk_index]) do
      {count, chunks} when count > 0 -> {:ok, chunks}
      {0, _} -> {:error, :no_chunks_inserted}
    end
  end

  defp split_into_chunks(content) do
    content
    |> String.split(~r/\.\s+/)
    |> Enum.chunk_every(@chunk_size)
    |> Enum.map(&Enum.join(&1, ". "))
    |> Enum.reject(&(String.length(&1) < @min_chunk_length))
  end

  defp log_processing_started(chunks) do
    IO.puts("Processing #{length(chunks)} chunks using worker pool with batches of #{@batch_size}...")
    chunks
  end

  defp log_processing_batch(chunk_batch, batch_index),
    do: IO.puts("Processing batch #{batch_index + 1} (#{length(chunk_batch)} chunks)...")

  defp log_processing_completed(title, document),
    do: IO.puts("✓ Completed processing document: #{title} (ID: #{document.id})")
end
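
An illustrative end-to-end run (the title and content are invented; the real 10-K files are processed in the demo section below):

# Sketch only: sample title/content are assumptions, not part of the dataset.
SentenceTransformer.DocumentProcessor.process_document(
  "Sample filing",
  "Revenue increased compared to the prior year. Operating expenses also grew, driven by infrastructure costs. The company expects continued investment in research and development."
)
# => {:ok, %SentenceTransformer.Document{}} on success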

Solution - Semantic Search Service

defmodule SentenceTransformer.SemanticSearch do
  import Ecto.Query

  alias SentenceTransformer.{Repo, DocumentChunk, EmbeddingPool}

  @default_limit 5

  @doc """
  Finds semantically similar document chunks using worker pool.
  """
  def search(query, limit \\ @default_limit) when is_binary(query) do
    [query]
    |> EmbeddingPool.get_embeddings()
    |> case do
      {:ok, [embedding]} -> {:ok, search_by_embedding(embedding, limit)}
      {:ok, []} -> {:error, :no_embedding_generated}
      error -> error
    end
  end

  defp search_by_embedding(embedding, limit) do
    embedding
    |> build_search_query(limit)
    |> Repo.all()
  end

  # Converts distance to similarity
  #
  # Cosine Distance (<=> operator) returns values between 0 and 2:
  #   0 = identical vectors (same direction)
  #   2 = opposite vectors (completely different directions)
  #   1 = orthogonal vectors (no relationship)
  # Cosine Similarity returns values between -1 and 1:
  #   1 = identical vectors (same direction)
  #   -1 = opposite vectors (completely different directions)
  #   0 = orthogonal vectors (no relationship)
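  # Worked example (illustrative numbers): a chunk at cosine distance 0.25 from
  # the query vector gets similarity 1.0 - 0.25 = 0.75, i.e. 75% similar.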
  defp build_search_query(embedding, limit) do
    from(c in DocumentChunk,
      join: d in assoc(c, :document),
      select: %{
        content: c.content,
        chunk_index: c.chunk_index,
        document_id: c.document_id,
        similarity: fragment("1.0 - (? <=> ?)", c.embedding, ^embedding),
        document_title: d.title
      },
      order_by: [asc: fragment("? <=> ?", c.embedding, ^embedding)],
      limit: ^limit
    )
  end
end
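
An illustrative query; before any documents have been processed this simply returns {:ok, []}:

SentenceTransformer.SemanticSearch.search("What drives revenue growth?", 3)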

Demo and Testing

defmodule SentenceTransformer.FileLoader do
  @base_path "/data/data/markdown"
  @markdown_extension ".md"

  @doc """
  Loads content from markdown files in the data/markdown directory
  """
  def load_markdown_file(filename) do
    filename
    |> build_file_path()
    |> File.read()
  end

  @doc """
  Lists all markdown files in the base directory
  """
  def list_markdown_files do
    @base_path
    |> File.ls()
    |> case do
      {:ok, files} -> {:ok, filter_markdown_files(files)}
      {:error, reason} -> {:error, "Failed to list markdown files: #{reason}"}
    end
  end

  defp build_file_path(filename),
    do: Path.join(@base_path, filename)

  defp filter_markdown_files(files),
    do: Enum.filter(files, &String.ends_with?(&1, @markdown_extension))
end

defmodule SentenceTransformer.Demo do
  @doc """
  Loads and processes all available markdown documents
  """
  def load_documents do
    IO.puts("Loading actual 10-K documents...")

    SentenceTransformer.FileLoader.list_markdown_files()
    |> case do
      {:ok, files} ->
        files
        |> build_document_specs()
        |> process_documents()
      {:error, reason} ->
        # reason is already a formatted message from list_markdown_files/0
        log_processing_failed(reason)
    end
  end

  defp build_document_specs(files),
    do: Enum.map(files, &build_document_spec/1)

  defp build_document_spec(filename),
    do: {filename, generate_title(filename)}

  defp generate_title(filename) do
    case filename do
      "goog-10-k.md" -> "Alphabet Inc. (Google) 10-K - 2024"
      "meta-10-k.md" -> "Meta Platforms Inc. 10-K - 2022"
      _ ->
        filename
        |> String.replace_suffix(".md", "")
        |> String.replace("-", " ")
        |> String.upcase()
    end
  end

  defp process_documents(documents),
    do: Enum.each(documents, &process_document/1)

  defp process_document({filename, title}) do
    filename
    |> SentenceTransformer.FileLoader.load_markdown_file()
    |> case do
      {:ok, content} ->
        log_processing_started(title)
        process_document_content(title, content)
      {:error, reason} ->
        log_processing_failed("Failed to load #{title}: #{reason}")
    end
  end

  defp process_document_content(title, content) do
    case SentenceTransformer.DocumentProcessor.process_document(title, content) do
      {:ok, doc} -> log_processing_completed("Processed #{title}: #{doc.title}")
      {:error, reason} -> log_processing_failed("Failed to process #{title}: #{inspect(reason)}")
    end
  end

  defp log_processing_started(title), do: IO.puts("Processing #{title}...")
  defp log_processing_completed(message), do: IO.puts("✓ #{message}")
  defp log_processing_failed(message), do: IO.puts("✗ #{message}")
end

SentenceTransformer.Demo.load_documents()
defmodule SentenceTransformer.SearchDemo do
  @document_preview_length 100
  @test_queries [
    "What are the main risk factors mentioned?",
    "How does the company generate revenue?",
    "What are the competitive threats?",
    "What regulatory challenges does the company face?",
    "How is artificial intelligence being used?",
    "What are the cybersecurity risks?",
    "How does the company handle data privacy?",
    "What are the financial performance metrics?",
    "What strategic acquisitions or partnerships are mentioned?",
    "How does the company address environmental sustainability?"
  ]

  @doc """
  Demonstrates semantic search functionality with predefined queries
  """
  def run_search_demo do
    IO.puts("\n=== SEMANTIC SEARCH DEMONSTRATIONS ===\n")

    Enum.each(@test_queries, &demonstrate_query/1)
  end

  defp demonstrate_query(query) do
    IO.puts("Query: \"#{query}\"")

    query
    |> SentenceTransformer.SemanticSearch.search(3)
    |> case do
      {:ok, results} -> display_results(results)
      {:error, reason} -> IO.puts("Search failed: #{reason}")
    end

    IO.puts("")
  end

  defp display_results(results) do
    results
    |> Enum.with_index(1)
    |> Enum.each(&display_result/1)
  end

  defp display_result({result, index}) do
    similarity_percent = format_similarity(result.similarity)
    content_preview = format_content_preview(result.content)
    IO.puts("#{index}. #{similarity_percent}% - #{content_preview}...")
  end

  defp format_similarity(similarity) do
    similarity
    |> Kernel.*(100)
    |> Float.round(1)
  end

  defp format_content_preview(content) do
    content
    |> String.slice(0, @document_preview_length)
    |> String.replace("\n", " ")
  end
end

SentenceTransformer.SearchDemo.run_search_demo()
defmodule SentenceTransformer.ConceptDemo do
  @document_preview_length 100
  @concept_groups [
    {"Risk/Regulatory Concepts:", ["regulatory", "compliance", "legal", "litigation", "government"]},
    {"Financial Performance:", ["revenue", "profit", "earnings", "cash flow", "margins"]},
    {"Technology & Innovation:", ["artificial intelligence", "machine learning", "algorithms", "platform", "software"]},
    {"Competition & Market:", ["competitors", "market share", "competitive", "pricing", "customers"]},
    {"Data & Privacy:", ["data privacy", "user data", "personal information", "GDPR", "privacy policy"]}
  ]

  @doc """
  Demonstrates semantic understanding with concept groups
  """
  def run_concept_demo do
    IO.puts("SEMANTIC UNDERSTANDING DEMO")
    IO.puts("Similar concepts return similar results:")

    Enum.each(@concept_groups, &demonstrate_concept_group/1)
  end

  defp demonstrate_concept_group({title, terms}) do
    IO.puts("\n#{title}")
    Enum.each(terms, &demonstrate_concept/1)
  end

  defp demonstrate_concept(query) do
    query
    |> SentenceTransformer.SemanticSearch.search(1)
    |> case do
      {:ok, [result | _]} -> display_concept_result(query, result)
      {:ok, []} -> display_no_results(query)
      {:error, reason} -> display_search_error(query, reason)
    end
  end

  defp display_concept_result(query, %{similarity: similarity, content: content}),
    do: IO.puts("\"#{query}\" -> #{format_similarity(similarity)}% - #{format_content_preview(content)}...")

  defp display_no_results(query),
    do: IO.puts("\"#{query}\" -> No results found")

  defp display_search_error(query, reason),
    do: IO.puts("\"#{query}\" -> Search error: #{reason}")

  defp format_similarity(similarity) do
    similarity
    |> Kernel.*(100)
    |> Float.round(1)
  end

  defp format_content_preview(content, length \\ @document_preview_length) do
    content
    |> String.slice(0, length)
    |> String.replace("\n", " ")
  end
end

SentenceTransformer.ConceptDemo.run_concept_demo()
defmodule SentenceTransformer.SystemStats do
  import Ecto.Query

  @doc """
  Displays system statistics including document and chunk counts
  """
  def display_stats do
    IO.puts("\nSYSTEM STATISTICS")

    {document_count, chunk_count} = get_counts()
    display_basic_stats(document_count, chunk_count)

    if chunk_count > 0, do: display_vector_stats()
  end

  defp get_counts do
    document_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.Document, :count)
    chunk_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.DocumentChunk, :count)
    {document_count, chunk_count}
  end

  defp display_basic_stats(document_count, chunk_count),
    do: IO.puts("Documents: #{document_count} | Chunks: #{chunk_count}")

  defp display_vector_stats do
    from(c in SentenceTransformer.DocumentChunk, limit: 1)
    |> SentenceTransformer.Repo.one()
    |> get_embedding_info()
    |> display_embedding_info()
  end

  defp get_embedding_info(%{embedding: embedding}) do
    embedding
    |> Pgvector.to_list()
    |> then(&{length(&1), calculate_size_kb(&1)})
  end

  defp calculate_size_kb(embedding_list) do
    embedding_list
    |> length()
    |> Kernel.*(4)
    |> Kernel./(1024)
    |> Float.round(1)
  end
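  # Worked example for calculate_size_kb/1 above: 384 dimensions * 4 bytes = 1536 bytes,
  # and 1536 / 1024 = 1.5 KB per vector (assuming 4-byte floats, as the "* 4" does).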

  defp display_embedding_info({vector_size, size_kb}),
    do: IO.puts("Vector dimensions: #{vector_size} (#{size_kb} KB per vector)")
end

SentenceTransformer.SystemStats.display_stats()