Vector Search
Mix.install([
{:req, "~> 0.5"},
{:explorer, "~> 0.10.1"},
{:hnswlib, "~> 0.1.5"},
{:kino, "~> 0.14"},
{:elixir_make, "~> 0.9.0", override: true}
])
Initialization
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
We use the :explorer
library to parse our JSON data. For our experiment we keep only movies released in or after 2020 (see the year filter below). We can change the filter later if we want to experiment with a larger or smaller dataset.
Prepare dataset
# HTTP client reused by the embedding and rerank requests in this notebook.
client = Req.new(base_url: "https://podman-ml.bombay-scylla.ts.net")

# Load the newline-delimited JSON file, cast release_date to a date, and
# narrow the frame down to the rows we want to embed:
#   * released in 2020 or later
#   * not flagged as adult
#   * status is "Released"
#   * overview splits into more than 5 space-separated pieces
# The surviving rows are ordered by descending popularity.
df =
  "movies.json"
  |> Kino.FS.file_path()
  |> DataFrame.from_ndjson!()
  |> DataFrame.mutate(release_date: Series.cast(release_date, :date))
  |> DataFrame.filter(year(release_date) >= 2020)
  |> DataFrame.filter(not adult)
  |> DataFrame.filter(status == "Released")
  |> DataFrame.filter(lengths(split(overview, " ")) > 5)
  |> DataFrame.sort_by(desc: popularity)
Movie overview with L2
Instantiate a new index
We will need to create an index so we can store the embeddings. We will be using the :hnswlib
library to do this.
# Distance space of the index. Alternatives noted by the author: :cosine, :ip
space = :l2

# Number of components in every vector stored in the index.
dimensions = 1024

# Capacity the index is created with.
max_elements = 100_000

{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)

# Selector whose value (:index or :load) is read later with Kino.Input.read/1.
mode = Kino.Input.select("Mode", index: "index", load: "load")
We’ll take up to 100,000
samples to generate embeddings, sent to the model in batches of 1,000; we can always add more later. We’ll extract the overviews of the movie records and call our embedding model.
# some_text = "hi there!"
# demo_body = %{
# input: some_text,
# model: "intfloat/multilingual-e5-large",
# encoding_format: "float"
# }
# Req.post(client, url: "/v1/embeddings", json: demo_body, receive_timeout: 300_000)
require Logger
# Embeds the overviews of one batch of movies and stores the vectors in `index`.
#
# `batch` is a list of movie maps; `:id` and `:overview` are read from each.
# Returns `%{movies: batch, usage: usage}`, where `usage` is the "usage" value
# of the embeddings response.
# Raises when the request does not return `{:ok, _}`, when the response body
# lacks "data"/"usage", or when the index reports an error for the insert.
generate_embeddings_and_index = fn batch ->
  movie_ids = batch |> Enum.map(& &1.id) |> Nx.tensor()

  body = %{
    input: Enum.map(batch, & &1.overview),
    model: "intfloat/multilingual-e5-large",
    encoding_format: "float"
  }

  # headers = [{"authorization", "Bearer token-abc123"}]
  {:ok, %{body: %{"data" => data, "usage" => usage_data}}} =
    Req.post(client, url: "/v1/embeddings", json: body, receive_timeout: 300_000)

  # Order by the response's "index" field so that row i of the tensor
  # corresponds to element i of movie_ids.
  embeddings =
    data
    |> Enum.sort_by(& &1["index"])
    |> Enum.map(& &1["embedding"])
    |> Nx.tensor()

  # Surface insert failures instead of silently reporting the batch as indexed.
  case HNSWLib.Index.add_items(index, embeddings, ids: movie_ids) do
    {:error, reason} -> raise "failed to add embeddings to index: #{inspect(reason)}"
    _ok -> :ok
  end

  %{movies: batch, usage: usage_data}
end
# Lazily iterate the data frame rows as maps with atom keys.
stream = DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)

# File the index is persisted to; it is loaded back from here for querying.
path = "/root/notebooks/files/overview_only_#{space}.bin"

# In :index mode, embed and index up to 100_000 rows in batches of 1000 and
# persist the index. Otherwise only materialise the rows, so that labels
# returned by a previously saved index can be mapped back to movies.
movies =
  if Kino.Input.read(mode) == :index do
    batches =
      stream
      |> Stream.take(100_000)
      |> Stream.chunk_every(1000)
      |> Enum.map(generate_embeddings_and_index)

    tokens = batches |> Enum.map(& &1.usage["total_tokens"]) |> Enum.sum()
    Logger.info("Tokens: #{tokens}")

    {:ok, count} = HNSWLib.Index.get_current_count(index)
    Logger.info("Count: #{count}")

    # Fail here, with the real reason, rather than later when the index file
    # turns out to be missing or stale.
    case HNSWLib.Index.save_index(index, path) do
      {:error, reason} -> raise "failed to save index to #{path}: #{inspect(reason)}"
      _ok -> :ok
    end

    Enum.flat_map(batches, & &1.movies)
  else
    Enum.take(stream, 100_000)
  end
