
RAG Elixir Phoenix Liveview documentation


Mix.install(
  [
    {:req, "~> 0.5.6"},
    {:bumblebee, "~> 0.5.3"},
    {:ollama, "~> 0.7.1"},
    {:text_chunker, "~> 0.3.1"},
    {:postgrex, "~> 0.19.1"},
    {:pgvector, "~> 0.3.0"},
    {:ecto_sql, "~> 3.12"},
    {:exla, "~> 0.7.3"},
    {:kino_bumblebee, "~> 0.5.0"},
    {:scholar, "~> 0.3.1"},
    {:explorer, "~> 0.9.2"},
    {:tucan, "~> 0.3.1"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

Nx.Defn.global_default_options(compiler: EXLA, client: :host)

Vector extension to Postgres with Docker

1) pgvector

To add the pgvector extension to your PostgreSQL container, you’ll need a PostgreSQL image that includes this extension.

The official PostgreSQL image doesn’t include pgvector by default, so we extend the Postgres image and build a custom image with the pgvector extension pre-installed.

Create a Dockerfile with the following content:

Dockerfile

FROM postgres:16

# build dependencies needed to compile the pgvector extension
RUN apt-get update && apt-get install -y \
    git \
    build-essential \
    postgresql-server-dev-16

# compile and install pgvector from source
RUN git clone https://github.com/pgvector/pgvector.git && \
    cd pgvector && \
    make && \
    make install

CMD ["postgres"]

Build the custom image named “postgres-with-vector” (the resulting image weighs about 1.5 GB):

> docker build -t postgres-with-vector .

Run a container in detached mode named “postgres-rag” from this custom “postgres-with-vector” image, create the database “rag_example”, and expose port 5432 so the Elixir backend can connect:

> docker run \
   -d --rm \
   --name postgres-rag \
   -e POSTGRES_PASSWORD=secret \
   -e POSTGRES_DB=rag_example \
   -p 5432:5432 \
   postgres-with-vector

Check the logs:

> docker logs postgres-rag

LOG:  database system is ready to accept connections

In another terminal, connect to the running “postgres-rag” container and execute psql on the “rag_example” database:

> docker exec -it postgres-rag psql -U postgres -d rag_example

We execute the psql CLI in the container (with the default username “postgres” and the password set above) to connect to the database “rag_example”:

rag_example=#
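
From this psql session, we can run a quick sanity check that the pgvector extension is indeed available in our custom image (the exact version may differ):

rag_example=# select name, default_version from pg_available_extensions where name = 'vector';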

2) Use an Ecto.Repo

We define a custom Postgrex types module that registers the pgvector extensions:

Postgrex.Types.define(
  RAG.PostgrexTypes, 
  Pgvector.extensions() ++ Ecto.Adapters.Postgres.extensions(), 
  []
)

Note that you can also use the Postgres adapter Postgrex directly with raw SQL commands.

Postgrex code without Ecto

{:ok, pg} = Postgrex.start_link(
  hostname: "localhost",
  username: "postgres",
  password: "secret",
  database: "rag_example",
  types: RAG.PostgrexTypes
)

Postgrex.query!(pg, "create extension if not exists vector;", [])
Postgrex.query!(pg, "drop table if exists documents;", [])
Postgrex.query!(pg, "create table documents ....", [])

Instead, we use the Ecto.Repo behaviour, which gives us a friendlier DSL than raw SQL commands.

defmodule RAG.Repo do
  use Ecto.Repo,
    otp_app: :rag,
    adapter: Ecto.Adapters.Postgres
end

defmodule RAG.Document do
  use Ecto.Schema

  schema "documents" do
    field :content, :string
    field :embedding, Pgvector.Ecto.Vector
  end
end

{:ok, pg} = 
  RAG.Repo.start_link(
    hostname: "localhost",
    username: "postgres",
    password: "secret",
    database: "rag_example",
    types: RAG.PostgrexTypes
  )

We create the extension:

RAG.Repo.query!("create extension if not exists vector;")

We check in the terminal that the HNSW index method is available:

rag_example=# select * from pg_am where amname='hnsw';

16450 | hnsw   | hnswhandler | i

We create a table with two columns, “content” and “embedding”, with datatypes “text” and “vector(384)” respectively. The latter is because we will be using an embedding model with 384 dimensions (see further).

We create an HNSW index on the “embedding” column using the L2 distance (vector_l2_ops), which matches the <-> operator we use in the semantic search below.

Per the pgvector documentation: an HNSW index creates a multilayer graph. It has better query performance than IVFFlat (in terms of the speed-recall tradeoff), but slower build times and higher memory usage. Also, an index can be created before any data is in the table.

# reset the table
RAG.Repo.query!("drop table if exists documents;")

RAG.Repo.query!("""
  CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(384)
  )
""")

RAG.Repo.query!(
  "create index if not exists embedding_idx on documents using hnsw (embedding vector_l2_ops);"
)
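
pgvector also accepts HNSW build parameters: m (the number of connections per layer) and ef_construction (the size of the candidate list while building the graph). The values below are pgvector’s defaults, shown only to illustrate the syntax:

RAG.Repo.query!("""
  create index if not exists embedding_idx on documents
  using hnsw (embedding vector_l2_ops) with (m = 16, ef_construction = 64);
""")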

Check in the terminal (the one running psql in the container) the details of the created table “documents” and the indexes we created:

rag_example=# \d documents

 id        | integer     |   | not null | nextval('documents_id_seq'::regclass)
 content   | text        |   |          |
 embedding | vector(384) |   |          |
rag_example=# select * from pg_indexes where tablename='documents';

 public | documents | documents_pkey |  | CREATE UNIQUE INDEX documents_pkey ON public.documents USING btree (id)
 public | documents | embedding_idx  |  | CREATE INDEX embedding_idx ON public.documents USING hnsw (embedding vector_l2_ops)

Fetching and chunking documents

We implement the logic to fetch documents from the Phoenix LiveView GitHub repo and chunk them with TextChunker.

defmodule RAG.DataCollector do
  # `url` is a GitHub "contents" API endpoint: Req decodes the JSON
  # response into a list of file descriptors, one per file in the folder.
  def process_directory(url, extractor) do
    Req.get!(url).body
    |> Enum.flat_map(fn file -> extractor.(file) end)
  end
end
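
As a minimal sketch of how this collector works, the hypothetical extractor below merely collects the file names from a GitHub folder listing (the real extractor, defined further down, downloads and chunks the files):

url = "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server"

RAG.DataCollector.process_directory(url, fn
  %{"type" => "file", "name" => name} -> [name]
  _ -> []
end)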

Generate & insert embeddings from the sources

We use Bumblebee to load a sentence-transformer model, then compute the embeddings and insert them into the database.

defmodule RAG.Embedder do
  def load_model do
    repo = {:hf, "sentence-transformers/all-MiniLM-L6-v2"}
    {:ok, model_info} = Bumblebee.load_model(repo) 
    {:ok, tokenizer} = Bumblebee.load_tokenizer(repo)

    embedding_serving = 
      Bumblebee.Text.text_embedding(
        model_info, 
        tokenizer,
        output_pool: :mean_pooling,
        output_attribute: :hidden_state,
        embedding_processor: :l2_norm,
        compile: [batch_size: 1, sequence_length: [2000]],
        defn_options: [compiler: EXLA]
      )

    Kino.start_child({Nx.Serving, serving: embedding_serving, name: ChunkEmbedder})
  end

  def generate_embedding(text) do
    %{embedding: vector} = Nx.Serving.batched_run(ChunkEmbedder, String.trim(text))
    Nx.to_flat_list(vector)
  end
end

Test the embedding against Python

Python check

Let’s first test that our embedding works correctly.

We use the Python results running this model as our source of truth.

We use the Python library llm to compute an embedding of a given chunk.

We install a plugin to bring in an embedding model “sentence-transformers”:

> llm install llm-sentence-transformers

We check the installation:

> llm plugins

[
  {
    "name": "llm-sentence-transformers",
    "hooks": [
      "register_commands",
      "register_embedding_models"
    ],
    "version": "0.2"
  }
]

We load the model and use the llm CLI to test the output of the chunk “phoenix liveview”:

> llm embed -c 'phoenix liveview' -m sentence-transformers/all-MiniLM-L6-v2

We obtain a vector of length 384 (as expected, since we created the “embedding” column of our “documents” table with type vector(384)):

[-0.009706685319542885, -0.052094198763370514, -0.09055887907743454, -0.020933324471116066, -0.009688383899629116, 0.013350575231015682, 0.025953974574804306, -0.16938750445842743, -0.010423310101032257, -0.011145276017487049, 0.027349309995770454, -0.001918078283779323, -0.021567553281784058, -0.003199926810339093, -0.0008285145158879459, -0.015139210037887096, 0.06255557388067245, -0.06932919472455978, 0.013888751156628132, -0.004555793013423681, -0.07562420517206192, -0.009811706840991974, -0.012136539444327354, 0.04693487659096718,...]

We now test our Bumblebee settings.

We load the model:

RAG.Embedder.load_model()

and we check that we obtain the same (first!) values as above when we run our Bumblebee-based embedder against the same chunk:

RAG.Embedder.generate_embedding("phoenix liveview")
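
A minimal sanity check (the tolerance below is an arbitrary assumption): compare the leading values of the Elixir embedding with the Python values printed above.

python_head = [-0.009706685319542885, -0.052094198763370514, -0.09055887907743454]

elixir_head =
  RAG.Embedder.generate_embedding("phoenix liveview")
  |> Enum.take(3)

Enum.zip(python_head, elixir_head)
|> Enum.all?(fn {p, e} -> abs(p - e) < 1.0e-4 end)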

Build the RAG source

We set up the foundations of our RAG by chunking our documents and inserting them as strings, together with their embeddings (their numerical representation), into our vector database.

We read each GitHub folder, download the markdown files, chunk them into lists of strings, then compute an embedding for each chunk and save it into the vector database.

defmodule RAG.ExternalSources do

  def extract_chunks(file) do
    case file do
      %{"type" => "file", "name" => name, "download_url" => download_url} ->
        if String.ends_with?(name, ".md") do
          Req.get!(download_url).body
          |> TextChunker.split(format: :markdown, chunk_size: 800, chunk_overlap: 200)
          |> Enum.map(&Map.get(&1, :text))
        else
          []
        end
      _ -> []
    end
  end

  def build(guides) do
    guides
    |> Task.async_stream(fn guide -> 
      chunks = RAG.DataCollector.process_directory(guide, &extract_chunks/1) 
      IO.puts("chunks length: #{length(chunks)}")
      Enum.each(chunks, fn chunk ->
         # fire-and-forget: each chunk is embedded and inserted concurrently
         Task.start(fn ->
            embedding = RAG.Embedder.generate_embedding(chunk)
            RAG.Repo.insert!(%RAG.Document{content: chunk, embedding: embedding})
         end)
      end)
    end, 
    ordered: false,
    timeout: :infinity
    )
    |> Stream.run() 
  end
end

guides = [
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server",
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/client",
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/introduction"
]

RAG.ExternalSources.build(guides)

We check the number of insertions: we should have 422.

RAG.Repo.aggregate(RAG.Document, :count, :id) 
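
We can cross-check the count from the psql session running in the container:

rag_example=# select count(*) from documents;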

Semantic search

We implement the L2 similarity search on the embeddings; pgvector’s <-> operator computes the L2 distance between vectors.

top_k = 20
defmodule RAG.SemanticSearch do
  import Ecto.Query

  def search(query, top_k) do
    query_embedding = RAG.Embedder.generate_embedding(query) 
    
    from(d in RAG.Document,
      order_by: fragment("embedding <-> ?", ^query_embedding),
      limit: ^top_k
    )
    |> RAG.Repo.all()
  end
end
# Usage
query = "how to handle forms server-side?"

# a list of %RAG.Document{content: content, embedding: embedding}
top_results = RAG.SemanticSearch.search(query, top_k)

We inspect the first result (before re-ranking):

List.first(top_results).content

Re-ranking with cross-encoder

For this step, we’ll load another model from Hugging Face, compatible with Bumblebee, to rerank the results.

We use a pretrained model “cross-encoder/ms-marco-MiniLM-L-6-v2” as shown in the SBert documentation on cross-encoders.

defmodule RAG.CrossEncoder do
  @first 5
  
  def load_model do
    repo = {:hf, "cross-encoder/ms-marco-MiniLM-L-6-v2"}
    tokenizer = {:hf, "bert-base-uncased"}
    {:ok, model_info} = Bumblebee.load_model(repo)
    {:ok, tokenizer} = Bumblebee.load_tokenizer(tokenizer)

    {model_info, tokenizer}
  end

  def rerank(documents, query) do
    # Prepare input pairs for cross-encoder
    {model_info, tokenizer} = load_model()
    input_pairs = 
     Bumblebee.apply_tokenizer(tokenizer, 
       Enum.map(documents, fn doc -> 
         {query, doc.content}
       end)
     )

    # Run cross-encoder in batches
    outputs = Axon.predict(model_info.model, model_info.params, input_pairs)
     

    # Combine scores with original documents and sort
    Enum.zip(documents, outputs.logits |> Nx.to_flat_list())
    |> Enum.sort_by(fn {_, score} -> score end, :desc)
    |> Enum.map(fn {doc, _} -> doc.content end)
    |> Enum.take(@first)
  end
end

Check reranking against Python

[TODO]

This model uses the :for_sequence_classification architecture; at the time of writing, there is no dedicated Bumblebee serving for this task, hence the direct call to Axon.predict above.

Build the context by re-ranking

# Load the model
RAG.CrossEncoder.load_model()

# Rerank the results
#query = "how to handle forms server-side?"
context = RAG.CrossEncoder.rerank(top_results, query)

Build the prompt

We define the prompt with a context and a question

defmodule RAG.PromptBuilder do
  def build_prompt(context, query) do
    context_text = Enum.join(context, "\n\n")
    """
    You are a proficient Elixir developer, with full knowledge of the framework Phoenix LiveView.
    You are given a context information below relevant to the query that is submitted to you.
    -----------------------
    #{context_text}
    -----------------------
    You answer the query using, in priority, the context information given above, and you should cite it.
    The response should be in markdown format.

    Query: #{query}
    Answer:
    """
  end
end
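
A quick illustration with hypothetical chunks, to check the layout of the prompt:

["Chunk about LiveView forms.", "Chunk about form bindings."]
|> RAG.PromptBuilder.build_prompt("how to handle forms server-side?")
|> IO.puts()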

LLM integration

Most LLMs are paid solutions accessible via an endpoint. Very few models can be run locally, as LLMs tend to be large.

We run the “codellama” model via the Ollama platform.

Ollama CLI

Install and start the Ollama server

We install Ollama (see the repo) to run the “codellama” LLM.

We pull a model from the registry:

> ollama pull codellama

We start an LLM server:

> ollama serve

This gives us an interactive CLI and a REST API.

We can test this by sending a POST request to generate a completion, passing a JSON payload {"model": "codellama", "prompt": "...."}.

> curl http://localhost:11434/api/generate -d \
   '{"model": "codellama", "prompt": "how to handle forms with Phoenix Liveview?", "stream": false}'

We get a response back:

{
  "model":"codellama",
  "created_at":"2024-08-29T07:25:31.941263Z",
  "response":"\nTo handle forms in Phoenix LiveView, you can use the `Phoenix.LiveView.Form` module. This module provides a set of functions for creating and manipulating HTML form elements, as well as handling form data on the server.\n\nHere's an example of how to create a simple form using Phoenix LiveView:\n```\nimport Ecto.Changeset\n\n# Create a changeset for the form\nchangeset = Ecto.Changeset.change(%YourModel{}, %{})\n\n# Render the form in your template\n\u003cform phx-submit=\"save\"\u003e\n  \u003cdiv\u003e\n    \u003clabel for=\"name\"\u003eName:\u003c/label\u003e\n    \u003cinput type=\"text\" id=\"name\" name=\"name\" value={changeset.data[\"name\"]} /\u003e\n  \u003c/div\u003e\n\n  \u003cdiv\u003e\n    \u003clabel for=\"age\"\u003eAge:\u003c/label\u003e\n    \u003cinput type=\"number\" id=\"age\" name=\"age\" value={changeset.data[\"age\"]} /\u003e\n  \u003c/div\u003e\n\n  \u003cbutton type=\"submit\"\u003eSave\u003c/button\u003e\n\u003c/form\u003e\n```\nIn this example, we're creating a changeset for the form, which is used to validate and update the form data on the server. We then render the form in our template using the `phx-submit` attribute, which tells Phoenix to send the form data to the server when the form is submitted.\n\nWhen the form is submitted, Phoenix will automatically handle the form data and update the changeset with any validation errors or updates. You can then use the updated changeset to persist the data in your database.\n\nTo handle the form submission on the server, you can define a `save` function in your LiveView module that will be called when the form is submitted. This function will receive the updated changeset as an argument, and you can use it to update the data in your database or perform any other necessary actions.\n```\ndef save(changeset) do\n  # Validate the changeset and return an error if there are any validation errors\n  case Ecto.Changeset.apply_action(changeset, :update) do\n    {:ok, _model} -\u003e\n      # Update the data in your database or perform any other necessary actions\n      :ok\n\n    {:error, _changeset} -\u003e\n      # Render an error page if there were validation errors\n      render(:index, changeset: changeset)\n  end\nend\n```\nIn this example, we're using the `Ecto.Changeset` module to validate the form data and update the changeset with any validation errors or updates. If there are no validation errors, we can use the updated changeset to persist the data in our database or perform any other necessary actions. If there are validation errors, we render an error page with the updated changeset.\n\nOverall, using Phoenix LiveView forms provides a convenient and efficient way to handle form data on the server, while also providing a seamless user experience for your users.",
  "done":true,
  ...
}

We check that Ollama is listening:

> lsof -i -P | grep LISTEN | grep 11434

Generate a response via the Ollama REST API and Elixir

The Livebook sends a POST request with Req, passing the payload to the :json option.

As per the Req documentation, this calls Jason.encode_to_iodata(%{model: "codellama", prompt: "..."}) and sets the appropriate headers.

Note that we need to increase the receive timeout above the default of 5000 ms.

defmodule LLM do
  def generate_response(prompt) do
    json = %{stream: false, model: "codellama", prompt: prompt}

    res = 
      Req.post!(
        "http://localhost:11434/api/generate",
        json: json,
        receive_timeout: 120_000
      )
    
    case res do
      %{status: 200, body: body} ->
        body["response"]

      _ ->
        IO.puts("error")
    end
  end
end
RAG.PromptBuilder.build_prompt(context, query)
|> LLM.generate_response()

Wrap up

Seed the database with external sources

guides  = [
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/server",
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/client",
  "https://api.github.com/repos/phoenixframework/phoenix_live_view/contents/guides/introduction"
]

RAG.ExternalSources.build(guides)

defmodule RAG do
  def process_query(query) do
    top_k = 10
    
    query
    |> RAG.SemanticSearch.search(top_k)
    # top_results
    |> RAG.CrossEncoder.rerank(query)
    # context
    |> tap(&IO.puts/1)
    |> RAG.PromptBuilder.build_prompt(query)
    # prompt
    |> LLM.generate_response()
  end
end

query = "explain Javascript interoperability on the server-side"

RAG.process_query(query)

Dimension reduction & visualization

We will use the Scholar library.

require Explorer.DataFrame, as: DF
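
A minimal sketch of what comes next, assuming the “documents” table is populated: fetch the embeddings, project them to two dimensions with Scholar’s PCA, and plot the projection with Tucan.

import Ecto.Query

# fetch all the stored embeddings as an {n, 384} tensor
embeddings =
  RAG.Repo.all(from d in RAG.Document, select: d.embedding)
  |> Enum.map(&Pgvector.to_list/1)
  |> Nx.tensor()

# project the 384-dimensional embeddings down to 2D
pca = Scholar.Decomposition.PCA.fit(embeddings, num_components: 2)
coords = Scholar.Decomposition.PCA.transform(pca, embeddings)

df =
  DF.new(
    x: coords[[.., 0]] |> Nx.to_flat_list(),
    y: coords[[.., 1]] |> Nx.to_flat_list()
  )

Tucan.scatter(df, "x", "y")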