RAG Elixir Phoenix LiveView documentation
Mix.install(
[
{:req, "~> 0.5.6"},
{:bumblebee, "~> 0.5.3"},
{:ollama, "~> 0.7.1"},
{:text_chunker, "~> 0.3.1"},
{:postgrex, "~> 0.19.1"},
{:pgvector, "~> 0.3.0"},
{:ecto_sql, "~> 3.12"},
{:exla, "~> 0.7.3"},
{:kino_bumblebee, "~> 0.5.0"},
{:scholar, "~> 0.3.1"},
{:explorer, "~> 0.9.2"},
{:tucan, "~> 0.3.1"}
],
config: [nx: [default_backend: EXLA.Backend]]
)
Nx.Defn.global_default_options(compiler: EXLA, client: :host)
Vector extension to Postgres with Docker
1) pgvector
To add the pgvector extension to your PostgreSQL container, you need a PostgreSQL image that includes this extension. The official PostgreSQL image doesn't ship pgvector by default, so we extend the Postgres image and build a custom image with the pgvector extension pre-installed.
Create a Dockerfile with the following content:
Dockerfile
FROM postgres:16
RUN apt-get update && apt-get install -y \
git \
build-essential \
postgresql-server-dev-16
RUN git clone https://github.com/pgvector/pgvector.git && \
cd pgvector && \
make && \
make install
CMD ["postgres"]
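As an aside, prebuilt images with the extension already compiled are published on Docker Hub (for example pgvector/pgvector:pg16); we build our own here to keep control over the base image.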
Build the custom image, named “postgres-with-vector” (the resulting image is about 1.5 GB):
> docker build -t postgres-with-vector .
Run a container in detached mode named “postgres-rag” from this custom “postgres-with-vector” image, create the database “rag_example”, and expose port 5432 so the Elixir backend can connect:
> docker run \
-d --rm \
--name postgres-rag \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=rag_example \
-p 5432:5432 \
postgres-with-vector
Check the logs:
> docker logs postgres-rag
LOG: database system is ready to accept connections
In another terminal, connect to the running “postgres-rag” container and execute psql on the “rag_example” database (with the default username “postgres” and the password set above):
> docker exec -it postgres-rag psql -U postgres -d rag_example
rag_example=#
2) Use an Ecto.Repo
For the pgvector extension, we define a module of custom Postgrex types:
Postgrex.Types.define(
RAG.PostgrexTypes,
Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions(),
[]
)
Note that you can also use the Postgres adapter Postgrex directly with raw SQL commands.
Postgrex code without Ecto
{:ok, pg} = Postgrex.start_link(
  username: "postgres",
  password: "secret",
  database: "rag_example",
  types: RAG.PostgrexTypes
)
Postgrex.query!(pg, "create extension if not exists vector;", [])
Postgrex.query!(pg, "drop table if exists documents;", [])
Postgrex.query!(pg, "create table documents ....", [])
Instead, we use the Ecto.Repo behaviour, which gives us a friendlier DSL than raw SQL commands.
defmodule RAG.Repo do
use Ecto.Repo,
otp_app: :rag,
adapter: Ecto.Adapters.Postgres
end
defmodule RAG.Document do
use Ecto.Schema
schema "documents" do
field :content, :string
field :embedding, Pgvector.Ecto.Vector
end
end
{:ok, pg} =
RAG.Repo.start_link(
hostname: "localhost",
username: "postgres",
password: "secret",
database: "rag_example",
types: RAG.PostgrexTypes
)
We create the extension:
RAG.Repo.query!("create extension if not exists vector;")
We check in the terminal that the HNSW index method is available:
rag_example=# select * from pg_am where amname='hnsw';
16450 | hnsw | hnswhandler | i
We create a table with two columns, “content” and “embedding”, whose datatypes are respectively “text” and “vector(384)”. The latter is because we will be using an embedding model with 384 dimensions (see below).
We create an hnsw index on the “embedding” column using the L2 distance (vector_l2_ops), the same distance we query with later via the <-> operator; since our embeddings are L2-normalised, ranking by L2 distance or by cosine distance gives the same order.
From the documentation: an HNSW index creates a multilayer graph. It has better query performance than IVFFlat (in terms of speed-recall tradeoff), but has slower build times and uses more memory. Also, an index can be created without any data in the table.
# reset the table
RAG.Repo.query!("drop table if exists documents;")
RAG.Repo.query!("""
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(384)
)
""")
RAG.Repo.query!(
"create index if not exists embedding_idx on documents using hnsw (embedding vector_l2_ops);"
)
Check, in the terminal that runs psql in the container, the details of the created table “documents” and of the indexes we created:
rag_example=# \d documents
id | integer | | not null | nextval('documents_id_seq'::regclass)
content | text | | |
embedding | vector(384) | | |
rag_example=# select * from pg_indexes where tablename='documents';
public | documents | documents_pkey | | CREATE UNIQUE INDEX documents_pkey ON public.documents USING btree (id)
public | documents | embedding_idx | | CREATE INDEX embedding_idx ON public.documents USING hnsw (embedding vector_l2_ops)
Fetching and chunking documents
We implement the logic to fetch documents from the Phoenix LiveView GitHub repo and chunk them with TextChunker.
defmodule RAG.DataCollector do
  # `url` is a GitHub "contents" API endpoint: Req decodes the JSON response
  # into a list of file descriptors, and `extractor` maps each descriptor
  # to a list of text chunks.
  def process_directory(url, extractor) do
    Req.get!(url).body
    |> Enum.flat_map(fn file -> extractor.(file) end)
  end
end
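A minimal usage sketch (the extractor below is illustrative and simply collects file names):
RAG.DataCollector.process_directory(
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server",
  fn %{"name" => name} -> [name] end
)
#=> a flat list of the file names in that folder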
Generate & insert embeddings from the sources
We use Bumblebee to load a sentence-transformer model, then compute the embeddings and insert them into the database.
defmodule RAG.Embedder do
def load_model do
repo = {:hf, "sentence-transformers/all-MiniLM-L6-v2"}
{:ok, model_info} = Bumblebee.load_model(repo)
{:ok, tokenizer} = Bumblebee.load_tokenizer(repo)
embedding_serving =
Bumblebee.Text.text_embedding(
model_info,
tokenizer,
output_pool: :mean_pooling,
output_attribute: :hidden_state,
embedding_processor: :l2_norm,
compile: [batch_size: 1, sequence_length: [2000]],
defn_options: [compiler: EXLA]
)
Kino.start_child({Nx.Serving, serving: embedding_serving, name: ChunkEmbedder})
end
def generate_embedding(text) do
%{embedding: vector} = Nx.Serving.batched_run(ChunkEmbedder, String.trim(text))
Nx.to_flat_list(vector)
end
end
Test the embedding against Python
Python check
Let's first test that our embedding works correctly. We use the results of running this model in Python as our source of truth.
We use the Python library llm to compute an embedding of a given chunk. We install a plugin that brings in the “sentence-transformers” embedding models:
> llm install llm-sentence-transformers
We check the installation:
> llm plugins
[
{
"name": "llm-sentence-transformers",
"hooks": [
"register_commands",
"register_embedding_models"
],
"version": "0.2"
}
]
We load the model and use the llm
CLI to test the output of the chunk “phoenix liveview”:
> llm embed -c 'phoenix liveview' -m sentence-transformers/all-MiniLM-L6-v2
We obtain a vector of length 384 (as expected, since we created the “embedding” column of our “documents” table with that dimension):
[-0.009706685319542885, -0.052094198763370514, -0.09055887907743454, -0.020933324471116066, -0.009688383899629116, 0.013350575231015682, 0.025953974574804306, -0.16938750445842743, -0.010423310101032257, -0.011145276017487049, 0.027349309995770454, -0.001918078283779323, -0.021567553281784058, -0.003199926810339093, -0.0008285145158879459, -0.015139210037887096, 0.06255557388067245, -0.06932919472455978, 0.013888751156628132, -0.004555793013423681, -0.07562420517206192, -0.009811706840991974, -0.012136539444327354, 0.04693487659096718,...]
We now test our Bumblebee settings.
We load the model:
RAG.Embedder.load_model()
and we check that we obtain the same (first!) values as above when we run our Bumblebee-based embedder against the same chunk:
RAG.Embedder.generate_embedding("phoenix liveview")
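We can also confirm that the dimension matches the vector(384) column:
RAG.Embedder.generate_embedding("phoenix liveview") |> length()
#=> 384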
Build the RAG source
We set up the foundations of our RAG by chunking our documents and inserting them into the vector database as strings together with their embeddings, their numerical representation.
We read each GitHub folder, download the markdown files, chunk each one into a list of strings, then compute an embedding for each chunk and save it into the vector database.
defmodule RAG.ExternalSources do
def extract_chunks(file) do
case file do
%{"type" => "file", "name" => name, "download_url" => download_url} ->
if String.ends_with?(name, ".md") do
Req.get!(download_url).body
|> TextChunker.split(format: :markdown, chunk_size: 800, chunk_overlap: 200)
|> Enum.map(&Map.get(&1, :text))
else
[]
end
_ -> []
end
end
def build(guides) do
guides
|> Task.async_stream(fn guide ->
chunks = RAG.DataCollector.process_directory(guide, &extract_chunks/1)
IO.puts("chunks length: #{length(chunks)}")
Enum.each(chunks, fn chunk ->
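# fire-and-forget: each chunk is embedded and inserted concurrently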
Task.start(fn ->
embedding = RAG.Embedder.generate_embedding(chunk)
RAG.Repo.insert!(%RAG.Document{content: chunk, embedding: embedding})
end)
end)
end,
ordered: false,
timeout: :infinity
)
|> Stream.run()
end
end
guides = [
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server",
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/client",
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/introduction"
]
RAG.ExternalSources.build(guides)
We check the number of insertions; we should have 422. (Since each insert runs in a fire-and-forget task, give them a moment to complete.)
RAG.Repo.aggregate(RAG.Document, :count, :id)
Semantic search
We implement an L2 similarity search on the embeddings:
top_k = 20
defmodule RAG.SemanticSearch do
import Ecto.Query
def search(query, top_k) do
query_embedding = RAG.Embedder.generate_embedding(query)
from(d in RAG.Document,
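# `<->` is pgvector's L2 (Euclidean) distance operator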
order_by: fragment("embedding <-> ?", ^query_embedding),
limit: ^top_k
)
|> RAG.Repo.all()
end
end
# Usage
query = "how to handle forms server-side?"
# a list of %RAG.Document{content: content, embedding: embedding}
top_results = RAG.SemanticSearch.search(query, top_k)
We inspect the first result (before re-ranking):
List.first(top_results).content
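For reference, the pgvector Ecto integration also exposes distance macros; an equivalent query sketch, assuming the Pgvector.Ecto.Query helpers of the installed version:
import Ecto.Query
import Pgvector.Ecto.Query

query_embedding = RAG.Embedder.generate_embedding(query)

from(d in RAG.Document,
  order_by: l2_distance(d.embedding, ^Pgvector.new(query_embedding)),
  limit: ^top_k
)
|> RAG.Repo.all()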
Re-ranking with cross-encoder
For this step, we load another model from Hugging Face, compatible with Bumblebee, to re-rank the results. We use the pretrained model “cross-encoder/ms-marco-MiniLM-L-6-v2”, as shown in the SBERT documentation on cross-encoders.
defmodule RAG.CrossEncoder do
@first 5
def load_model do
repo = {:hf, "cross-encoder/ms-marco-MiniLM-L-6-v2"}
tokenizer = {:hf, "bert-base-uncased"}
{:ok, model_info} = Bumblebee.load_model(repo)
{:ok, tokenizer} = Bumblebee.load_tokenizer(tokenizer)
{model_info, tokenizer}
end
def rerank(documents, query) do
# Prepare input pairs for cross-encoder
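# Note: load_model/0 runs on every call; consider caching the model and tokenizer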
{model_info, tokenizer} = load_model()
input_pairs =
Bumblebee.apply_tokenizer(tokenizer,
Enum.map(documents, fn doc ->
{query, doc.content}
end)
)
# Run the cross-encoder on all pairs at once
outputs = Axon.predict(model_info.model, model_info.params, input_pairs)
# Combine scores with original documents and sort
Enum.zip(documents, outputs.logits |> Nx.to_flat_list())
|> Enum.sort_by(fn {_, score} -> score end, :desc)
|> Enum.map(fn {doc, _} -> doc.content end)
|> Enum.take(@first)
end
end
Check reranking against Python
[TODO]
This model uses the :for_sequence_classification architecture; there is no such serving function coded in Bumblebee at the time of writing, hence the manual Axon.predict call above.
Build the context by re-ranking
# Load the model
RAG.CrossEncoder.load_model()
# Rerank the results
#query = "how to handle forms server-side?"
context = RAG.CrossEncoder.rerank(top_results, query)
Build the prompt
We define the prompt with a context and a question
defmodule RAG.PromptBuilder do
def build_prompt(context, query) do
context_text = Enum.join(context, "\n\n")
"""
You are a proficient Elixir developer with full knowledge of the Phoenix LiveView framework.
You are given context information below that is relevant to the query submitted to you.
-----------------------
#{context_text}
-----------------------
Answer the query giving priority to the context information above, and cite it.
The response should be in markdown format.
Query: #{query}
Answer:
"""
end
end
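A quick sanity check of the template with a stubbed context (the strings below are illustrative):
RAG.PromptBuilder.build_prompt(
  ["LiveView forms are handled with phx-change and phx-submit bindings."],
  "how to handle forms server-side?"
)
|> IO.puts()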
LLM integration
Most LLMs are paid solutions accessible via an endpoint, and very few models can be run locally since LLMs tend to be large. We run the “codellama” model via the ollama platform.
Ollama CLI
Install and start the ollama server
We install ollama (see the repo) in order to run the “codellama” LLM.
We pull a model from the registry:
> ollama pull codellama
We start an LLM server:
> ollama serve
This gives us an interactive CLI and a REST API. We can test it by sending a POST request to generate a completion, passing a JSON body such as {"model": "codellama", "prompt": "...."}.
> curl http://localhost:11434/api/generate -d \
'{"model": "codellama", "prompt": "how to handle forms with Phoenix Liveview?", "stream": false}'
We get a response back:
{
"model":"codellama",
"created_at":"2024-08-29T07:25:31.941263Z",
"response":"\nTo handle forms in Phoenix LiveView, you can use the `Phoenix.LiveView.Form` module. This module provides a set of functions for creating and manipulating HTML form elements, as well as handling form data on the server.\n\nHere's an example of how to create a simple form using Phoenix LiveView:\n```\nimport Ecto.Changeset\n\n# Create a changeset for the form\nchangeset = Ecto.Changeset.change(%YourModel{}, %{})\n\n# Render the form in your template\n\u003cform phx-submit=\"save\"\u003e\n \u003cdiv\u003e\n \u003clabel for=\"name\"\u003eName:\u003c/label\u003e\n \u003cinput type=\"text\" id=\"name\" name=\"name\" value={changeset.data[\"name\"]} /\u003e\n \u003c/div\u003e\n\n \u003cdiv\u003e\n \u003clabel for=\"age\"\u003eAge:\u003c/label\u003e\n \u003cinput type=\"number\" id=\"age\" name=\"age\" value={changeset.data[\"age\"]} /\u003e\n \u003c/div\u003e\n\n \u003cbutton type=\"submit\"\u003eSave\u003c/button\u003e\n\u003c/form\u003e\n```\nIn this example, we're creating a changeset for the form, which is used to validate and update the form data on the server. We then render the form in our template using the `phx-submit` attribute, which tells Phoenix to send the form data to the server when the form is submitted.\n\nWhen the form is submitted, Phoenix will automatically handle the form data and update the changeset with any validation errors or updates. You can then use the updated changeset to persist the data in your database.\n\nTo handle the form submission on the server, you can define a `save` function in your LiveView module that will be called when the form is submitted. This function will receive the updated changeset as an argument, and you can use it to update the data in your database or perform any other necessary actions.\n```\ndef save(changeset) do\n # Validate the changeset and return an error if there are any validation errors\n case Ecto.Changeset.apply_action(changeset, :update) do\n {:ok, _model} -\u003e\n # Update the data in your database or perform any other necessary actions\n :ok\n\n {:error, _changeset} -\u003e\n # Render an error page if there were validation errors\n render(:index, changeset: changeset)\n end\nend\n```\nIn this example, we're using the `Ecto.Changeset` module to validate the form data and update the changeset with any validation errors or updates. If there are no validation errors, we can use the updated changeset to persist the data in our database or perform any other necessary actions. If there are validation errors, we render an error page with the updated changeset.\n\nOverall, using Phoenix LiveView forms provides a convenient and efficient way to handle form data on the server, while also providing a seamless user experience for your users.",
"done":true,
...
}
We check that ollama is running:
lsof -i -P | grep LISTEN | grep 11434
Generate a response via the Ollama REST API and Elixir
The Livebook runs a POST request with Req, passing a map to the :json option. As per the documentation, Req encodes it with Jason.encode_to_iodata(%{model: "codellama", prompt: "..."}) and sets the adequate headers.
Note that we need to increase the receive timeout above the default of 5000 ms.
defmodule LLM do
def generate_response(prompt) do
json = %{stream: false, model: "codellama", prompt: prompt}
res =
Req.post!(
"http://localhost:11434/api/generate",
json: json,
receive_timeout: 120_000
)
case res do
%{status: 200, body: body} ->
body["response"]
_ ->
IO.puts "error"
end
end
end
RAG.PromptBuilder.build_prompt(context, query)
|> LLM.generate_response()
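Note that Mix.install also pulled in the ollama Elixir client, which wraps the same REST API; a sketch of the equivalent call with it, assuming the Ollama.init/1 and Ollama.completion/2 API of the installed 0.7 version:
client = Ollama.init()

{:ok, %{"response" => response}} =
  Ollama.completion(client,
    model: "codellama",
    prompt: RAG.PromptBuilder.build_prompt(context, query)
  )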
Wrap up
Seed the database with external sources
guides = [
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server",
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/client",
"https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/introduction"
]
RAG.ExternalSources.build(guides)
defmodule RAG do
def process_query(query) do
top_k = 10
query
|> RAG.SemanticSearch.search(top_k)
# top_results
|> RAG.CrossEncoder.rerank(query)
# context
|> tap(&IO.puts/1)
|> RAG.PromptBuilder.build_prompt(query)
# prompt
|> LLM.generate_response()
end
end
query = "explain Javascript interoperability on the server-side"
RAG.process_query(query)
Dimension reduction & visualization
We will use the scholar library:
require Explorer.DataFrame, as: DF
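The notebook stops short here; as a minimal sketch of where this section is heading (assuming the PCA module of the installed scholar version and Tucan's scatter/4), we can project the 384-dimensional embeddings down to 2D and plot them:
# pull every embedding back out of the database as an Nx tensor
embeddings =
  RAG.Repo.all(RAG.Document)
  |> Enum.map(fn doc -> Pgvector.to_list(doc.embedding) end)
  |> Nx.tensor()

# fit a 2-component PCA and project the embeddings onto it
pca = Scholar.Decomposition.PCA.fit(embeddings, num_components: 2)
coords = Scholar.Decomposition.PCA.transform(pca, embeddings)

# build a dataframe and draw the 2D scatter plot
df =
  DF.new(
    x: coords[[.., 0]] |> Nx.to_flat_list(),
    y: coords[[.., 1]] |> Nx.to_flat_list()
  )

Tucan.scatter(df, "x", "y", point_size: 4)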