Exmeralda Livebook
# Packages this notebook installs at startup.
deps = [
  {:req, "~> 0.5"},
  {:req_hex, "~> 0.2.1"},
  {:langchain, "~> 0.3.0"},
  {:ecto_sql, "~> 3.10"},
  {:postgrex, ">= 0.0.0"},
  {:rag, "~> 0.2.2"},
  {:pgvector, "~> 0.3.0"},
  {:kino, "~> 0.15.3"},
  {:bitcrowd_ecto, "~> 1.0"}
]

# Application config applied on install: Logger runs at the :info level.
config = [logger: [level: :info]]

Mix.install(deps, config: config)
Setup
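- Run Postgres with the pgvector extension available.
- Create the database `exmeralda_livebook`.
- Set `GROQ_API_KEY` and `JINA_API_KEY` as secrets in Livebook (in the sidebar); Livebook exposes them to the notebook as `LB_GROQ_API_KEY` and `LB_JINA_API_KEY`.
- Run Livebook.
- Run the ingestion.
- Chat.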
RAG
# Define the Postgrex type module used by the repo: the pgvector extensions
# combined with the extensions of Ecto's Postgres adapter.
type_extensions = Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions()

Postgrex.Types.define(Exmeralda.PostgrexTypes, type_extensions, [])
defmodule Exmeralda.Repo do
  @moduledoc """
  Ecto repository of the notebook, using the Postgres adapter.
  """

  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :exmeralda
end
# Start the repo as a Kino-managed child process, pointing it at the local
# database and at the custom Postgrex type module.
repo_options = [
  url: "postgres://postgres:postgres@localhost/exmeralda_livebook",
  types: Exmeralda.PostgrexTypes
]

Kino.start_child({Exmeralda.Repo, repo_options})
defmodule Exmeralda.Repo.Migrations.CreateChunks do
  @moduledoc """
  Creates the `chunks` table together with what it depends on: the `vector`
  extension and the `chunk_type` enum.

  Every step is given a rollback statement or is a reversible Ecto command,
  so the migration is written as a single `change/0`.
  """

  use Ecto.Migration

  # Expression of the generated full-text column, derived from `content`.
  @search_expression "ALWAYS AS (to_tsvector('english', content)) STORED"

  def change do
    execute "CREATE EXTENSION IF NOT EXISTS vector;", "DROP EXTENSION vector"
    execute "CREATE TYPE chunk_type AS ENUM ('code', 'docs');", "DROP TYPE chunk_type"

    create table(:chunks, primary_key: false) do
      add :id, :uuid, primary_key: true
      add :source, :text, null: false
      add :content, :text, null: false
      add :embedding, :vector, size: 768, null: false
      add :type, :chunk_type, null: false
      add :library, :text, null: false
      add :search, :tsvector, generated: @search_expression
    end

    # GIN index backing the full-text lookups on the generated column.
    create index(:chunks, [:search], using: "GIN")
  end
end
# Run the in-notebook migration (registered as version 0) upwards against the repo.
migrations = [{0, Exmeralda.Repo.Migrations.CreateChunks}]

Ecto.Migrator.run(Exmeralda.Repo, migrations, :up, all: true)
defmodule Exmeralda.Topics.Chunk do
  @moduledoc """
  Schema for a row of the `chunks` table: one piece of a library's docs or
  code, together with its embedding.
  """

  use Ecto.Schema

  # The UUID primary key is declared as a regular field below, so the
  # default primary key is switched off.
  @primary_key false

  schema "chunks" do
    field :id, Ecto.UUID, primary_key: true
    field :type, Ecto.Enum, values: [:code, :docs]
    field :source, :string
    field :content, :string
    field :embedding, Pgvector.Ecto.Vector
    field :library, :string
  end
end
defmodule Exmeralda.Chats.LLM do
  @moduledoc """
  Runs a chat against the Groq-hosted model through a LangChain `LLMChain`.
  """

  alias LangChain.Chains.LLMChain
  alias LangChain.ChatModels.ChatOpenAI
  alias LangChain.Message

  # System message placed in front of every conversation.
  @system_prompt """
  You are an expert in Elixir programming with in-depth knowledge of Elixir.
  Provide accurate information based on the provided context to assist Elixir
  developers. Include code snippets and examples to illustrate your points.
  Respond in a professional yet approachable manner.
  Be concise for straightforward queries, but elaborate when necessary to
  ensure clarity and understanding. Adapt your responses to the complexity of
  the question. For basic usage, provide clear examples. For advanced topics,
  offer detailed explanations and multiple solutions if applicable.
  Include references to official documentation or reliable sources to support
  your answers. Ensure information is current, reflecting the latest updates
  in the library. If the context does not provide enough information, state
  this in your answer and keep it short. If you are unsure what kind of
  information the user needs, please ask follow-up questions.
  """

  @doc """
  Builds a chain from the system prompt followed by `messages`, registers
  `handler` as callback and runs the chain.

  `messages` is a list of maps with a `:role` (`:system`, `:user` or
  `:assistant`) and a `:content`. The result of `LLMChain.run/1` is returned.
  """
  def stream_responses(messages, handler) do
    chain = LLMChain.new!(%{llm: chat_model()})
    seeded = LLMChain.add_message(chain, Message.new_system!(@system_prompt))
    history = for message <- messages, do: to_langchain_message(message)

    seeded
    |> LLMChain.add_messages(history)
    |> LLMChain.add_callback(handler)
    |> LLMChain.run()
  end

  # One clause per supported role; any other shape fails with a
  # FunctionClauseError.
  defp to_langchain_message(%{role: :system, content: text}) do
    Message.new_system!(text)
  end

  defp to_langchain_message(%{role: :user, content: text}) do
    Message.new_user!(text)
  end

  defp to_langchain_message(%{role: :assistant, content: text}) do
    Message.new_assistant!(text)
  end

  # Streaming chat model behind Groq's OpenAI-compatible endpoint. The API key
  # is read from the environment on every call and is required.
  defp chat_model do
    ChatOpenAI.new!(%{
      endpoint: "https://api.groq.com/openai/v1/chat/completions",
      api_key: System.fetch_env!("LB_GROQ_API_KEY"),
      model: "qwen-2.5-coder-32b",
      stream: true
    })
  end
end
defmodule Exmeralda.Topics.Rag do
  @moduledoc """
  Ingestion and retrieval for the notebook's RAG pipeline.

  `ingest_from_hex/2` downloads the docs and the source of a hex release,
  splits them into chunks, embeds the chunks and stores them in the `chunks`
  table. `build_generation/3` looks up the chunks matching a query (full-text
  and vector search, combined with reciprocal rank fusion) and builds the
  prompt from them.
  """

  import Ecto.Query
  import Pgvector.Ecto.Query

  alias Exmeralda.Repo
  alias Exmeralda.Topics.Chunk
  alias LangChain.TextSplitter.{LanguageSeparators, RecursiveCharacterTextSplitter}
  alias Rag.{Embedding, Generation, Retrieval}

  @doc_types ~w(.html .md .txt)
  @excluded_docs ~w(404.html)
  # Chunks per embedding request; the same batches are reused for the inserts.
  @embedding_batch_size 200
  @chunk_size 2000
  @retrieval_weights %{fulltext_results: 1, semantic_results: 1}
  @pgvector_limit 3
  @fulltext_limit 3

  @default_splitter RecursiveCharacterTextSplitter.new!(%{chunk_size: @chunk_size})
  # The option is spelled `separators`. A misspelled key is dropped when the
  # splitter is built, which silently falls back to the default separators.
  @elixir_splitter RecursiveCharacterTextSplitter.new!(%{
                     separators: LanguageSeparators.elixir(),
                     chunk_size: @chunk_size
                   })

  @doc """
  Ingests the docs and the source code of `name` in `version` from hex.pm.

  Doc files are limited to the `.html`, `.md` and `.txt` extensions (without
  `404.html`); source files are limited to valid UTF-8 content. Every chunk is
  stored with `name` as its library.

  Returns `:ok`, or the `{:error, reason}` of the download that failed.
  """
  @spec ingest_from_hex(String.t(), String.t()) :: :ok | {:error, term()}
  def ingest_from_hex(name, version) do
    full_name = "#{name}-#{version}"
    docs_url = "https://repo.hex.pm/docs/#{full_name}.tar.gz"
    code_url = "https://repo.hex.pm/tarballs/#{full_name}.tar"

    with {:ok, exdocs} <- hex_fetch(docs_url),
         {:ok, repo} <- hex_fetch(code_url) do
      (doc_chunks(exdocs, name) ++ code_chunks(repo, name))
      |> Enum.chunk_every(@embedding_batch_size)
      # Reports the number of batches, not the number of chunks.
      |> tap(fn batches -> IO.puts("embedding #{Enum.count(batches)}") end)
      |> Enum.map(&embed_batch/1)
      |> insert_batches()
    end
  end

  # Chunks of the doc files of the docs tarball.
  defp doc_chunks(exdocs, library) do
    for {path, content} <- exdocs,
        file = to_string(path),
        Path.extname(file) in @doc_types and file not in @excluded_docs,
        chunk <- chunk_text(file, content) do
      %{source: file, type: :docs, content: chunk, library: library, id: Ecto.UUID.generate()}
    end
  end

  # Chunks of the source files of the package tarball; each chunk is prefixed
  # with a comment line naming the file it came from.
  defp code_chunks(repo, library) do
    for {file, content} <- repo["contents.tar.gz"],
        String.valid?(content),
        chunk <- chunk_text(file, content) do
      %{
        source: file,
        type: :code,
        content: Enum.join(["# #{file}\n\n", chunk]),
        library: library,
        id: Ecto.UUID.generate()
      }
    end
  end

  # Adds an `:embedding` to every chunk of the batch, computed from `:content`.
  defp embed_batch(batch) do
    Embedding.generate_embeddings_batch(batch, embedding_provider(),
      text_key: :content,
      embedding_key: :embedding
    )
  end

  # Inserts batch by batch instead of with one statement for all chunks:
  # PostgreSQL accepts at most 65535 bound parameters per statement, which a
  # larger package exceeds. The surrounding transaction keeps the write
  # all-or-nothing, as it was with the single statement.
  defp insert_batches(batches) do
    {:ok, :ok} =
      Repo.transaction(
        fn -> Enum.each(batches, &Repo.insert_all(Chunk, &1)) end,
        timeout: :infinity
      )

    :ok
  end

  # Downloads `url` (decoded by ReqHex) and tags the outcome by HTTP status.
  # Transport failures raise, because the request is made with `Req.get!/1`.
  defp hex_fetch(url) do
    [url: url]
    |> Keyword.merge(Application.get_env(:exmeralda, :hex_req_options, []))
    |> Req.new()
    |> ReqHex.attach()
    |> Req.get!()
    |> case do
      %Req.Response{status: 200, body: body} -> {:ok, body}
      %Req.Response{status: 404} -> {:error, {:repo_not_found, url}}
      response -> {:error, {:hex_fetch_error, response}}
    end
  end

  # Splits `content` into chunks; `.ex` files use the Elixir-aware separators.
  defp chunk_text(file, content) do
    file
    |> Path.extname()
    |> case do
      ".ex" -> @elixir_splitter
      _ -> @default_splitter
    end
    |> RecursiveCharacterTextSplitter.split_text(content)
  end

  @doc """
  Retrieves the chunks of `scope` that match `query` and builds a generation
  carrying the context, the context sources and the prompt.

  `scope` is the queryable both searches are run on and `opts` is handed to
  `Rag.Generation.new/2`. Returns `{result, generation}`, where `result` is
  the fused and deduplicated list of chunks.
  """
  @spec build_generation(Ecto.Queryable.t(), String.t(), keyword()) ::
          {[struct()], Generation.t()}
  def build_generation(scope, query, opts \\ []) do
    generation =
      Generation.new(query, opts)
      |> Embedding.generate_embedding(embedding_provider())
      |> Retrieval.retrieve(:fulltext_results, &query_fulltext(&1, scope))
      |> Retrieval.retrieve(:semantic_results, &query_with_pgvector(&1, scope))
      |> Retrieval.reciprocal_rank_fusion(@retrieval_weights, :rrf_result)
      |> Retrieval.deduplicate(:rrf_result, [:id])

    result = Generation.get_retrieval_result(generation, :rrf_result)
    context = Enum.map_join(result, "\n\n", & &1.content)
    context_sources = Enum.map(result, & &1.source)
    prompt = prompt(query, context)

    {result,
     generation
     |> Generation.put_context(context)
     |> Generation.put_context_sources(context_sources)
     |> Generation.put_prompt(prompt)}
  end

  # Nearest chunks by L2 distance between their embedding and the query's.
  defp query_with_pgvector(%{query_embedding: query_embedding}, scope) do
    {:ok,
     Repo.all(
       scope
       |> order_by([c], l2_distance(c.embedding, ^Pgvector.new(query_embedding)))
       |> limit(@pgvector_limit)
     )}
  end

  # Best full-text matches. The match is a filter and the rank decides the
  # order: ordering by the boolean match alone sorts `false` first and hands
  # back chunks that do not match at all. `search` is not a schema field, so
  # it is referenced by column name inside the fragments. The 'english'
  # configuration is the one the `search` column is generated with.
  defp query_fulltext(%{query: query}, scope) do
    {:ok,
     Repo.all(
       scope
       |> where(fragment("search @@ websearch_to_tsquery('english', ?)", ^query))
       |> order_by(desc: fragment("ts_rank(search, websearch_to_tsquery('english', ?))", ^query))
       |> limit(@fulltext_limit)
     )}
  end

  defp prompt(query, context) do
    """
    Context information is below.
    ---------------------
    #{context}
    ---------------------
    Given the context information and no prior knowledge, answer the query.
    Query: #{query}
    Answer:
    """
  end

  # Jina's embeddings endpoint, addressed through the OpenAI-compatible
  # provider. The API key is read from the environment and is required.
  defp embedding_provider do
    Rag.Ai.OpenAI.new(%{
      embeddings_url: "https://api.jina.ai/v1/embeddings",
      api_key: System.fetch_env!("LB_JINA_API_KEY"),
      embeddings_model: "jina-embeddings-v2-base-code"
    })
  end
end
UI
import Kino.Shorts
import Ecto.Query
# Output frames, both starting out with empty content.
[response_frame, sources_frame] =
  for _ <- 1..2 do
    blank = frame()
    Kino.Frame.render(blank, "")
    blank
  end

library = Kino.Input.text("Library", default: "ecto")
input = Kino.Input.text("Question", default: "What is Ecto?")

# Streamed deltas are acknowledged and otherwise ignored; the answer is
# rendered as Markdown once the whole message has been processed.
ignore_delta = fn _model, %LangChain.MessageDelta{} -> :ok end

render_answer = fn _chain, %LangChain.Message{content: content} ->
  Kino.Frame.render(response_frame, markdown(content))
end

handler = %{on_llm_new_delta: ignore_delta, on_message_processed: render_answer}

Kino.render(response_frame)
[library, input] |> grid(columns: 2) |> Kino.render()
Kino.Frame.clear(response_frame)

# From here on `library` is the value that was entered, not the input widget.
library = Kino.Input.read(library)
query = Kino.Input.read(input)
scope = from(c in Exmeralda.Topics.Chunk, where: c.library == ^library)

{_, generation} = Exmeralda.Topics.Rag.build_generation(scope, query)
dbg(generation)

# The generated prompt is sent as the only user message.
messages = [%{role: :user, content: generation.prompt}]
Exmeralda.Chats.LLM.stream_responses(messages, handler)
Kino.nothing()
# Ingest one hex release; the match fails unless the ingestion returns :ok.
{package, release} = {"ecto", "3.12.5"}
:ok = Exmeralda.Topics.Rag.ingest_from_hex(package, release)