Querying the index
This is where we query the index. First we need to generate an embedding of our query, and do a nearest neighbor search.
# Load the index that was persisted at `path`.
{:ok, index} = HNSWLib.Index.load_index(space, dimensions, path)

input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)

# Embed the query with the same model that produced the indexed vectors.
query_body = %{
  input: original_query,
  model: "intfloat/multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)
%{"data" => [%{"embedding" => embedding}]} = query
query = Nx.tensor(embedding)

# Ask the index for the 15 nearest neighbours of the query vector.
{:ok, labels, dists} = HNSWLib.Index.knn_query(index, query, k: 15)
IO.inspect(labels)
IO.inspect(dists)

matching_id = Nx.to_flat_list(labels)

# Position of every returned label, so the selected movies can be put back
# into the order the index returned them instead of dataset order.
# NOTE(review): assumes knn_query lists labels nearest-first — confirm.
rank_by_id = matching_id |> Enum.with_index() |> Map.new()

matches =
  movies
  |> Enum.filter(fn m -> Map.has_key?(rank_by_id, m.id) end)
  |> Enum.sort_by(fn m -> Map.fetch!(rank_by_id, m.id) end)

matches
|> Enum.map(fn movie ->
  %{
    title: movie.title,
    rating: movie.vote_average,
    popularity: movie.popularity,
    overview: movie.overview,
    release: movie.release_date,
    language: movie.original_language
  }
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity])
# Rerank the nearest-neighbour matches against the original query text.
matches_overviews = Enum.map(matches, & &1.overview)

body = %{
  query: original_query,
  documents: matches_overviews,
  model: "BAAI/bge-reranker-v2-m3"
}

{:ok, %{body: %{"results" => reranking_results}}} =
  Req.post(client, url: "/v1/rerank", json: body)

# Each result carries the "index" of the document it scores. Pair by that
# index rather than by position, so a response that omits some documents
# cannot attach a score to the wrong movie. Results whose index has no
# matching movie are dropped.
results =
  reranking_results
  |> Enum.sort_by(& &1["index"])
  |> Enum.flat_map(fn relevance ->
    case Enum.at(matches, relevance["index"]) do
      nil -> []
      movie -> [{movie, relevance}]
    end
  end)
  |> Enum.sort_by(fn {_movie, relevance} -> relevance["relevance_score"] end, :desc)

# |> Enum.filter(fn {_movie, relevance} ->
#   relevance["relevance_score"] > 0.2
# end)

results
|> Enum.map(fn {movie, relevance} ->
  %{
    title: movie.title,
    rating: movie.vote_average,
    popularity: movie.popularity,
    overview: movie.overview,
    release: movie.release_date,
    language: movie.original_language,
    score: relevance["relevance_score"]
  }
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity, :score])
Text corpus with L2
# Distance space used by the corpus index.
space = :l2

# Number of components in every vector stored in the index.
dimensions = 1024

# Capacity the index is created with.
max_elements = 200_000

{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)

# Selector whose value (:index or :load) is read later with Kino.Input.read/1.
mode = Kino.Input.select("Mode", index: "index", load: "load")
require Logger
# Renders the text that is embedded for a movie.
#
# Reads `:title`, `:overview`, `:genres`, `:production_companies` and
# `:belongs_to_collection` from `movie`. Genres and production companies are
# lists of maps with a "name" key; the collection is a map with a "name" key.
# Any of those three may be nil, in which case its line is left empty.
# Returns a five-line string, each line terminated by a newline.
corpus_template = fn movie ->
  genres = Enum.map_join(movie.genres || [], ", ", & &1["name"])

  production_companies =
    Enum.map_join(movie.production_companies || [], ", ", & &1["name"])

  collection = Map.get(movie.belongs_to_collection || %{}, "name")

  ~s"""
  Name: #{movie.title}
  Genres: #{genres}
  Overview: #{movie.overview}
  Collection: #{collection}
  Production companies: #{production_companies}
  """
