RAG with PgVector and ChatGPT
Mix.install(
[
{:postgrex, ">= 0.0.0"},
{:ecto_sql, "~> 3.11"},
{:pgvector, "~> 0.2.1"},
{:kino, "~> 0.12.3"},
{:nx, "~> 0.7.1"},
{:exla, "~> 0.7.1"},
{:bumblebee, "~> 0.5.3"},
{:readability, "~> 0.12.1"},
{:ex_openai, "~> 1.5"}
],
config: [
demo: [
{:ecto_repos, [Demo.Repo]},
{Demo.Repo, [types: Demo.PostgrexTypes]}
],
ex_openai: [api_key: System.fetch_env!("LB_OPENAI_API_KEY")]
]
)
Nx.global_default_backend(EXLA.Backend)
Database Setup
Postgrex.Types.define(
Demo.PostgrexTypes,
[Pgvector.Extensions.Vector] ++ Ecto.Adapters.Postgres.extensions(),
[]
)
defmodule Demo.Repo do
use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :demo, types: Demo.PostgrexTypes
end
url = "postgres://postgres:postgres@localhost:5433/rag_demo?sslmode=disable"
Kino.start_child!({Demo.Repo, url: url})
defmodule Demo.Migrations.CreateVectorExtension do
use Ecto.Migration
def up do
execute("CREATE EXTENSION IF NOT EXISTS vector")
end
def down do
execute("DROP EXTENSION vector")
end
end
defmodule Demo.Migrations.CreateDocument do
use Ecto.Migration
def change do
create table(:documents) do
add(:text, :text)
add(:url, :string)
add(:embedding, :vector, size: 384)
end
end
end
migrations = [{0, Demo.Migrations.CreateVectorExtension}, {1, Demo.Migrations.CreateDocument}]
Ecto.Migrator.run(Demo.Repo, migrations, :down, all: true)
Ecto.Migrator.run(Demo.Repo, migrations, :up, all: true)
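As an optional sanity check, a plain SQL query against the Postgres catalog confirms the migration installed the pgvector extension:
Demo.Repo.query!("SELECT extversion FROM pg_extension WHERE extname = 'vector'")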
defmodule Demo.Document do
use Ecto.Schema
import Ecto.Changeset
schema "documents" do
field(:url, :string)
field(:text, :string)
field(:embedding, Pgvector.Ecto.Vector)
end
def changeset(attrs) do
cast(%__MODULE__{}, attrs, [:url, :text, :embedding])
end
end
{:ok, model_info} = Bumblebee.load_model({:hf, "sentence-transformers/all-MiniLM-L6-v2"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "sentence-transformers/all-MiniLM-L6-v2"})
serving =
Bumblebee.Text.TextEmbedding.text_embedding(model_info, tokenizer,
output_pool: :mean_pooling,
output_attribute: :hidden_state,
embedding_processor: :l2_norm
)
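A quick sanity check (the query string here is just an example): the serving should return a normalized, 384-dimensional embedding, matching the size: 384 declared in the migration above.
%{embedding: sample_embedding} = Nx.Serving.run(serving, "How do I deploy Livebook?")
Nx.shape(sample_embedding)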
Embedding
urls = [
"https://hexdocs.pm/livebook/readme.html",
"https://hexdocs.pm/ecto/Ecto.html",
"https://hexdocs.pm/phoenix/overview.html",
"https://hexdocs.pm/broadway/introduction.html",
"https://hexdocs.pm/elixir/Stream.html",
"https://hexdocs.pm/elixir/1.16.2/Kernel.html",
"https://hexdocs.pm/pgvector/readme.html",
"https://hexdocs.pm/gen_stage/GenStage.html"
]
docs =
urls
|> Enum.map(fn url -> %{url: url, text: Readability.summarize(url).article_text} end)
|> Enum.map(fn d ->
%{embedding: embedding} = Nx.Serving.run(serving, d.text)
Map.put(d, :embedding, Nx.to_list(embedding))
end)
|> dbg
docs
|> Enum.map(&Demo.Document.changeset/1)
|> Enum.map(&Demo.Repo.insert!/1)
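A count confirms all eight pages were embedded and persisted before querying:
Demo.Repo.aggregate(Demo.Document, :count)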
Inference Function
# The system prompt instructs the model to answer based only on the provided
# context: even if the query is Elixir related but not covered by the context,
# it should say it doesn't have enough information.
require Logger
alias ExOpenAI.Components.ChatCompletionRequestSystemMessage
alias ExOpenAI.Components.ChatCompletionRequestUserMessage
system_message = fn context, url ->
"""
You are an advanced virtual assistant designed to help Elixir developers
find answers to their questions in the Hex documentation.
You operate based on the context provided by the system. The context is the following:
source: #{url}
context: #{context}
Use the information provided above to generate relevant and helpful responses.
Ensure that your responses are informative, concise, and tailored to the context given by the user.
Whether it's explaining concepts, offering suggestions, or providing explanations, you are here to help based on the information you receive.
At the end of your response, include the source url, if and only if you have an Elixir related answer to give.
If you don't know the answer, just say that you don't know.
Only answer based on the context provided to you; do not answer beyond that.
DO NOT HALLUCINATE!
"""
end
inference_fn = fn query, context, url ->
messages = [
%ChatCompletionRequestSystemMessage{
role: :system,
content: system_message.(context, url)
},
%ChatCompletionRequestUserMessage{role: :user, content: query}
]
{:ok, response} = ExOpenAI.Chat.create_chat_completion(messages, "gpt-3.5-turbo-0125")
hd(response.choices).message.content
end
Query without RAG
inputs = [
input: Kino.Input.text("Query(without RAG)", default: "How do deploy livebook")
]
form = Kino.Control.form(inputs, submit: "Check")
frame = Kino.Frame.new()
Kino.render(form)
Kino.listen(form, fn %{data: %{input: input}} ->
response = inference_fn.(input, "", "")
content =
Kino.Markdown.new("""
----
**query**:
> #{input}
**response**:
#{response}
-----
""")
Kino.Frame.append(frame, content)
end)
frame
Query with RAG
import Ecto.Query
import Pgvector.Ecto.Query
require Logger
inputs_with_rag = [
input_with_rag: Kino.Input.text("Query with RAG", default: "How do deploy livebook")
]
form_with_rag = Kino.Control.form(inputs_with_rag, submit: "Check")
frame_with_rag = Kino.Frame.new()
Kino.render(form_with_rag)
Kino.listen(form_with_rag, fn %{data: %{input_with_rag: input}} ->
%{embedding: query_embedding} = Nx.Serving.run(serving, input)
results =
Demo.Repo.all(
from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
)
context = results |> Enum.map(& &1.text) |> Enum.join("\n")
urls = results |> Enum.map(& &1.url) |> Enum.join("\n")
Logger.info("Based on query `#{input}`, found the following resources #{urls}")
response = inference_fn.(input, context, urls)
content =
Kino.Markdown.new("""
----
**query**:
> #{input}
**response**:
#{response}
-----
""")
Kino.Frame.append(frame_with_rag, content)
end)
frame_with_rag
Query with RAG + MultiQuery
require Logger
expanded_queries_fn = fn query ->
alias ExOpenAI.Components.ChatCompletionRequestSystemMessage
alias ExOpenAI.Components.ChatCompletionRequestUserMessage
system_message =
"""
As a knowledgeable assistant in Elixir development, you aim to aid users seeking
information on Elixir libraries. Provide five concise, standalone questions related
to the original inquiry to guide users effectively. Each question should cover
a different aspect of the topic, ensuring they are complete and directly related
to the initial query.
Format each question on a separate line and do not number the questions.
"""
messages = [
%ChatCompletionRequestSystemMessage{
role: :system,
content: system_message
},
%ChatCompletionRequestUserMessage{
role: :user,
content: query
}
]
{:ok, response} = ExOpenAI.Chat.create_chat_completion(messages, "gpt-3.5-turbo-0125")
expanded_queries_str = hd(response.choices).message.content
Logger.info("expanded queries: #{expanded_queries_str}")
String.split(expanded_queries_str, "\n")
end
inputs_with_rag2 = [
input_with_rag:
Kino.Input.text("Query with RAG + MultiQuery",
default: "What kind of transformations can be achieved with Elixir"
)
]
form_with_rag2 = Kino.Control.form(inputs_with_rag2, submit: "Check")
frame_with_rag2 = Kino.Frame.new()
Kino.render(form_with_rag2)
Kino.listen(form_with_rag2, fn %{data: %{input_with_rag: input}} ->
results =
[input | expanded_queries_fn.(input)]
|> Stream.map(fn q ->
%{embedding: query_embedding} = Nx.Serving.run(serving, q)
query_embedding
end)
|> Stream.map(fn query_embedding ->
Demo.Repo.all(
from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
)
end)
|> Enum.to_list()
|> List.flatten()
|> Enum.uniq()
context = results |> Enum.map(& &1.text) |> Enum.join("\n")
urls = results |> Enum.map(& &1.url) |> Enum.join("\n")
Logger.info("Based on query `#{input}`, found the following resources #{urls}")
response = inference_fn.(input, context, urls)
content =
Kino.Markdown.new("""
----
**query**:
> #{input}
**response**:
#{response}
-----
""")
Kino.Frame.append(frame_with_rag2, content)
end)
frame_with_rag2
# Scratch check: exercise the query-expansion prompt on its own.
expanded_queries_fn.("What kind of transformations can be achieved with Elixir")
|> Enum.each(&IO.puts/1)
Query with RAG + RAG Fusion (Re-Ranking)
# cross-encoder code snippet from https://github.com/elixir-nx/bumblebee/issues/251#issuecomment-1729359828
{:ok, model_info_reranker} = Bumblebee.load_model({:hf, "cross-encoder/ms-marco-MiniLM-L-6-v2"})
{:ok, tokenizer_reranker} = Bumblebee.load_tokenizer({:hf, "bert-base-uncased"})
log_reranking_fn = fn reranked_result ->
report =
Enum.map(reranked_result, fn {rank, doc} -> {rank, doc.url} end)
Logger.info("after re-ranking #{inspect(report)}")
end
rerank_fn = fn docs, query ->
args = Enum.map(docs, fn doc -> {query, doc.text} end)
inputs =
Bumblebee.apply_tokenizer(tokenizer_reranker, args)
outputs = Axon.predict(model_info_reranker.model, model_info_reranker.params, inputs)
# each query/document pair yields a single relevance logit; flatten to scalars
outputs.logits
|> Nx.to_flat_list()
|> Enum.zip(docs)
|> Enum.sort_by(fn {rank, _doc} -> rank end, :desc)
|> tap(log_reranking_fn)
|> Enum.map(fn {_rank, doc} -> doc end)
end
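The re-ranker can be exercised on its own before wiring it into the pipeline; with an example query, the logged report should list the most relevant page first:
Demo.Repo.all(Demo.Document) |> rerank_fn.("How do I deploy Livebook?") |> Enum.map(& &1.url)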
inputs_with_rag3 = [
input_with_rag:
Kino.Input.text("Query with RAG + RAG FUSION(Re-Ranking)",
default: "What kind of transformations can be achieved with Elixir"
)
]
form_with_rag3 = Kino.Control.form(inputs_with_rag3, submit: "Check")
frame_with_rag3 = Kino.Frame.new()
Kino.render(form_with_rag3)
Kino.listen(form_with_rag3, fn %{data: %{input_with_rag: input}} ->
results =
[input | expanded_queries_fn.(input)]
|> Stream.map(fn q ->
%{embedding: query_embedding} = Nx.Serving.run(serving, q)
query_embedding
end)
|> Stream.map(fn query_embedding ->
Demo.Repo.all(
from(i in Demo.Document, order_by: l2_distance(i.embedding, ^query_embedding), limit: 1)
)
end)
|> Enum.to_list()
|> List.flatten()
|> Enum.uniq()
|> rerank_fn.(input)
context = results |> Enum.map(& &1.text) |> Enum.join("\n")
urls = results |> Enum.map(& &1.url) |> Enum.join("\n")
Logger.info("Based on query `#{input}`, found the following resources #{urls}")
response = inference_fn.(input, context, urls)
content =
Kino.Markdown.new("""
----
**query**:
> #{input}
**response**:
#{response}
-----
""")
Kino.Frame.append(frame_with_rag3, content)
end)
frame_with_rag3