Powered by AppSignal & Oban Pro

Exmeralda Livebook

exmeralda_livebook.livemd

Exmeralda Livebook

Mix.install(
  [
    {:req, "~> 0.5"},
    {:req_hex, "~> 0.2.1"},
    {:langchain, "~> 0.3.0"},
    {:ecto_sql, "~> 3.10"},
    {:postgrex, ">= 0.0.0"},
    {:rag, "~> 0.2.2"},
    {:pgvector, "~> 0.3.0"},
    {:kino, "~> 0.15.3"},
    {:bitcrowd_ecto, "~> 1.0"}
  ],
  config: [
    logger: [level: :info]
  ]
)

Setup

  1. run postgres with pgvector
  2. create the database exmeralda_livebook
  3. set GROQ_API_KEY and JINA_API_KEY (as secrets in livebook, in the sidebar)
  4. run livebook
  5. run ingestion
  6. chat

RAG

Postgrex.Types.define(
  Exmeralda.PostgrexTypes,
  Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions(),
  []
)
defmodule Exmeralda.Repo do
  use Ecto.Repo,
    otp_app: :exmeralda,
    adapter: Ecto.Adapters.Postgres
end

Kino.start_child(
  {Exmeralda.Repo,
   url: "postgres://postgres:postgres@localhost/exmeralda_livebook",
   types: Exmeralda.PostgrexTypes}
)
defmodule Exmeralda.Repo.Migrations.CreateChunks do
  use Ecto.Migration

  def change do
    execute("CREATE EXTENSION IF NOT EXISTS vector;", "DROP EXTENSION vector")

    execute(
      "CREATE TYPE chunk_type AS ENUM ('code', 'docs');",
      "DROP TYPE chunk_type"
    )

    create table(:chunks, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:source, :text, null: false)
      add(:content, :text, null: false)
      add(:embedding, :vector, size: 768, null: false)
      add(:type, :chunk_type, null: false)
      add(:library, :text, null: false)
      add(:search, :tsvector, generated: "ALWAYS AS (to_tsvector('english', content)) STORED")
    end

    create(index(:chunks, :search, using: "GIN"))
  end
end
Ecto.Migrator.run(
  Exmeralda.Repo,
  [
    {0, Exmeralda.Repo.Migrations.CreateChunks}
  ],
  :up,
  all: true
)
defmodule Exmeralda.Topics.Chunk do
  use Ecto.Schema

  @primary_key false
  schema "chunks" do
    field(:id, Ecto.UUID, primary_key: true)
    field(:type, Ecto.Enum, values: [:code, :docs])
    field(:source, :string)
    field(:content, :string)
    field(:embedding, Pgvector.Ecto.Vector)
    field(:library, :string)
  end
end
defmodule Exmeralda.Chats.LLM do
  alias LangChain.Chains.LLMChain

  def stream_responses(messages, handler) do
    %{llm: llm()}
    |> LLMChain.new!()
    |> LLMChain.add_message(system_prompt() |> LangChain.Message.new_system!())
    |> LLMChain.add_messages(Enum.map(messages, &to_langchain_message/1))
    |> LLMChain.add_callback(handler)
    |> LLMChain.run()
  end

  defp to_langchain_message(%{role: :system, content: content}),
    do: LangChain.Message.new_system!(content)

  defp to_langchain_message(%{role: :user, content: content}),
    do: LangChain.Message.new_user!(content)

  defp to_langchain_message(%{role: :assistant, content: content}),
    do: LangChain.Message.new_assistant!(content)

  defp llm do
    LangChain.ChatModels.ChatOpenAI.new!(%{
      endpoint: "https://api.groq.com/openai/v1/chat/completions",
      api_key: System.fetch_env!("LB_GROQ_API_KEY"),
      model: "qwen-2.5-coder-32b",
      stream: true
    })
  end

  defp system_prompt do
    """
    You are an expert in Elixir programming with in-depth knowledge of Elixir.
    Provide accurate information based on the provided context to assist Elixir
    developers. Include code snippets and examples to illustrate your points.
    Respond in a professional yet approachable manner.
    Be concise for straightforward queries, but elaborate when necessary to
    ensure clarity and understanding. Adapt your responses to the complexity of
    the question. For basic usage, provide clear examples. For advanced topics,
    offer detailed explanations and multiple solutions if applicable.
    Include references to official documentation or reliable sources to support
    your answers. Ensure information is current, reflecting the latest updates
    in the library. If the context does not provide enough information, state
    this in your answer and keep it short. If you are unsure what kind of
    information the user needs, please ask follow-up questions.
    """
  end