end
# Embeds the rendered corpus text of one batch of movies and stores the
# vectors in `index`.
#
# `batch` is a list of movie maps; `:id` is read from each and the map is
# passed to `corpus_template`.
# Returns `%{movies: movies, usage: usage}`, where every movie has gained a
# `:corpus` key holding its rendered text and `usage` is the "usage" value of
# the embeddings response.
# Raises when the request does not return `{:ok, _}`, when the response body
# lacks "data"/"usage", or when the index reports an error for the insert.
generate_embeddings_and_index = fn batch ->
  movie_ids = batch |> Enum.map(& &1.id) |> Nx.tensor()

  # Render each corpus once and keep it on the movie; the request input is
  # then read back from the same list, so no positional lookups are needed.
  movies_with_corpus =
    Enum.map(batch, fn movie -> Map.put(movie, :corpus, corpus_template.(movie)) end)

  movie_corpus = Enum.map(movies_with_corpus, & &1.corpus)

  body = %{
    input: movie_corpus,
    model: "intfloat/multilingual-e5-large",
    encoding_format: "float"
  }

  headers = [{"authorization", "Bearer token-abc123"}]

  # Go through the shared `client`, like the query-embedding request does, so
  # indexed vectors and query vectors come from the same service.
  {:ok, %{body: %{"data" => data, "usage" => usage_data}}} =
    Req.post(client,
      url: "/v1/embeddings",
      json: body,
      headers: headers,
      receive_timeout: 300_000
    )

  # Order by the response's "index" field so that row i of the tensor
  # corresponds to element i of movie_ids.
  embeddings =
    data
    |> Enum.sort_by(& &1["index"])
    |> Enum.map(& &1["embedding"])
    |> Nx.tensor()

  # Surface insert failures instead of silently reporting the batch as indexed.
  case HNSWLib.Index.add_items(index, embeddings, ids: movie_ids) do
    {:error, reason} -> raise "failed to add embeddings to index: #{inspect(reason)}"
    _ok -> :ok
  end

  %{movies: movies_with_corpus, usage: usage_data}
end
# Lazily iterate the data frame rows as maps with atom keys.
stream = DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)

# File the corpus index is persisted to; it is loaded back from here for querying.
path = "/root/notebooks/files/corpus_#{space}.bin"

# In :index mode, embed and index up to 100_000 rows in batches of 1000 and
# persist the index. Otherwise only materialise the rows and attach the
# rendered corpus, which the rerank step reads from each movie.
movies =
  if Kino.Input.read(mode) == :index do
    batches =
      stream
      |> Stream.take(100_000)
      |> Stream.chunk_every(1000)
      |> Enum.map(generate_embeddings_and_index)

    tokens = batches |> Enum.map(& &1.usage["total_tokens"]) |> Enum.sum()
    Logger.info("Tokens: #{tokens}")

    {:ok, count} = HNSWLib.Index.get_current_count(index)
    Logger.info("Count: #{count}")

    # Fail here, with the real reason, rather than later when the index file
    # turns out to be missing or stale.
    case HNSWLib.Index.save_index(index, path) do
      {:error, reason} -> raise "failed to save index to #{path}: #{inspect(reason)}"
      _ok -> :ok
    end

    Enum.flat_map(batches, & &1.movies)
  else
    stream
    |> Enum.take(100_000)
    |> Enum.map(fn movie -> Map.put(movie, :corpus, corpus_template.(movie)) end)
  end
# Load the index that was persisted at `path`.
{:ok, index} = HNSWLib.Index.load_index(space, dimensions, path)

input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)

# Embed the query with the same model that produced the indexed vectors.
query_body = %{
  input: original_query,
  model: "intfloat/multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)
%{"data" => [%{"embedding" => embedding}]} = query
query = Nx.tensor(embedding)

# Ask the index for the 15 nearest neighbours of the query vector.
{:ok, labels, dists} = HNSWLib.Index.knn_query(index, query, k: 15)

matching_id = Nx.to_flat_list(labels)

# Position of every returned label, so the selected movies can be put back
# into the order the index returned them instead of dataset order.
# NOTE(review): assumes knn_query lists labels nearest-first — confirm.
rank_by_id = matching_id |> Enum.with_index() |> Map.new()

matches =
  movies
  |> Enum.filter(fn m -> Map.has_key?(rank_by_id, m.id) end)
  |> Enum.sort_by(fn m -> Map.fetch!(rank_by_id, m.id) end)
# Rerank the nearest-neighbour matches, using the full corpus text of each
# movie as the document.
matches_corpus = Enum.map(matches, & &1.corpus)

body = %{
  query: original_query,
  documents: matches_corpus,
  model: "BAAI/bge-reranker-v2-m3"
}

headers = [{"authorization", "Bearer token-abc123"}]

{:ok, %{body: %{"results" => reranking_results}}} =
  Req.post(client, url: "/v1/rerank", json: body, headers: headers)

# Each result carries the "index" of the document it scores. Pair by that
# index rather than by position, so a response that omits some documents
# cannot attach a score to the wrong movie. Results whose index has no
# matching movie are dropped.
results =
  reranking_results
  |> Enum.sort_by(& &1["index"])
  |> Enum.flat_map(fn relevance ->
    case Enum.at(matches, relevance["index"]) do
      nil -> []
      movie -> [{movie, relevance}]
    end
  end)
  |> Enum.sort_by(fn {_movie, relevance} -> relevance["relevance_score"] end, :desc)

# |> Enum.filter(fn {_movie, relevance} ->
#   relevance["relevance_score"] > 0.2
# end)

results
|> Enum.map(fn {movie, relevance} ->
  %{
    title: movie.title,
    rating: movie.vote_average,
    popularity: movie.popularity,
    overview: movie.overview,
    release: movie.release_date,
    language: movie.original_language,
    score: relevance["relevance_score"]
  }
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity, :score])