
Vector Search

vector-embeddings.livemd

Mix.install([
  {:req, "~> 0.5"},
  {:explorer, "~> 0.10.1"},
  {:hnswlib, "~> 0.1.5"},
  {:kino, "~> 0.14"},
  # Nx is called directly below; it also ships as a dependency of :hnswlib
  {:nx, "~> 0.9"},
  {:elixir_make, "~> 0.9.0", override: true}
])

Initialization

alias Explorer.DataFrame
alias Explorer.Series

require Explorer.DataFrame

We use the :explorer library to parse our JSON data. For this experiment we filter for movies released in or after 2020; we can loosen the filter later if we want to experiment with a larger dataset.
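Each line of movies.json is one NDJSON movie record. A minimal sketch of the fields the code below relies on (the field names come from the code; the values are made up):

%{
  "id" => 1,
  "title" => "Example Movie",
  "overview" => "A thief discovers a way into other people's dreams.",
  "release_date" => "2020-07-16",
  "adult" => false,
  "status" => "Released",
  "popularity" => 83.5,
  "vote_average" => 8.4,
  "original_language" => "en",
  "genres" => [%{"name" => "Action"}],
  "production_companies" => [%{"name" => "Example Studios"}],
  "belongs_to_collection" => %{"name" => "Example Collection"}
}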

Prepare dataset

# OpenAI-compatible inference server; adjust the base_url for your own setup
client = Req.new(base_url: "https://podman-ml.bombay-scylla.ts.net")

df =
  Kino.FS.file_path("movies.json")
  |> DataFrame.from_ndjson!()

df =
  df
  |> DataFrame.mutate(release_date: Series.cast(release_date, :date))
  |> DataFrame.filter(year(release_date) >= 2020)
  |> DataFrame.filter(not adult)
  |> DataFrame.filter(status == "Released")
  # drop records whose overview is too short to embed meaningfully
  |> DataFrame.filter(lengths(split(overview, " ")) > 5)
  |> DataFrame.sort_by(desc: popularity)

Movie overview with L2

Instantiate a new index

We need to create an index to store the embeddings. We will use the :hnswlib library for this.

space = :l2 # alternatives: :cosine, :ip
dimensions = 1024 # embedding size of intfloat/multilingual-e5-large
max_elements = 100_000

{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)
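As a quick sanity check of the index API, we can add one known vector to a throwaway index and query it back (a minimal sketch; the vector, capacity, and id are illustrative):

# throwaway index so we don't pollute the real one
{:ok, scratch} = HNSWLib.Index.new(space, dimensions, 10)

v = Nx.broadcast(0.5, {dimensions})
HNSWLib.Index.add_items(scratch, Nx.stack([v]), ids: Nx.tensor([1]))

# the nearest neighbor of v should be itself, at distance ~0.0
{:ok, labels, dists} = HNSWLib.Index.knn_query(scratch, v, k: 1)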

mode = Kino.Input.select("Mode", [index: "index", load: "load"])

We’ll take up to 100,000 movies to generate embeddings; we can always add more later. We extract each movie's overview and send the batch to our embedding model.

# Quick smoke test of the embeddings endpoint (uncomment to run):
# some_text = "hi there!"

# demo_body = %{
#   input: some_text,
#   model: "intfloat/multilingual-e5-large",
#   encoding_format: "float"
# }

# Req.post(client, url: "/v1/embeddings", json: demo_body, receive_timeout: 300_000)
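For reference, this is the response shape we destructure in the indexing function below (values illustrative; real embeddings are 1024 floats each):

%{
  "data" => [
    %{"index" => 0, "embedding" => [0.0123, -0.0456]},
    %{"index" => 1, "embedding" => [0.0789, -0.0012]}
  ],
  "usage" => %{"total_tokens" => 42}
}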
require Logger

generate_embeddings_and_index = fn batch ->
  movie_ids =
    batch
    |> Enum.map(fn movie -> movie.id end)
    |> Nx.tensor()

  movie_overviews = Enum.map(batch, fn movie -> movie.overview end)

  body = %{
    input: movie_overviews,
    model: "intfloat/multilingual-e5-large",
    encoding_format: "float"
  }

  # headers = [{"authorization", "Bearer token-abc123"}]

  {:ok, %{body: response}} =
    Req.post(client, url: "/v1/embeddings", json: body, receive_timeout: 300_000)

  %{"data" => embeddings, "usage" => usage_data} = response

  # sort by the returned "index" so each embedding lines up with its movie id
  embeddings =
    embeddings
    |> Enum.sort_by(fn e -> e["index"] end)
    |> Enum.map(fn e -> e["embedding"] end)
    |> Nx.tensor()

  HNSWLib.Index.add_items(index, embeddings, ids: movie_ids)

  %{movies: batch, usage: usage_data}
end

stream = 
  DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)

path = "/root/notebooks/files/overview_only_#{space}.bin"

movies =
  if Kino.Input.read(mode) == :index do
    batches = 
      stream
      |> Stream.take(100_000)
      |> Stream.chunk_every(1000)
      |> Enum.map(generate_embeddings_and_index)
      
    usages = Enum.map(batches, fn batch -> batch.usage end)
    
    tokens = Enum.reduce(usages, 0, fn usage, acc -> 
      usage["total_tokens"] + acc  
    end)
    
    Logger.info("Tokens: #{tokens}")
      
    {:ok, count} = HNSWLib.Index.get_current_count(index)
    
    Logger.info("Count: #{count}")
    
    HNSWLib.Index.save_index(index, path)
  
    Enum.flat_map(batches, fn batch -> batch.movies end)
  else
    Enum.take(stream, 100_000)
  end

Querying the index

This is where we query the index: first we generate an embedding for our query, then run a nearest neighbor search against the stored vectors.

{:ok, index} = HNSWLib.Index.load_index(space, dimensions, path)

input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)
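One caveat: the e5 model family is trained with "query: " and "passage: " instruction prefixes (see the model card). This notebook embeds raw text, which works, but prefixing the inputs may improve retrieval quality; a hypothetical tweak, not applied here:

# hypothetical: prefix the query before embedding it
prefixed_query = "query: " <> original_query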

query_body = %{
  input: original_query,
  model: "intfloat/multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)

%{"data" => [%{"embedding" => embedding}]} = query

query = Nx.tensor(embedding)

{:ok, labels, dists} = HNSWLib.Index.knn_query(index, query, k: 15)

IO.inspect(labels, label: "labels")
IO.inspect(dists, label: "dists")
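labels and dists come back as {1, k} tensors. To pair each id with its distance (smaller is closer under :l2), a quick sketch:

# illustrative: flatten both tensors and zip them into {id, distance} pairs
id_dist = Enum.zip(Nx.to_flat_list(labels), Nx.to_flat_list(dists))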

matching_ids = Nx.to_flat_list(labels)

# Enum.filter preserves the dataframe's popularity order, not the kNN
# distance order; the reranking step below re-sorts the results anyway
matches = Enum.filter(movies, fn m -> m.id in matching_ids end)

matches
|> Enum.map(fn movie -> 
  %{title: movie.title, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language}
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity])

matches_overviews = Enum.map(matches, fn m -> m.overview end)
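Vector search gives us candidates; the reranker then scores each candidate against the original query with a cross-encoder. The response shape we rely on when destructuring below (values illustrative):

%{
  "results" => [
    %{"index" => 0, "relevance_score" => 0.87},
    %{"index" => 1, "relevance_score" => 0.12}
  ]
}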

body = %{
  query: original_query,
  documents: matches_overviews,
  model: "BAAI/bge-reranker-v2-m3"
}

{:ok, %{body: %{"results" => reranking_results}}} = Req.post(client, url: "/v1/rerank", json: body)

results = Enum.sort_by(reranking_results, fn result -> 
  result["index"]  
end)

results = 
  matches
  |> Enum.zip(results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)
  # |> Enum.filter(fn {_movie, relevance} -> 
  #   relevance["relevance_score"] > 0.2  
  # end)

results
|> Enum.map(fn {movie, relevance} -> 
  %{title: movie.title, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language, score: relevance["relevance_score"]}
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity, :score])

Text corpus with L2

space = :l2
dimensions = 1024
max_elements = 200_000

{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)

mode = Kino.Input.select("Mode", [index: "index", load: "load"])

require Logger

corpus_template = fn movie ->
  genres = Enum.map(movie.genres, fn g -> g["name"] end)
  production_companies = Enum.map(movie.production_companies, fn pc -> pc["name"] end)
  collection = Map.get(movie.belongs_to_collection || %{}, "name")

  ~s"""
  Name: #{movie.title}
  Genres: #{Enum.join(genres, ", ")}

  Overview: #{movie.overview}
  Collection: #{collection}

  Production companies: #{Enum.join(production_companies, ", ")}
  """
end
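To see what the template renders, call it with a hypothetical record (all values made up):

corpus_template.(%{
  title: "Example Movie",
  genres: [%{"name" => "Action"}, %{"name" => "Thriller"}],
  overview: "A thief discovers a way into other people's dreams.",
  belongs_to_collection: nil,
  production_companies: [%{"name" => "Example Studios"}]
})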

generate_embeddings_and_index = fn batch ->
  movie_ids =
    batch
    |> Enum.map(fn movie -> movie.id end)
    |> Nx.tensor()

  movie_corpus = Enum.map(batch, corpus_template)

  # keep the rendered corpus on each movie so we can rerank against it later
  movies_with_corpus =
    batch
    |> Enum.zip(movie_corpus)
    |> Enum.map(fn {movie, corpus} -> Map.put(movie, :corpus, corpus) end)

  body = %{
    input: movie_corpus,
    model: "intfloat/multilingual-e5-large",
    encoding_format: "float"
  }

  headers = [{"authorization", "Bearer token-abc123"}]

  {:ok, %{body: response}} =
    Req.post(client, url: "/v1/embeddings", json: body, headers: headers, receive_timeout: 300_000)

  %{"data" => embeddings, "usage" => usage_data} = response

  # sort by the returned "index" so each embedding lines up with its movie id
  embeddings =
    embeddings
    |> Enum.sort_by(fn e -> e["index"] end)
    |> Enum.map(fn e -> e["embedding"] end)
    |> Nx.tensor()

  HNSWLib.Index.add_items(index, embeddings, ids: movie_ids)

  %{movies: movies_with_corpus, usage: usage_data}
end

stream = 
  DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)
    
path = "/root/notebooks/files/corpus_#{space}.bin"

movies =
  if Kino.Input.read(mode) == :index do
    batches = 
      stream
      |> Stream.take(100_000)
      |> Stream.chunk_every(1000)
      |> Enum.map(generate_embeddings_and_index)
        
    usages = Enum.map(batches, fn batch -> batch.usage end)
    
    tokens = Enum.reduce(usages, 0, fn usage, acc -> 
      usage["total_tokens"] + acc  
    end)
    
    Logger.info("Tokens: #{tokens}")
    
    {:ok, count} = HNSWLib.Index.get_current_count(index)
    
    Logger.info("Count: #{count}")
    
    HNSWLib.Index.save_index(index, path)
    
    Enum.flat_map(batches, fn batch -> batch.movies end)
  else
    stream
    |> Enum.take(100_000)
    |> Enum.map(fn movie -> 
      corpus = corpus_template.(movie)

      Map.put(movie, :corpus, corpus)
    end)
  end

{:ok, index} = HNSWLib.Index.load_index(space, dimensions, path)

input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)

query_body = %{
  input: original_query,
  model: "intfloat/multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)

%{"data" => [%{"embedding" => embedding}]} = query

query = Nx.tensor(embedding)

{:ok, labels, dists} = HNSWLib.Index.knn_query(index, query, k: 15)

matching_ids = Nx.to_flat_list(labels)

matches = Enum.filter(movies, fn m -> m.id in matching_ids end)
matches_corpus = Enum.map(matches, fn m -> m.corpus end)

body = %{
  query: original_query,
  documents: matches_corpus,
  model: "BAAI/bge-reranker-v2-m3"
}

headers = [{"authorization", "Bearer token-abc123"}]

{:ok, %{body: %{"results" => reranking_results}}} = Req.post(client, url: "/v1/rerank", json: body, headers: headers)

results = Enum.sort_by(reranking_results, fn result -> 
  result["index"]  
end)

results = 
  matches
  |> Enum.zip(results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)
  # |> Enum.filter(fn {_movie, relevance} -> 
  #   relevance["relevance_score"] > 0.2
  # end)

results
|> Enum.map(fn {movie, relevance} -> 
  %{title: movie.title, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language, score: relevance["relevance_score"]}
end)
|> Kino.DataTable.new(keys: [:title, :overview, :rating, :release, :language, :popularity, :score])