end
defmodule Exmeralda.Topics.Rag do
  @moduledoc false
  alias Exmeralda.Repo
  import Ecto.Query
  import Pgvector.Ecto.Query
  alias Rag.{Embedding, Generation, Retrieval}
  alias LangChain.TextSplitter.{RecursiveCharacterTextSplitter, LanguageSeparators}

  @doc_types ~w(.html .md .txt)
  @embedding_batch_size 200
  @chunk_size 2000
  @retrieval_weights %{fulltext_results: 1, semantic_results: 1}
  @pgvector_limit 3
  @fulltext_limit 3
  @excluded_docs ~w(404.html)

  @default_splitter RecursiveCharacterTextSplitter.new!(%{chunk_size: @chunk_size})
  @elixir_splitter RecursiveCharacterTextSplitter.new!(%{
                     seperators: LanguageSeparators.elixir(),
                     chunk_size: @chunk_size
                   })

  def ingest_from_hex(name, version) do
    full_name = "#{name}-#{version}"
    docs_url = "https://repo.hex.pm/docs/#{full_name}.tar.gz"
    code_url = "https://repo.hex.pm/tarballs/#{full_name}.tar"

    with {:ok, exdocs} <- hex_fetch(docs_url),
         {:ok, repo} <- hex_fetch(code_url) do
      docs =
        for {path, content} <- exdocs,
            file = to_string(path),
            Path.extname(file) in @doc_types and file not in @excluded_docs,
            chunk <- chunk_text(file, content) do
          %{source: file, type: :docs, content: chunk, library: name, id: Ecto.UUID.generate()}
        end

      code =
        for {file, content} <- repo["contents.tar.gz"],
            String.valid?(content),
            chunk <- chunk_text(file, content) do
          %{
            source: file,
            type: :code,
            content: Enum.join(["# #{file}\n\n", chunk]),
            library: name,
            id: Ecto.UUID.generate()
          }
        end

      chunks =
        (docs ++ code)
        |> Enum.chunk_every(@embedding_batch_size)
        |> tap(fn v -> IO.puts("embedding #{Enum.count(v)}") end)
        |> Enum.flat_map(fn batch ->
          batch
          |> Embedding.generate_embeddings_batch(embedding_provider(),
            text_key: :content,
            embedding_key: :embedding
          )
        end)

      Exmeralda.Repo.insert_all(Exmeralda.Topics.Chunk, chunks)

      :ok
    end
  end

  defp hex_fetch(url) do
    [url: url]
    |> Keyword.merge(Application.get_env(:exmeralda, :hex_req_options, []))
    |> Req.new()
    |> ReqHex.attach()
    |> Req.get!()
    |> case do
      %Req.Response{status: 200, body: body} -> {:ok, body}
      %Req.Response{status: 404} -> {:error, {:repo_not_found, url}}
      response -> {:error, {:hex_fetch_error, response}}
    end
  end

  defp chunk_text(file, content) do
    file
    |> Path.extname()
    |> case do
      ".ex" -> @elixir_splitter
      _ -> @default_splitter
    end
    |> RecursiveCharacterTextSplitter.split_text(content)
  end

  def build_generation(scope, query, opts \\ []) do
    generation =
      Generation.new(query, opts)
      |> Embedding.generate_embedding(embedding_provider())
      |> Retrieval.retrieve(:fulltext_results, &amp;query_fulltext(&amp;1, scope))
      |> Retrieval.retrieve(:semantic_results, &amp;query_with_pgvector(&amp;1, scope))
      |> Retrieval.reciprocal_rank_fusion(@retrieval_weights, :rrf_result)
      |> Retrieval.deduplicate(:rrf_result, [:id])

    result = Generation.get_retrieval_result(generation, :rrf_result)
    context = Enum.map_join(result, "\n\n", &amp; &amp;1.content)
    context_sources = Enum.map(result, &amp; &amp;1.source)

    prompt = prompt(query, context)

    {result,
     generation
     |> Generation.put_context(context)
     |> Generation.put_context_sources(context_sources)
     |> Generation.put_prompt(prompt)}
  end

  defp query_with_pgvector(%{query_embedding: query_embedding}, scope) do
    {:ok,
     Repo.all(
       scope
       |> order_by([c], l2_distance(c.embedding, ^Pgvector.new(query_embedding)))
       |> limit(@pgvector_limit)
     )}
  end

  defp query_fulltext(%{query: query}, scope) do
    {:ok,
     Repo.all(
       scope
       |> order_by(fragment("search @@ websearch_to_tsquery(?)", ^query))
       |> limit(@fulltext_limit)
     )}
  end

  defp prompt(query, context) do
    """
    Context information is below.
    ---------------------
    #{context}
    ---------------------
    Given the context information and no prior knowledge, answer the query.
    Query: #{query}
    Answer:
    """
  end

  defp embedding_provider do
    Rag.Ai.OpenAI.new(%{
      embeddings_url: "https://api.jina.ai/v1/embeddings",
      api_key: System.fetch_env!("LB_JINA_API_KEY"),
      embeddings_model: "jina-embeddings-v2-base-code"
    })
  end
end

UI

import Kino.Shorts
import Ecto.Query

response_frame = frame()
Kino.Frame.render(response_frame, "")

sources_frame = frame()
Kino.Frame.render(sources_frame, "")

library = Kino.Input.text("Library", default: "ecto")
input = Kino.Input.text("Question", default: "What is Ecto?")

handler = %{
  on_llm_new_delta: fn _model, %LangChain.MessageDelta{} = data ->
    # content = case data.content do
    #   nil -> ""
    #   [] -> ""
    #   content when is_binary(content) -> content
    # end

    # Kino.render(response_frame, text(content))
    :ok
  end,
  on_message_processed: fn _chain, %LangChain.Message{} = data ->
    Kino.Frame.render(response_frame, markdown(data.content))
  end
}
Kino.render(response_frame)
Kino.render(grid([library, input], columns: 2))

Kino.Frame.clear(response_frame)

library = Kino.Input.read(library)
scope = from(c in Exmeralda.Topics.Chunk, where: c.library == ^library)
query = Kino.Input.read(input)

{_, generation} = Exmeralda.Topics.Rag.build_generation(scope, query)

dbg(generation)

messages = [%{role: :user, content: generation.prompt}]

Exmeralda.Chats.LLM.stream_responses(messages, handler)

Kino.nothing()
:ok = Exmeralda.Topics.Rag.ingest_from_hex("ecto", "3.12.5")