Vector Search with LLM
Mix.install([
{:ollama, "~> 0.8"},
{:req, "~> 0.5"},
{:explorer, "~> 0.10.1"},
{:instructor, "~> 0.1.0"},
{:hnswlib, "~> 0.1.5"},
{:kino, "~> 0.14"}
])
Initialization
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
client = Req.new(base_url: "https://models.arrakis.upmaru.network")
Prepare Dataset
df =
Kino.FS.file_path("movies.json")
|> DataFrame.from_ndjson!()
df = DataFrame.mutate(df,
release_date: Series.cast(release_date, :date)
)
df =
df
|> DataFrame.filter(year(release_date) >= 2020)
|> DataFrame.filter(not(adult))
|> DataFrame.filter(status == "Released")
|> DataFrame.filter(lengths(split(overview, " ")) > 5)
|> DataFrame.sort_by(desc: popularity)
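To sanity-check the filters before spending tokens on generation, we can peek at the filtered frame with Explorer's standard accessors:
DataFrame.n_rows(df)
DataFrame.head(df, 5)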
LLM Generated Description
space = :l2
dimensions = 1024
max_elements = 100_000
{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)
stream =
DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)
generate_llm_description = fn movie ->
genres = Enum.map(movie.genres, fn g -> g["name"] end)
production_companies = Enum.map(movie.production_companies, fn pc -> pc["name"] end)
collection = Map.get(movie.belongs_to_collection || %{}, "name")
text =
~s"""
Name: #{movie.title}
Genres: #{Enum.join(genres, ", ")}
Overview: #{movie.overview}
Collection: #{collection}
Production companies: #{Enum.join(production_companies, ", ")}
"""
prompt =
"""
Write an accurate description of the movie in a single paragraph, don't use quote marks or special characters in the response.
#{text}
"""
body = %{
model: "mistral-small-24b",
messages: [
%{role: "user", content: prompt}
],
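# Five candidate descriptions are requested, but only the first choice is used below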
n: 5
}
{:ok, response} = Req.post(client, url: "/v1/chat/completions", json: body)
%{body: %{"choices" => choices}} = response
%{"message" => %{"content" => description}} = List.first(choices)
Map.put(movie, :description, description)
end
movies =
stream
|> Enum.take(200)
|> Enum.map(generate_llm_description)
movies
|> Enum.map(fn m ->
%{
id: m.id,
title: m.title,
status: m.status,
release: m.release_date,
genres: Enum.map(m.genres, fn g -> g["name"] end),
description: m.description,
description_length: byte_size(m.description)
}
end)
|> Kino.DataTable.new(keys: [:id, :title, :description, :status, :release, :genres, :description_length])
Generate Embeddings and Query Using the Description
generate_embeddings_and_index = fn batch ->
movie_ids =
Enum.map(batch, fn movie -> movie.id end)
|> Nx.tensor()
movie_llm_descriptions =
Enum.map(batch, fn movie ->
movie.description
end)
body = %{
input: movie_llm_descriptions,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: response}} = Req.post(client, url: "/v1/embeddings", json: body, receive_timeout: 300_000)
%{"data" => embeddings, "usage" => usage_data} = response
embeddings =
Enum.sort_by(embeddings, fn e ->
e["index"]
end)
|> Enum.map(fn e -> e["embedding"] end)
|> Nx.tensor()
HNSWLib.Index.add_items(index, embeddings, ids: movie_ids)
%{movies: batch, usage: usage_data}
end
movies
|> Enum.chunk_every(100)
|> Enum.map(generate_embeddings_and_index)
|> Enum.flat_map(fn b -> b.movies end)
input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)
query_body = %{
input: original_query,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)
%{"data" => [%{"embedding" => embedding}]} = query
query = Nx.tensor(embedding)
{:ok, labels, _dists} = HNSWLib.Index.knn_query(index, query, k: 15)
matching_id =
Nx.to_list(labels)
|> List.flatten()
matches = Enum.filter(movies, fn m -> m.id in matching_id end)
matches_description = Enum.map(matches, fn m -> m.description end)
body = %{
query: original_query,
documents: matches_description,
model: "bge-reranker-v2-m3"
}
{:ok, %{body: %{"results" => reranking_results}}} = Req.post(client, url: "/v1/rerank", json: body)
results = Enum.sort_by(reranking_results, fn result ->
result["index"]
end)
results =
matches
|> Enum.zip(results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
|> Enum.filter(fn {_movie, relevance} ->
relevance["relevance_score"] > 0.2
end)
results
|> Enum.map(fn {movie, relevance} ->
%{
title: movie.title,
description: movie.description,
rating: movie.vote_average,
popularity: movie.popularity,
overview: movie.overview,
release: movie.release_date,
language: movie.original_language,
score: relevance["relevance_score"]
}
end)
|> Kino.DataTable.new(keys: [:title, :description, :overview, :rating, :release, :language, :popularity, :score])
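Generating descriptions and embeddings is the slow, expensive part of this notebook, so it can be worth persisting the populated index between sessions. A minimal sketch, assuming hnswlib's save_index/load_index (the file name is arbitrary; verify these functions against the version you installed):
# Persist the populated index to disk
HNSWLib.Index.save_index(index, "movies_l2.bin")
# Restore it later without re-embedding anything (same space and dimensions)
{:ok, index} = HNSWLib.Index.load_index(:l2, 1024, "movies_l2.bin")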
Using LLM to generate setting summary data
In this section we’ll look at what else we can generate with LLMs. The description on its own may be limited, but we can extract more structured information from it to make the content for each movie even better.
defmodule MovieSetting do
use Ecto.Schema
use Instructor
@llm_doc """
Output of the movie setting based on a given description.
## Fields:
- description: A description about the setting of the movie in a single paragraph.
- reason: A brief description of the reason why the model thinks the movie took place in those locations.
- locations: Potential locations the movie takes place.
"""
@primary_key false
embedded_schema do
field :description, :string
field :reason, :string
field :locations, {:array, :string}
end
end
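For reference, a successful extraction produces a MovieSetting struct along these lines (a made-up example, not real model output):
%MovieSetting{
description: "Most of the story unfolds aboard a cramped research station drifting far from Earth.",
reason: "The description mentions airlocks, zero gravity, and an isolated crew.",
locations: ["space", "space station"]
}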
instructor_config = [
adapter: Instructor.Adapters.Llamacpp,
api_url: "https://models.arrakis.upmaru.network"
]
predict_movie_setting = fn movie ->
{:ok, setting} = Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
response_model: MovieSetting,
messages: [
%{
role: "user",
content: """
Based on the description below, make a best estimate of the type of setting the movie likely takes place in: space, forest, jungle, tundra, desert, the ocean, or a futuristic city.
#{movie.description}
"""
}
]
], instructor_config)
Map.put(movie, :setting, setting)
end
movies = Enum.map(movies, predict_movie_setting)
movies
|> Enum.map(fn m ->
%{
id: m.id,
title: m.title,
setting_locations: m.setting.locations,
setting_reason: m.setting.reason,
setting_description: m.setting.description,
release: m.release_date,
genres: Enum.map(m.genres, fn g -> g["name"] end),
description: m.description,
description_length: byte_size(m.description)
}
end)
|> Kino.DataTable.new(keys: [:id, :title, :setting_locations, :setting_reason, :setting_description, :description, :release, :genres, :description_length])
Generate questions from descriptions
We can generate more questions based on the movie description and setting description.
defmodule MovieQuery do
use Ecto.Schema
use Instructor
@llm_doc """
Output of potential question about the movie based on the description.
## Fields:
- question: Potential question based on the description.
- reason: A brief description of the reason why the model thinks the question relates to the movie.
"""
@primary_key false
embedded_schema do
field :question, :string
field :reason, :string
end
end
predict_movie_queries = fn movie ->
queries = Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
# Array response models are streamed back as a sequence of {:ok, struct} tuples,
# which is what the Enum.map below unwraps
stream: true,
response_model: {:array, MovieQuery},
messages: [
%{
role: "system",
content: "You are a curious mind that generates questions about movies. You will generate at least 3 questions about the movies."
},
%{
role: "user",
content: ~s"""
[description and setting description about the movie]
#{movie.description}
#{movie.setting.description}
"""
}
]
], instructor_config)
queries = Enum.map(queries, fn {:ok, query} -> query end)
Map.put(movie, :queries, queries)
end
movies = Enum.map(movies, predict_movie_queries)
movies
|> Enum.map(fn m ->
%{
id: m.id,
title: m.title,
questions: Enum.map(m.queries, fn q -> q.question end),
questions_count: Enum.count(m.queries),
description: m.description,
setting_description: m.setting.description,
setting_locations: m.setting.locations
}
end)
|> Kino.DataTable.new(keys: [:id, :title, :questions, :questions_count, :description, :setting_description, :setting_locations])
We’ll now build two new indices using cosine distance and index both the description and the setting description into them.
space = :cosine
dimensions = 1024
# Must accommodate all the movies indexed below (we took 200 earlier)
max_elements = 1_000
{:ok, description_index} = HNSWLib.Index.new(space, dimensions, max_elements)
{:ok, setting_index} = HNSWLib.Index.new(space, dimensions, max_elements)
generate_description_embeddings_and_index = fn batch ->
movie_ids =
Enum.map(batch, fn movie -> movie.id end)
|> Nx.tensor()
movie_llm_descriptions =
Enum.map(batch, fn movie ->
movie.description
end)
body = %{
input: movie_llm_descriptions,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: response}} = Req.post(
client,
url: "/v1/embeddings",
json: body,
receive_timeout: 300_000
)
%{"data" => embeddings} = response
embeddings =
Enum.sort_by(embeddings, fn e ->
e["index"]
end)
|> Enum.map(fn e -> e["embedding"] end)
|> Nx.tensor()
HNSWLib.Index.add_items(description_index, embeddings, ids: movie_ids)
batch
end
generate_setting_embeddings_and_index = fn batch ->
movie_ids =
Enum.map(batch, fn movie -> movie.id end)
|> Nx.tensor()
movie_settings =
Enum.map(batch, fn movie ->
movie.setting.description
end)
body = %{
input: movie_settings,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: response}} = Req.post(
client,
url: "/v1/embeddings",
json: body, receive_timeout: 300_000
)
%{"data" => embeddings} = response
embeddings =
Enum.sort_by(embeddings, fn e ->
e["index"]
end)
|> Enum.map(fn e -> e["embedding"] end)
|> Nx.tensor()
HNSWLib.Index.add_items(setting_index, embeddings, ids: movie_ids)
batch
end
movies =
movies
|> Enum.chunk_every(10)
|> Enum.map(generate_description_embeddings_and_index)
|> Enum.map(generate_setting_embeddings_and_index)
|> List.flatten()
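As a quick sanity check that both indices received every movie, we can compare the element counts (this assumes HNSWLib.Index.get_current_count/1 is available in the installed version):
{:ok, description_count} = HNSWLib.Index.get_current_count(description_index)
{:ok, setting_count} = HNSWLib.Index.get_current_count(setting_index)
# All three numbers should agree
{description_count, setting_count, Enum.count(movies)}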
input = Kino.Input.textarea("Query")
We query both indices with the same query embedding.
original_query = Kino.Input.read(input)
query_body = %{
input: original_query,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)
%{"data" => [%{"embedding" => embedding}]} = query
query = Nx.tensor(embedding)
{:ok, description_matches, _desc_dists} = HNSWLib.Index.knn_query(description_index, query, k: 15)
{:ok, setting_matches, _setting_dists} = HNSWLib.Index.knn_query(setting_index, query, k: 15)
description_matching_ids =
Nx.to_list(description_matches)
|> List.flatten()
setting_matching_ids =
Nx.to_list(setting_matches)
|> List.flatten()
matches_on_description = Enum.filter(movies, fn m -> m.id in description_matching_ids end)
matches_on_setting = Enum.filter(movies, fn m -> m.id in setting_matching_ids end)
Run the reranker on both sets of matches and merge the results by averaging the two relevance scores.
matches_descriptions = Enum.map(matches_on_description, fn m -> m.description end)
matches_setting_descriptions = Enum.map(matches_on_setting, fn m -> m.setting.description end)
description_rerank_body = %{
query: original_query,
documents: matches_descriptions,
model: "bge-reranker-v2-m3"
}
setting_description_rerank_body = %{
query: original_query,
documents: matches_setting_descriptions,
model: "bge-reranker-v2-m3"
}
{:ok, %{body: %{"results" => description_reranking_results}}} = Req.post(client, url: "/v1/rerank", json: description_rerank_body)
{:ok, %{body: %{"results" => setting_description_reranking_results}}} = Req.post(client, url: "/v1/rerank", json: setting_description_rerank_body)
description_results =
Enum.sort_by(description_reranking_results, fn result ->
result["index"]
end)
setting_description_results =
Enum.sort_by(setting_description_reranking_results, fn result ->
result["index"]
end)
description_results =
matches_on_description
|> Enum.zip(description_results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
setting_description_results =
matches_on_setting
|> Enum.zip(setting_description_results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
combined_relevance =
description_results
|> Enum.concat(setting_description_results)
|> Enum.group_by(fn {m, _relevance} -> m.id end)
|> Enum.map(fn {id, entries} ->
# Dividing by 2 assumes a movie matched both indices; a movie found by only
# one index gets half its score, implicitly penalizing single-index hits
{id, Enum.sum(Enum.map(entries, fn {_m, r} -> r["relevance_score"] end)) / 2}
end)
|> Enum.filter(fn {_, relevance} -> relevance > 0.05 end)
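For example, with hypothetical scores, a movie reranked at 0.8 on its description and 0.4 on its setting merges to (0.8 + 0.4) / 2 = 0.6, while a movie matched only on its description with 0.3 merges to 0.3 / 2 = 0.15; both clear the 0.05 threshold.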
combined_movies =
matches_on_description
|> Enum.concat(matches_on_setting)
|> Enum.uniq_by(fn m -> m.id end)
final_result =
combined_relevance
|> Enum.map(fn {id, score} ->
combined_movies
|> Enum.find(fn m -> m.id == id end)
|> Map.put(:relevance, score)
end)
|> Enum.sort_by(fn m -> m.relevance end, :desc)
|> Enum.map(fn movie ->
%{
title: movie.title,
queries: Enum.map(movie.queries, fn q -> q.question end),
setting: movie.setting.description,
description: movie.description,
rating: movie.vote_average,
popularity: movie.popularity,
overview: movie.overview,
release: movie.release_date,
language: movie.original_language,
score: movie.relevance
}
end)
|> Kino.DataTable.new(keys: [:title, :queries, :setting, :description, :overview, :rating, :release, :language, :popularity, :score])
With Zero-Shot Classification
We will use the LLM to classify our query, then use the classification to pick the more appropriate index and to improve the query itself.
input = Kino.Input.textarea("Query")
defmodule QueryClassification do
use Ecto.Schema
use Instructor
@llm_doc """
Query classification
## Fields:
- reply: Should the reply be list of results or a recommendation?
- about: What is the query about? Location, character or general knowledge?
- improved_query: An improved version of the query that will provide a better answer.
"""
@primary_key false
embedded_schema do
field :reply, Ecto.Enum, values: [:results, :recommendation]
field :about, Ecto.Enum, values: [:general, :character, :location]
field :improved_query, :string
end
end
original_query = Kino.Input.read(input)
{:ok, query_classification} = Instructor.chat_completion([
model: "mistral-small-24b",
mode: :json_schema,
response_model: QueryClassification,
messages: [
%{
role: "user",
content: """
Correctly identify what the query is about and what type of reply should be made.
Also provide a better quality question that will give better results.
#{original_query}
"""
}
]
], instructor_config)
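A made-up classification, just to illustrate the shape of the result:
%QueryClassification{
reply: :recommendation,
about: :location,
improved_query: "Which movies are set in a desert wasteland?"
}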
index = case query_classification.about do
:location ->
setting_index
_ ->
description_index
end
query_body = %{
input: query_classification.improved_query,
model: "multilingual-e5-large",
encoding_format: "float"
}
{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)
%{"data" => [%{"embedding" => embedding}]} = query
query = Nx.tensor(embedding)
{:ok, labels, _distance} = HNSWLib.Index.knn_query(index, query, k: 15)
matching_id =
Nx.to_list(labels)
|> List.flatten()
matches = Enum.filter(movies, fn m -> m.id in matching_id end)
matched_text = Enum.map(matches, fn m ->
case query_classification.about do
:location ->
m.setting.description
_ ->
m.description
end
end)
description_rerank_body = %{
query: original_query,
documents: matched_text,
model: "bge-reranker-v2-m3"
}
{:ok, %{body: %{"results" => reranking_results}}} =
Req.post(client, url: "/v1/rerank", json: description_rerank_body)
results = Enum.sort_by(reranking_results, fn result ->
result["index"]
end)
results =
matches
|> Enum.zip(results)
|> Enum.sort_by(fn {_movie, relevance} ->
relevance["relevance_score"]
end, :desc)
|> Enum.filter(fn {_movie, relevance} ->
relevance["relevance_score"] > 0.1
end)
results
|> Enum.map(fn {movie, relevance} ->
%{
title: movie.title,
setting: movie.setting.description,
description: movie.description,
rating: movie.vote_average,
popularity: movie.popularity,
overview: movie.overview,
release: movie.release_date,
language: movie.original_language,
score: relevance["relevance_score"]
}
end)
|> Kino.DataTable.new(keys: [:title, :setting, :description, :overview, :rating, :release, :language, :popularity, :score])
RAG output
We should also be able to build a RAG response from our search results. We’ll build a custom KinoReply
widget to handle the streaming response.
defmodule KinoReply do
use Kino.JS
use Kino.JS.Live
def new(starting_string \\ "") do
Kino.JS.Live.new(__MODULE__, starting_string)
end
def append(kino, string) do
Kino.JS.Live.cast(kino, {:append, string})
end
def finish(kino) do
Kino.JS.Live.cast(kino, :finish)
end
@impl true
def init(string, ctx) do
{:ok, assign(ctx, reply: string)}
end
@impl true
def handle_connect(ctx) do
{:ok, ctx.assigns.reply, ctx}
end
@impl true
def handle_cast({:append, string}, ctx) do
broadcast_event(ctx, "append", string)
# Concatenate without a separator so late-connecting clients see the exact streamed text
{:noreply, assign(ctx, reply: ctx.assigns.reply <> string)}
end
@impl true
def handle_cast(:finish, ctx) do
broadcast_event(ctx, "finish", nil)
{:noreply, ctx}
end
asset "main.js" do
"""
export function init(ctx, chunk) {
ctx.importJS("https://cdn.jsdelivr.net/npm/markdown-it@14.1.0/dist/markdown-it.min.js")
// ctx.importJS("https://cdn.jsdelivr.net/npm/marked@15.0.7/lib/marked.umd.min.js")
ctx.root.innerHTML = chunk;
const el = ctx.root;
el.classList.add("markdown");
el.classList.add("prose");
ctx.handleEvent("append", (newChunk) => {
const md = markdownit('commonmark')
ctx.root.innerHTML = md.renderInline(ctx.root.innerHTML.concat(newChunk));
})
ctx.handleEvent("finish", (_n) => {
const md = markdownit('commonmark')
ctx.root.innerHTML = md.render(ctx.root.innerHTML);
})
}
"""
end
end
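Before wiring it into the stream, we can smoke-test the widget on its own (run each call in its own cell to watch the text render):
reply = KinoReply.new("**MovieBOT**: ")
KinoReply.append(reply, "Hello there")
KinoReply.finish(reply)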
We use the stream parser from the post Open AI Streaming in Elixir Phoenix.
defmodule StreamParser do
def parse(chunk) do
chunk
|> String.split("data: ")
|> Enum.map(&String.trim/1)
|> Enum.map(&decode/1)
|> Enum.reject(&is_nil/1)
end
defp decode(""), do: nil
defp decode("[DONE]"), do: nil
defp decode(data), do: Jason.decode!(data)
end
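A quick check with a made-up SSE chunk shows what the parser yields:
StreamParser.parse(~s(data: {"choices":[{"delta":{"content":"Hi"}}]}\n\ndata: [DONE]\n\n))
#=> [%{"choices" => [%{"delta" => %{"content" => "Hi"}}]}]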
Let’s generate the RAG output.
results_text = Enum.map(results, fn {m, _} ->
~s"""
Title: #{m.title}
Description: #{m.description}
Setting: #{m.setting.description}
Rating: #{m.vote_average}
"""
end)
|> Enum.join("\n")
prompt =
~s"""
The users query: #{original_query}
We found #{Enum.count(results)} results matching the user's requirement.
#{results_text}
Please respond to the user with the data of the movie and explain the reasoning why they match the query given.
Make sure the output complies with commonmark.
"""
frame = Kino.Frame.new()
md = Kino.Markdown.new("**You**: #{original_query}")
Kino.Frame.append(frame, md)
Kino.render(frame)
reply = KinoReply.new("**MovieBOT**: ")
Kino.Frame.append(frame, reply)
body = %{
model: "mistral-small-24b",
messages: [
%{role: "user", content: prompt}
],
stream: true
}
{:ok, response} =
Req.post(client, url: "/v1/chat/completions", json: body, into: fn {:data, data}, context ->
case StreamParser.parse(data) do
[%{"choices" => [%{"delta" => %{"content" => chunk}}]}] ->
chunk = Regex.replace(~r/\n\n/, chunk, "
")
chunk = Regex.replace(~r/\n/, chunk, "
")
KinoReply.append(reply, chunk)
{:cont, context}
_ ->
KinoReply.finish(reply)
{:halt, context}
end
end)
:ok