RAG with PgVector and ChatGPT

Mix.install(
  [
    {:postgrex, ">= 0.0.0"},
    {:ecto_sql, "~> 3.11"},
    {:pgvector, "~> 0.2.1"},
    {:kino, "~> 0.12.3"},
    {:nx, "~> 0.7.1"},
    {:exla, "~> 0.7.1"},
    {:bumblebee, "~> 0.5.3"},
    {:readability, "~> 0.12.1"},
    {:ex_openai, "~> 1.5"}
  ],
  config: [
    demo: [
      {:ecto_repos, [Demo.Repo]},
      {Demo.Repo, [types: Demo.PostgrexTypes]}
    ],
    ex_openai: [api_key: System.fetch_env!("LB_OPENAI_API_KEY")]
  ]
)

Nx.global_default_backend(EXLA.Backend)

Database Setup
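
The pgvector vector type has to be registered with Postgrex so that Ecto can encode and decode the embedding column: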

Postgrex.Types.define(
  Demo.PostgrexTypes,
  [Pgvector.Extensions.Vector] ++ Ecto.Adapters.Postgres.extensions(),
  []
)

defmodule Demo.Repo do
  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :demo, types: Demo.PostgrexTypes
end

url = "postgres://postgres:postgres@localhost:5433/rag_demo?sslmode=disable"
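# Assumes a reachable local Postgres (here on port 5433) with the pgvector extension installed.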

Kino.start_child!({Demo.Repo, url: url})

defmodule Demo.Migrations.CreateVectorExtension do
  use Ecto.Migration

  def up do
    execute("CREATE EXTENSION IF NOT EXISTS vector")
  end

  def down do
    execute("DROP EXTENSION vector")
  end
end

defmodule Demo.Migrations.CreateDocument do
  use Ecto.Migration

  def change do
    create table(:documents) do
      add(:text, :text)
      add(:url, :string)
      add(:embedding, :vector, size: 384)
    end
  end
end

migrations = [{0, Demo.Migrations.CreateVectorExtension}, {1, Demo.Migrations.CreateDocument}]

Ecto.Migrator.run(Demo.Repo, migrations, :down, all: true)
Ecto.Migrator.run(Demo.Repo, migrations, :up, all: true)

defmodule Demo.Document do
  use Ecto.Schema
  import Ecto.Changeset

  schema "documents" do
    field(:url, :string)
    field(:text, :string)
    field(:embedding, Pgvector.Ecto.Vector)
  end

  def changeset(attrs) do
    cast(%__MODULE__{}, attrs, [:url, :text, :embedding])
  end
end

{:ok, model_info} = Bumblebee.load_model({:hf, "sentence-transformers/all-MiniLM-L6-v2"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "sentence-transformers/all-MiniLM-L6-v2"})

serving =
  Bumblebee.Text.TextEmbedding.text_embedding(model_info, tokenizer,
    output_pool: :mean_pooling,
    output_attribute: :hidden_state,
    embedding_processor: :l2_norm
  )
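
As a quick sanity check (a minimal sketch; the sentence is arbitrary), the serving can embed a short string. all-MiniLM-L6-v2 produces 384-dimensional vectors, which is why the documents table declares size: 384:

%{embedding: embedding} = Nx.Serving.run(serving, "Livebook is an interactive notebook for Elixir")
Nx.shape(embedding)
# => {384}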

Embedding
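
Each URL below is fetched, the main article text is extracted with Readability, and the text is embedded into a 384-dimensional vector before being stored in Postgres.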

urls = [
  "https://hexdocs.pm/livebook/readme.html",
  "https://hexdocs.pm/ecto/Ecto.html",
  "https://hexdocs.pm/phoenix/overview.html",
  "https://hexdocs.pm/broadway/introduction.html",
  "https://hexdocs.pm/elixir/Stream.html",
  "https://hexdocs.pm/elixir/1.16.2/Kernel.html",
  "https://hexdocs.pm/pgvector/readme.html",
  "https://hexdocs.pm/gen_stage/GenStage.html"
]

docs =
  urls
  |> Enum.map(fn url -> %{url: url, text: Readability.summarize(url).article_text} end)
  |> Enum.map(fn d ->
    %{embedding: embedding} = Nx.Serving.run(serving, d.text)
    Map.put(d, :embedding, Nx.to_list(embedding))
  end)
  |> dbg()

docs
|> Enum.map(&Demo.Document.changeset/1)
|> Enum.map(&Demo.Repo.insert!/1)
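
With the documents stored, retrieval can be spot-checked before wiring up any UI. A minimal sketch (the query string is just an example) that returns the URL of the nearest document by L2 distance:

import Ecto.Query
import Pgvector.Ecto.Query

%{embedding: query_embedding} = Nx.Serving.run(serving, "deploying notebooks")

Demo.Repo.one(
  from(d in Demo.Document,
    order_by: l2_distance(d.embedding, ^query_embedding),
    limit: 1,
    select: d.url
  )
)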

Inference Function

# Possible prompt improvement:
#   "Only answer based on the context provided to you. Even if the query is
#    Elixir-related but not covered by the above context, say you don't have
#    enough information."

require Logger
alias ExOpenAI.Components.ChatCompletionRequestSystemMessage
alias ExOpenAI.Components.ChatCompletionRequestUserMessage

system_message = fn context, url ->
  """
  You are an advanced virtual assistant designed to help Elixir developers
  find answers to their questions in the Hex documentation.
  You operate based on context provided by the system. The context is the following:

  source: #{url}
  context: #{context}

  Use the information provided above to generate relevant and helpful responses.

  Ensure that your responses are informative, concise, and tailored to the context
  given by the user. Whether it's explaining concepts, offering suggestions, or
  providing explanations, you are here to help based on the information you receive.

  At the end of your response, always include the source URL, but only if you have
  an Elixir-related answer to give.

  If you don't know the answer, just say that you don't know.

  Only answer based on the context provided to you; do not answer beyond that.
  Do not hallucinate.
  """
end

inference_fn = fn query, context, url ->
  messages = [
    %ChatCompletionRequestSystemMessage{
      role: :system,
      content: system_message.(context, url)
    },
    %ChatCompletionRequestUserMessage{role: :user, content: query}
  ]

  {:ok, response} = ExOpenAI.Chat.create_chat_completion(messages, "gpt-3.5-turbo-0125")
  hd(response.choices).message.content
end

Query without RAG
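
As a baseline, the model is asked the question with empty context and source, so it answers from the prompt alone.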

inputs = [
  input: Kino.Input.text("Query (without RAG)", default: "How do I deploy Livebook?")
]

form = Kino.Control.form(inputs, submit: "Check")

frame = Kino.Frame.new()

Kino.render(form)

Kino.listen(form, fn %{data: %{input: input}} ->
  response = inference_fn.(input, "", "")

  content =
    Kino.Markdown.new("""
    ----

    **query**:
    > #{input}

    **response**:

    #{response}

    -----
    """)

  Kino.Frame.append(frame, content)
end)

frame

Query with RAG
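
Here the query itself is embedded, the nearest document is retrieved by L2 distance, and its text and URL are passed to the model as context.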

import Ecto.Query
import Pgvector.Ecto.Query
require Logger

inputs_with_rag = [
  input_with_rag: Kino.Input.text("Query with RAG", default: "How do I deploy Livebook?")
]

form_with_rag = Kino.Control.form(inputs_with_rag, submit: "Check")

frame_with_rag = Kino.Frame.new()

Kino.render(form_with_rag)

Kino.listen(form_with_rag, fn %{data: %{input_with_rag: input}} ->
  %{embedding: query_embedding} = Nx.Serving.run(serving, input)

  results =
    Demo.Repo.all(
      from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
    )

  context = results |> Enum.map(& &1.text) |> Enum.join("\n")
  urls = results |> Enum.map(& &1.url) |> Enum.join("\n")

  Logger.info("Based on query `#{input}`, found the following resources #{urls}")
  response = inference_fn.(input, context, urls)

  content =
    Kino.Markdown.new("""
    ----

    **query**:
    > #{input}

    **response**:

    #{response}

    -----
    """)

  Kino.Frame.append(frame_with_rag, content)
end)

frame_with_rag

Query with RAG + MultiQuery
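
Multi-query retrieval asks the LLM to expand the original question into five related questions, retrieves the nearest document for each (alongside the original query), and deduplicates the combined results.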

require Logger

expanded_queries_fn = fn query ->
  alias ExOpenAI.Components.ChatCompletionRequestSystemMessage
  alias ExOpenAI.Components.ChatCompletionRequestUserMessage

  system_message =
    """
        As a knowledgeable assistant in Elixir development, you aim to aid users seeking
        information on Elixir libraries. Provide five concise, standalone questions related
        to the original inquiry to guide users effectively. Each question should cover
        a different aspect of the topic, ensuring they are complete and directly related
        to the initial query.

        Format each question on a separate line and do not number the questions.
    """

  messages = [
    %ChatCompletionRequestSystemMessage{
      role: :system,
      content: system_message
    },
    %ChatCompletionRequestUserMessage{
      role: :user,
      content: query
    }
  ]

  {:ok, response} = ExOpenAI.Chat.create_chat_completion(messages, "gpt-3.5-turbo-0125")
  expanded_queries_str = hd(response.choices).message.content
  Logger.info("expanded queries: #{expanded_queries_str}")

  String.split(expanded_queries_str, "\n", trim: true)
end

inputs_with_rag2 = [
  input_with_rag:
    Kino.Input.text("Query with RAG + MultiQuery",
      default: "What kind of transformations can be achieved with Elixir"
    )
]

form_with_rag2 = Kino.Control.form(inputs_with_rag2, submit: "Check")

frame_with_rag2 = Kino.Frame.new()

Kino.render(form_with_rag2)

Kino.listen(form_with_rag2, fn %{data: %{input_with_rag: input}} ->
  results =
    [input | expanded_queries_fn.(input)]
    |> Stream.map(fn q ->
      %{embedding: query_embedding} = Nx.Serving.run(serving, q)
      query_embedding
    end)
    |> Stream.map(fn query_embedding ->
      Demo.Repo.all(
        from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
      )
    end)
    |> Enum.to_list()
    |> List.flatten()
    |> Enum.uniq()

  context = results |> Enum.map(& &1.text) |> Enum.join("\n")
  urls = results |> Enum.map(& &1.url) |> Enum.join("\n")

  Logger.info("Based on query `#{input}`, found the following resources #{urls}")
  response = inference_fn.(input, context, urls)

  content =
    Kino.Markdown.new("""
    ----

    **query**:
    > #{input}

    **response**:

    #{response}

    -----
    """)

  Kino.Frame.append(frame_with_rag2, content)
end)

frame_with_rag2
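
# Standalone check: issue the query-expansion prompt directly and print
# the generated questions for a sample input.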
alias ExOpenAI.Components.ChatCompletionRequestSystemMessage
alias ExOpenAI.Components.ChatCompletionRequestUserMessage

system_message =
  """
      As a knowledgeable assistant in Elixir development, you aim to aid users seeking
      information on Elixir libraries. Provide five concise, standalone questions related
      to the original inquiry to guide users effectively. Each question should cover
      a different aspect of the topic, ensuring they are complete and directly related
      to the initial query.

      Format each question on a separate line and do not number the questions.
  """

messages = [
  %ChatCompletionRequestSystemMessage{
    role: :system,
    content: system_message
  },
  %ChatCompletionRequestUserMessage{
    role: :user,
    content: "What kind of transformations can be achieved with Elixir"
  }
]

{:ok, response} = ExOpenAI.Chat.create_chat_completion(messages, "gpt-3.5-turbo-0125")
hd(response.choices).message.content |> IO.puts()

Query with RAG + RAG Fusion (Re-Ranking)
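
On top of multi-query retrieval, the candidate documents are re-ranked with a cross-encoder, which scores each query/document pair jointly instead of comparing precomputed embeddings.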

# cross-encoder code snippet from https://github.com/elixir-nx/bumblebee/issues/251#issuecomment-1729359828
{:ok, model_info_reranker} = Bumblebee.load_model({:hf, "cross-encoder/ms-marco-MiniLM-L-6-v2"})
{:ok, tokenizer_reranker} = Bumblebee.load_tokenizer({:hf, "bert-base-uncased"})

log_reranking_fn = fn reranked_result ->
  report = Enum.map(reranked_result, fn {score, doc} -> {score, doc.url} end)

  Logger.info("after re-ranking #{inspect(report)}")
end

rerank_fn = fn docs, query ->
  # The cross-encoder takes {query, document} pairs and scores each pair jointly.
  args = Enum.map(docs, fn doc -> {query, doc.text} end)

  inputs = Bumblebee.apply_tokenizer(tokenizer_reranker, args)

  outputs = Axon.predict(model_info_reranker.model, model_info_reranker.params, inputs)

  # Logits come back with shape {num_docs, 1}; flatten to one score per document.
  outputs.logits
  |> Nx.to_flat_list()
  |> Enum.zip(docs)
  |> Enum.sort_by(fn {score, _doc} -> score end, :desc)
  |> tap(log_reranking_fn)
  |> Enum.map(fn {_score, doc} -> doc end)
end
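
The re-ranker can be exercised on its own. A minimal sketch (the query string is illustrative) that scores every stored document against a query and lists the URLs best-first:

docs = Demo.Repo.all(Demo.Document)
ranked = rerank_fn.(docs, "How do I deploy Livebook?")
Enum.map(ranked, & &1.url)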

inputs_with_rag3 = [
  input_with_rag:
    Kino.Input.text("Query with RAG + RAG Fusion (Re-Ranking)",
      default: "What kind of transformations can be achieved with Elixir"
    )
]

form_with_rag3 = Kino.Control.form(inputs_with_rag3, submit: "Check")

frame_with_rag3 = Kino.Frame.new()

Kino.render(form_with_rag3)

Kino.listen(form_with_rag3, fn %{data: %{input_with_rag: input}} ->
  results =
    [input | expanded_queries_fn.(input)]
    |> Stream.map(fn q ->
      %{embedding: query_embedding} = Nx.Serving.run(serving, q)
      query_embedding
    end)
    |> Stream.map(fn query_embedding ->
      Demo.Repo.all(
        from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
      )
    end)
    |> Enum.to_list()
    |> List.flatten()
    |> Enum.uniq()
    |> rerank_fn.(input)

  context = results |> Enum.map(& &1.text) |> Enum.join("\n")
  urls = results |> Enum.map(& &1.url) |> Enum.join("\n")

  Logger.info("Based on query `#{input}`, found the following resources #{urls}")
  response = inference_fn.(input, context, urls)

  content =
    Kino.Markdown.new("""
    ----

    **query**:
    > #{input}

    **response**:

    #{response}

    -----
    """)

  Kino.Frame.append(frame_with_rag3, content)
end)

frame_with_rag3