OTP Approach: Single GenServer with Dynamic Batching

otp_batched.livemd

Setup

Mix.install(
  [
    {:ecto, "~> 3.13.2"},
    {:ecto_sql, "~> 3.13.2"},
    {:postgrex, "~> 0.19.3"},
    {:kino, "~> 0.13.2"},
    {:pgvector, "~> 0.3.1"},
    {:bumblebee, "~> 0.6.3"},
    {:nx, "~> 0.10.0"},
    {:exla, "~> 0.10.0"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend,
      default_defn_options: [compiler: EXLA]
    ]
  ]
)
Postgrex.Types.define(
  SentenceTransformer.PostgrexTypes,
  Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions()
)

defmodule SentenceTransformer.Repo do
  use Ecto.Repo,
    otp_app: :sentence_transformer_app,
    adapter: Ecto.Adapters.Postgres,
    types: SentenceTransformer.PostgrexTypes

  @pool_size 10
  @queue_target 5_000
  @queue_interval 1_000

  def init(_type, config) do
    config = Keyword.put(config, :pool_size, @pool_size)
    config = Keyword.put(config, :queue_target, @queue_target)
    config = Keyword.put(config, :queue_interval, @queue_interval)
    {:ok, config}
  end
end
Kino.start_child({SentenceTransformer.Repo, [
  url: "postgres://postgres:postgres@db/vector_db",
  types: SentenceTransformer.PostgrexTypes
]})
defmodule Migrations.CreateVectorExtension do
  use Ecto.Migration

  def up do
    execute "CREATE EXTENSION IF NOT EXISTS vector"
  end

  def down do
    execute "DROP EXTENSION IF EXISTS vector"
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 1, Migrations.CreateVectorExtension)
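
To verify the extension was created (an optional check, not part of the original notebook), you can query Postgres's standard pg_extension catalog through the repo:

# Optional sanity check: should return one row if the vector extension is installed
SentenceTransformer.Repo.query!(
  "SELECT extname FROM pg_extension WHERE extname = 'vector'"
)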

Solution - Database Schema

defmodule SentenceTransformer.Document do
  use Ecto.Schema

  import Ecto.Changeset

  @title_max_length 255
  @title_min_length 1

  schema "documents" do
    field :title, :string
    field :content, :string

    has_many :chunks, SentenceTransformer.DocumentChunk

    timestamps()
  end

  def changeset(document, attrs) do
    document
    |> cast(attrs, [:title, :content])
    |> validate_required([:title, :content], message: "is required")
    |> validate_length(:title,
        min: @title_min_length,
        max: @title_max_length,
        message: "must be between #{@title_min_length} and #{@title_max_length} characters")
    |> validate_length(:content, min: 1, message: "cannot be empty")
    |> validate_format(:title, ~r/^[^<>]*$/, message: "contains invalid characters")
  end
end

defmodule SentenceTransformer.DocumentChunk do
  use Ecto.Schema

  import Ecto.Changeset

  @content_min_length 1
  @chunk_index_min 0

  schema "document_chunks" do
    field :content, :string
    field :chunk_index, :integer
    field :embedding, Pgvector.Ecto.Vector

    belongs_to :document, SentenceTransformer.Document

    timestamps()
  end

  def changeset(chunk, attrs) do
    chunk
    |> cast(attrs, [:content, :embedding, :chunk_index, :document_id])
    |> validate_required([:content, :document_id], message: "is required")
    |> validate_length(:content, min: @content_min_length, message: "cannot be empty")
    |> validate_number(:chunk_index, greater_than_or_equal_to: @chunk_index_min,
        message: "must be non-negative")
    |> validate_embedding()
  end

  defp validate_embedding(changeset) do
    case get_field(changeset, :embedding) do
      nil -> add_error(changeset, :embedding, "is required")
      embedding when is_list(embedding) ->
        if length(embedding) > 0, do: changeset, else: add_error(changeset, :embedding, "cannot be empty")
      _ -> changeset
    end
  end
end
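
A quick illustration of how the chunk changeset behaves (hypothetical attribute values, for illustration only): a chunk cast without an embedding should fail validation, because validate_embedding/1 adds an error when the field is nil.

# Hypothetical values, for illustration only
changeset =
  SentenceTransformer.DocumentChunk.changeset(
    %SentenceTransformer.DocumentChunk{},
    %{content: "Some chunk text", document_id: 1, chunk_index: 0}
  )

changeset.valid?
# expected: false, with an "is required" error on :embedding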
defmodule Migrations.CreateDocuments do
  use Ecto.Migration

  def change do
    create table(:documents) do
      add :title, :string, null: false
      add :content, :text

      timestamps()
    end
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 2, Migrations.CreateDocuments)
defmodule Migrations.CreateDocumentChunks do
  use Ecto.Migration

  def up do
    create table(:document_chunks) do
      add :content, :text, null: false
      add :embedding, :vector, size: 384 # dimensions for all-MiniLM-L6-v2
      add :chunk_index, :integer

      add :document_id, references(:documents, on_delete: :delete_all)

      timestamps()
    end

    create index(:document_chunks, [:document_id])

    # Critical: index for fast similarity search using cosine distance
    execute("""
      CREATE INDEX document_chunks_embedding_ivfflat_idx
      ON document_chunks
      USING ivfflat (embedding vector_cosine_ops)
    """)
  end

  def down do
    drop index(:document_chunks, [:document_id])

    execute("DROP INDEX IF EXISTS document_chunks_embedding_ivfflat_idx")

    drop table(:document_chunks)
  end
end

Ecto.Migrator.up(SentenceTransformer.Repo, 3, Migrations.CreateDocumentChunks)

Solution - Embedding Service (Dynamic Batching Approach)

defmodule SentenceTransformer.BatchingEmbeddingService do
  use GenServer

  @model_name "sentence-transformers/all-MiniLM-L6-v2"
  @timeout 60_000
  @batch_size_limit 32
  @batch_timeout_ms 50

  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @doc """
  Gets embeddings for a list of texts using dynamic batching.
  """
  def get_embeddings(texts) when is_list(texts) do
    ensure_started()

    embeddings =
      Enum.map(texts, fn text ->
        {:ok, embedding} = GenServer.call(__MODULE__, {:get_embedding, text}, @timeout)
        embedding
      end)

    {:ok, embeddings}
  end

  def get_embeddings(text), do: get_embeddings([text])

  defp ensure_started do
    case Process.whereis(__MODULE__) do
      nil ->
        # Handle the race where another caller starts the server first
        case start_link([]) do
          {:ok, _pid} -> :ok
          {:error, {:already_started, _pid}} -> :ok
        end

      _pid ->
        :ok
    end
  end

  def init(_opts) do
    with {:ok, model_info} <- Bumblebee.load_model({:hf, @model_name}),
         {:ok, tokenizer} <- Bumblebee.load_tokenizer({:hf, @model_name}) do
      serving = Bumblebee.Text.text_embedding(model_info, tokenizer)

      state = %{
        serving: serving,
        batch_queue: [],
        batch_timer: nil,
        batch_size_limit: @batch_size_limit,
        batch_timeout_ms: @batch_timeout_ms
      }

      {:ok, state}
    else
      error -> {:stop, error}
    end
  end

  def handle_call({:get_embedding, text}, from, state) do
    %{batch_timer: batch_timer, batch_timeout_ms: batch_timeout_ms} = state

    # Add the request to the queue instead of processing it immediately
    new_queue = [{text, from} | state.batch_queue]

    cond do
      # If the batch is full, cancel any pending flush timer and process now
      length(new_queue) >= state.batch_size_limit ->
        if batch_timer, do: Process.cancel_timer(batch_timer)
        process_batch(new_queue, state)

      # If this is the first request in the batch, start the flush timer
      batch_timer == nil ->
        timer = Process.send_after(self(), :process_batch, batch_timeout_ms)
        {:noreply, %{state | batch_queue: new_queue, batch_timer: timer}}

      # Otherwise, just add the request to the in-flight batch
      true ->
        {:noreply, %{state | batch_queue: new_queue}}
    end
  end

  # Timer triggers batch processing
  def handle_info(:process_batch, state), do: process_batch(state.batch_queue, state)

  defp process_batch([], state), do: {:noreply, state}

  defp process_batch(queue, state) do
    texts = Enum.map(queue, fn {text, _from} -> text end)
    froms = Enum.map(queue, fn {_text, from} -> from end)

    # Process entire batch and reply at once
    embeddings = run_model_batch(texts, state.serving)
    froms
    |> Enum.zip(embeddings)
    |> Enum.each(fn {from, embedding} ->
         GenServer.reply(from, {:ok, embedding})
       end)

    {:noreply, %{state | batch_queue: [], batch_timer: nil}}
  end

  defp run_model_batch(texts, serving),
    do: Enum.map(texts, &generate_embedding(&1, serving))

  defp generate_embedding(text, serving) do
    serving
    |> Nx.Serving.run(text)
    |> Map.get(:embedding)
    |> Nx.to_flat_list()
  end
end
Kino.start_child({SentenceTransformer.BatchingEmbeddingService, []})
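
A minimal smoke test, assuming the service above has started and the model download has finished: every input text should come back as a 384-dimensional embedding, matching the all-MiniLM-L6-v2 dimension used in the document_chunks migration.

# Smoke test (assumes the model and tokenizer are loaded)
{:ok, embeddings} =
  SentenceTransformer.BatchingEmbeddingService.get_embeddings([
    "Dynamic batching groups concurrent requests.",
    "Each text gets its own embedding back."
  ])

Enum.map(embeddings, &length/1)
# expected: [384, 384]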

Solution - Document Processing Service

defmodule SentenceTransformer.DocumentProcessor do
  alias SentenceTransformer.{Repo, Document, DocumentChunk, BatchingEmbeddingService}

  @min_chunk_length 50
  @chunk_size 5
  @batch_size 10
  @max_concurrent_batches 3
  @processing_timeout 300_000

  @doc """
  Processes document from start to finish with dynamic batching:
  1. Save document to DB (committed immediately)
  2. Split into chunks
  3. Generate embeddings for all chunks (using dynamic batching)
  4. Save chunks with embeddings to DB
  """
  def process_document(title, content) do
    with {:ok, document} <- create_document(title, content),
         {:ok, _chunks} <- process_chunks(document, content) do
      log_processing_completed(title, document)
      {:ok, document}
    end
  end

  defp create_document(title, content) do
    %Document{}
    |> Document.changeset(%{title: title, content: content})
    |> Repo.insert()
  end

  defp process_chunks(document, content) do
    content
    |> split_into_chunks()
    |> then(&log_processing_started/1)
    |> Enum.chunk_every(@batch_size)
    |> Enum.with_index()
    |> Task.async_stream(
      fn {chunk_batch, batch_index} ->
        process_batch(document.id, chunk_batch, batch_index)
      end,
      max_concurrency: @max_concurrent_batches,
      timeout: @processing_timeout
    )
    |> Enum.to_list()
    |> then(&{:ok, &1})
  end

  defp process_batch(document_id, chunk_batch, batch_index) do
    log_processing_batch(chunk_batch, batch_index)

    with {:ok, embeddings} <- BatchingEmbeddingService.get_embeddings(chunk_batch),
         {:ok, chunks} <- save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
      chunks
    end
  end

  defp save_chunks_with_embeddings(chunk_batch, embeddings, document_id, batch_index) do
    # Use bulk insert for better database performance
    chunk_data =
      chunk_batch
      |> Enum.zip(embeddings)
      |> Enum.with_index(batch_index * @batch_size)
      |> Enum.map(fn {{chunk_text, embedding}, index} ->
        now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
        %{
          content: chunk_text,
          embedding: embedding,
          chunk_index: index,
          document_id: document_id,
          inserted_at: now,
          updated_at: now
        }
      end)

    case Repo.insert_all(DocumentChunk, chunk_data, returning: [:id, :content, :chunk_index]) do
      {count, chunks} when count > 0 -> {:ok, chunks}
      {0, _} -> {:error, :no_chunks_inserted}
    end
  end

  defp split_into_chunks(content) do
    content
    |> String.split(~r/\.\s+/)
    |> Enum.chunk_every(@chunk_size)
    |> Enum.map(&Enum.join(&1, ". "))
    |> Enum.reject(&(String.length(&1) < @min_chunk_length))
  end

  defp log_processing_started(chunks) do
    IO.puts("Processing #{length(chunks)} chunks with dynamic batching...")
    chunks
  end

  defp log_processing_batch(chunk_batch, batch_index),
    do: IO.puts("Processing batch #{batch_index + 1} (#{length(chunk_batch)} chunks)...")

  defp log_processing_completed(title, document),
    do: IO.puts("✓ Completed processing document: #{title} (ID: #{document.id})")
end
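
A usage sketch with hypothetical input (not from the notebook's data set). Note that split_into_chunks/1 drops chunks shorter than @min_chunk_length (50 characters), so the sample content needs a few reasonably long sentences to produce any chunks at all.

# Hypothetical content, for illustration only
sample_content = """
Dynamic batching lets a single GenServer serve many callers efficiently.
Requests are queued and flushed when the batch is full or the timer fires.
This keeps latency bounded while amortizing the cost of model inference.
Each resulting chunk is embedded and stored alongside its parent document.
"""

{:ok, document} =
  SentenceTransformer.DocumentProcessor.process_document("Batching notes", sample_content)

document.id
# chunks with embeddings are now stored in document_chunks for this document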

Solution - Semantic Search Service

defmodule SentenceTransformer.SemanticSearch do
  import Ecto.Query

  alias SentenceTransformer.{Repo, DocumentChunk, BatchingEmbeddingService}

  @default_limit 5

  @doc """
  Finds semantically similar document chunks with dynamic batching.
  """
  def search(query, limit \\ @default_limit) when is_binary(query) do
    query
    |> BatchingEmbeddingService.get_embeddings()
    |> case do
      {:ok, [embedding]} -> {:ok, search_by_embedding(embedding, limit)}
      {:ok, []} -> {:error, :no_embedding_generated}
      error -> error
    end
  end

  defp search_by_embedding(embedding, limit) do
    embedding
    |> build_search_query(limit)
    |> Repo.all()
  end

  # Converts distance to similarity
  #
  # Cosine Distance (<=> operator) returns values between 0 and 2:
  #   0 = identical vectors (same direction)
  #   2 = opposite vectors (completely different directions)
  #   1 = orthogonal vectors (no relationship)
  # Cosine Similarity returns values between -1 and 1:
  #   1 = identical vectors (same direction)
  #   -1 = opposite vectors (completely different directions)
  #   0 = orthogonal vectors (no relationship)
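  # Example: a cosine distance of 0.25 becomes a similarity of 1.0 - 0.25 = 0.75 (75%).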
  defp build_search_query(embedding, limit) do
    from(c in DocumentChunk,
      join: d in assoc(c, :document),
      select: %{
        content: c.content,
        chunk_index: c.chunk_index,
        document_id: c.document_id,
        similarity: fragment("1.0 - (? <=> ?)", c.embedding, ^embedding),
        document_title: d.title
      },
      order_by: [asc: fragment("? <=> ?", c.embedding, ^embedding)],
      limit: ^limit
    )
  end
end
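
A short usage sketch (hypothetical query string; requires documents to have been processed first): each result is a map with the chunk content, its index, the parent document's id and title, and a cosine similarity score.

# Hypothetical query, for illustration only
{:ok, results} =
  SentenceTransformer.SemanticSearch.search("How does the company make money?", 3)

Enum.map(results, fn result ->
  {result.document_title, Float.round(result.similarity, 3)}
end)
# expected shape: a list of {title, similarity} tuples, ordered by similarity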

Demo and Testing

defmodule SentenceTransformer.FileLoader do
  @base_path "/data/data/markdown"
  @markdown_extension ".md"

  @doc """
  Loads content from markdown files in the data/markdown directory
  """
  def load_markdown_file(filename) do
    filename
    |> build_file_path()
    |> File.read()
  end

  @doc """
  Lists all markdown files in the base directory
  """
  def list_markdown_files do
    @base_path
    |> File.ls()
    |> case do
      {:ok, files} -> {:ok, filter_markdown_files(files)}
      {:error, reason} -> {:error, "Failed to list markdown files: #{reason}"}
    end
  end

  defp build_file_path(filename),
    do: Path.join(@base_path, filename)

  defp filter_markdown_files(files),
    do: Enum.filter(files, &String.ends_with?(&1, @markdown_extension))
end

defmodule SentenceTransformer.Demo do
  @doc """
  Loads and processes all available markdown documents
  """
  def load_documents do
    IO.puts("Loading actual 10-K documents...")

    SentenceTransformer.FileLoader.list_markdown_files()
    |> case do
      {:ok, files} ->
        files
        |> build_document_specs()
        |> process_documents()
      {:error, reason} ->
        log_processing_failed("Failed to list markdown files: #{reason}")
    end
  end

  defp build_document_specs(files),
    do: Enum.map(files, &build_document_spec/1)

  defp build_document_spec(filename),
    do: {filename, generate_title(filename)}

  defp generate_title(filename) do
    case filename do
      "goog-10-k.md" -> "Alphabet Inc. (Google) 10-K - 2024"
      "meta-10-k.md" -> "Meta Platforms Inc. 10-K - 2022"
      _ ->
        filename
        |> String.replace_suffix(".md", "")
        |> String.replace("-", " ")
        |> String.upcase()
    end
  end

  defp process_documents(documents),
    do: Enum.each(documents, &process_document/1)

  defp process_document({filename, title}) do
    filename
    |> SentenceTransformer.FileLoader.load_markdown_file()
    |> case do
      {:ok, content} ->
        log_processing_started(title)
        process_document_content(title, content)
      {:error, reason} ->
        log_processing_failed("Failed to load #{title}: #{reason}")
    end
  end

  defp process_document_content(title, content) do
    case SentenceTransformer.DocumentProcessor.process_document(title, content) do
      {:ok, doc} -> log_processing_completed("Processed #{title}: #{doc.title}")
      {:error, reason} -> log_processing_failed("Failed to process #{title}: #{reason}")
    end
  end

  defp log_processing_started(title), do: IO.puts("Processing #{title}...")
  defp log_processing_completed(message), do: IO.puts("✓ #{message}")
  defp log_processing_failed(message), do: IO.puts("✗ #{message}")
end

SentenceTransformer.Demo.load_documents()
defmodule SentenceTransformer.SearchDemo do
  @document_preview_length 100
  @test_queries [
    "What are the main risk factors mentioned?",
    "How does the company generate revenue?",
    "What are the competitive threats?",
    "What regulatory challenges does the company face?",
    "How is artificial intelligence being used?",
    "What are the cybersecurity risks?",
    "How does the company handle data privacy?",
    "What are the financial performance metrics?",
    "What strategic acquisitions or partnerships are mentioned?",
    "How does the company address environmental sustainability?"
  ]

  @doc """
  Demonstrates semantic search functionality with predefined queries
  """
  def run_search_demo do
    IO.puts("\n=== SEMANTIC SEARCH DEMONSTRATIONS ===\n")

    Enum.each(@test_queries, &demonstrate_query/1)
  end

  defp demonstrate_query(query) do
    IO.puts("Query: \"#{query}\"")

    query
    |> SentenceTransformer.SemanticSearch.search(3)
    |> case do
      {:ok, results} -> display_results(results)
      {:error, reason} -> IO.puts("Search failed: #{reason}")
    end

    IO.puts("")
  end

  defp display_results(results) do
    results
    |> Enum.with_index(1)
    |> Enum.each(&display_result/1)
  end

  defp display_result({result, index}) do
    similarity_percent = format_similarity(result.similarity)
    content_preview = format_content_preview(result.content)
    IO.puts("#{index}. #{similarity_percent}% - #{content_preview}...")
  end

  defp format_similarity(similarity) do
    similarity
    |> Kernel.*(100)
    |> Float.round(1)
  end

  defp format_content_preview(content) do
    content
    |> String.slice(0, @document_preview_length)
    |> String.replace("\n", " ")
  end
end

SentenceTransformer.SearchDemo.run_search_demo()
defmodule SentenceTransformer.ConceptDemo do
  @document_preview_length 100
  @concept_groups [
    {"Risk/Regulatory Concepts:", ["regulatory", "compliance", "legal", "litigation", "government"]},
    {"Financial Performance:", ["revenue", "profit", "earnings", "cash flow", "margins"]},
    {"Technology & Innovation:", ["artificial intelligence", "machine learning", "algorithms", "platform", "software"]},
    {"Competition & Market:", ["competitors", "market share", "competitive", "pricing", "customers"]},
    {"Data & Privacy:", ["data privacy", "user data", "personal information", "GDPR", "privacy policy"]}
  ]

  @doc """
  Demonstrates semantic understanding with concept groups
  """
  def run_concept_demo do
    IO.puts("SEMANTIC UNDERSTANDING DEMO")
    IO.puts("Similar concepts return similar results:")

    Enum.each(@concept_groups, &demonstrate_concept_group/1)
  end

  defp demonstrate_concept_group({title, terms}) do
    IO.puts("\n#{title}")
    Enum.each(terms, &demonstrate_concept/1)
  end

  defp demonstrate_concept(query) do
    query
    |> SentenceTransformer.SemanticSearch.search(1)
    |> case do
      {:ok, [result | _]} -> display_concept_result(query, result)
      {:ok, []} -> display_no_results(query)
      {:error, reason} -> display_search_error(query, reason)
    end
  end

  defp display_concept_result(query, %{similarity: similarity, content: content}),
    do: IO.puts("\"#{query}\" -> #{format_similarity(similarity)}% - #{format_content_preview(content)}...")

  defp display_no_results(query),
    do: IO.puts("\"#{query}\" -> No results found")

  defp display_search_error(query, reason),
    do: IO.puts("\"#{query}\" -> Search error: #{reason}")

  defp format_similarity(similarity) do
    similarity
    |> Kernel.*(100)
    |> Float.round(1)
  end

  defp format_content_preview(content, length \\ @document_preview_length) do
    content
    |> String.slice(0, length)
    |> String.replace("\n", " ")
  end
end

SentenceTransformer.ConceptDemo.run_concept_demo()
defmodule SentenceTransformer.SystemStats do
  import Ecto.Query

  @doc """
  Displays system statistics including document and chunk counts
  """
  def display_stats do
    IO.puts("\nSYSTEM STATISTICS")

    {document_count, chunk_count} = get_counts()
    display_basic_stats(document_count, chunk_count)

    if chunk_count > 0, do: display_vector_stats()
  end

  defp get_counts do
    document_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.Document, :count)
    chunk_count = SentenceTransformer.Repo.aggregate(SentenceTransformer.DocumentChunk, :count)
    {document_count, chunk_count}
  end

  defp display_basic_stats(document_count, chunk_count),
    do: IO.puts("Documents: #{document_count} | Chunks: #{chunk_count}")

  defp display_vector_stats do
    from(c in SentenceTransformer.DocumentChunk, limit: 1)
    |> SentenceTransformer.Repo.one()
    |> get_embedding_info()
    |> display_embedding_info()
  end

  defp get_embedding_info(%{embedding: embedding}) do
    embedding
    |> Pgvector.to_list()
    |> then(&{length(&1), calculate_size_kb(&1)})
  end

  defp calculate_size_kb(embedding_list) do
    embedding_list
    |> length()
    |> Kernel.*(4)
    |> Kernel./(1024)
    |> Float.round(1)
  end

  defp display_embedding_info({vector_size, size_kb}),
    do: IO.puts("Vector dimensions: #{vector_size} (#{size_kb} KB per vector)")
end

SentenceTransformer.SystemStats.display_stats()