Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Vector Search with LLM

vector-search-with-llm.livemd

Vector Search with LLM

Mix.install([
  {:ollama, "~> 0.8"},
  {:req, "~> 0.5"},
  {:explorer, "~> 0.10.1"},
  {:instructor, "~> 0.1.0"},
  {:hnswlib, "~> 0.1.5"},
  {:kino, "~> 0.14"}
])

Initialization

alias Explorer.DataFrame
alias Explorer.Series

require Explorer.DataFrame

# Shared HTTP client for the OpenAI-compatible model server (chat,
# embeddings, rerank endpoints all live under this base URL).
client = Req.new(base_url: "https://models.arrakis.upmaru.network")

Prepare Dataset

# Load the movie dataset (newline-delimited JSON uploaded to the notebook)
# into an Explorer DataFrame.
df = 
  Kino.FS.file_path("movies.json")
  |> DataFrame.from_ndjson!()

# Release dates arrive as strings; cast them to a proper :date series so we
# can filter on year below.
df = DataFrame.mutate(df, 
  release_date: Series.cast(release_date, :date)
)
# Keep only recent, released, non-adult movies whose overview has more than
# 5 words, ordered most popular first.
df =
  df
  |> DataFrame.filter(year(release_date) >= 2020)
  |> DataFrame.filter(not(adult))
  |> DataFrame.filter(status == "Released")
  |> DataFrame.filter(lengths(split(overview, " ")) > 5)
  |> DataFrame.sort_by(desc: popularity)

LLM Generated Description

# HNSW index configuration: squared-L2 distance, 1024 dimensions (the
# output dimensionality of the multilingual-e5-large embedding model).
space = :l2
dimensions = 1024
max_elements = 100_000

{:ok, index} = HNSWLib.Index.new(space, dimensions, max_elements)
# Lazily stream rows as maps with atom keys; chunk_size controls how many
# rows are materialized per fetch.
stream = 
  DataFrame.to_rows_stream(df, atom_keys: true, chunk_size: 500)

# Builds a prompt from a movie row's metadata and asks the chat model for a
# single-paragraph description, stored under the :description key.
#
# Fix: the original requested n: 5 completions but only ever read the first
# choice, wasting four generations per movie; we now request exactly one.
generate_llm_description = fn movie ->
  genres = Enum.map(movie.genres, fn g -> g["name"] end)
  production_companies = Enum.map(movie.production_companies, fn pc -> pc["name"] end)
  # belongs_to_collection may be nil for standalone movies.
  collection = Map.get(movie.belongs_to_collection || %{}, "name")

  text =
    ~s"""
    Name: #{movie.title} 
    Genres: #{Enum.join(genres, ", ")}
  
    Overview: #{movie.overview}
    Collection: #{collection}
  
    Production companies: #{Enum.join(production_companies, ", ")}
    """

  prompt =
    """
    Write an accurate description of the movie in a single paragraph, don't use quote marks or special characters in the response.

    #{text}
    """

  body = %{
    model: "mistral-small-24b",
    messages: [
      %{role: "user", content: prompt}
    ],
    # Only one choice is consumed below.
    n: 1
  }

  {:ok, response} = Req.post(client, url: "/v1/chat/completions", json: body)

  %{body: %{"choices" => choices}} = response

  %{"message" => %{"content" => description}} = List.first(choices)

  Map.put(movie, :description, description)
end

# Generate an LLM description for the 200 most popular movies, then render
# a preview table of the results.
movies =
  stream
  |> Enum.take(200)
  |> Enum.map(generate_llm_description)

movies
|> Enum.map(fn movie ->
  %{
    id: movie.id,
    title: movie.title,
    status: movie.status,
    release: movie.release_date,
    genres: Enum.map(movie.genres, & &1["name"]),
    description: movie.description,
    description_length: byte_size(movie.description)
  }
end)
|> Kino.DataTable.new(
  keys: [:id, :title, :description, :status, :release, :genres, :description_length]
)

Generate Embeddings and query using description

# Embeds a batch of movie descriptions via /v1/embeddings and adds the
# vectors to the global HNSW `index`, keyed by movie id. Returns the batch
# together with the API usage stats.
generate_embeddings_and_index = fn batch -> 
   movie_ids = 
    Enum.map(batch, fn movie -> movie.id end)
    |> Nx.tensor()
  
   movie_llm_descriptions = 
    Enum.map(batch, fn movie -> 
      movie.description
    end)

  body = %{
    input: movie_llm_descriptions,
    model: "multilingual-e5-large",
    encoding_format: "float"
  }
  
  # Embedding a large batch can be slow; allow up to 5 minutes.
  {:ok, %{body: response}} = Req.post(client, url: "/v1/embeddings", json: body, receive_timeout: 300_000)
  
  %{"data" => embeddings, "usage" => usage_data} = response

  # The API may return items out of order; sort by "index" so each tensor
  # row lines up with the movie id at the same position.
  embeddings = 
    Enum.sort_by(embeddings, fn e -> 
      e["index"]
    end)
    |> Enum.map(fn e -> e["embedding"] end)
    |> Nx.tensor()

  HNSWLib.Index.add_items(index, embeddings, ids: movie_ids)

  %{movies: batch, usage: usage_data}
end

# Index every movie in batches of 100; each call embeds the batch and adds
# it to the HNSW index as a side effect.
#
# Fix: the original ended with `Enum.flat_map(fn b -> b.movies end)` whose
# result was never bound, so the extra traversal was pure waste; Enum.each
# makes the side-effecting intent explicit.
movies
|> Enum.chunk_every(100)
|> Enum.each(generate_embeddings_and_index)

input = Kino.Input.textarea("Query")
original_query = Kino.Input.read(input)

# Embed the raw user query with the same model used for the documents, then
# fetch the 15 nearest neighbours from the index.
query_body = %{
  input: original_query,
  model: "multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)

%{"data" => [%{"embedding" => embedding}]} = query

query = Nx.tensor(embedding)

{:ok, labels, dists} = HNSWLib.Index.knn_query(index, query, k: 15)

# knn_query returns a 2D label tensor; flatten it into a plain id list.
matching_id = 
  Nx.to_list(labels) 
  |> List.flatten() 

# Resolve the matched ids back to movie maps and collect their descriptions
# for reranking.
matches = Enum.filter(movies, fn m -> m.id in matching_id end)
matches_description = Enum.map(matches, fn m -> m.description end)

# Rerank the vector-search candidates against the original query, keep the
# pairs scoring above 0.2, and render them as a table.
body = %{
  query: original_query,
  documents: matches_description,
  model: "bge-reranker-v2-m3"
}

{:ok, %{body: %{"results" => reranking_results}}} = Req.post(client, url: "/v1/rerank", json: body)

# Sort rerank results back into submission order so that zipping them with
# `matches` pairs each movie with its own relevance entry.
results = Enum.sort_by(reranking_results, fn result -> 
  result["index"]  
end)

results = 
  matches
  |> Enum.zip(results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)
  |> Enum.filter(fn {_movie, relevance} -> 
    relevance["relevance_score"] > 0.2
  end)

results
|> Enum.map(fn {movie, relevance} -> 
  %{title: movie.title, description: movie.description, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language, score: relevance["relevance_score"]}
end)
|> Kino.DataTable.new(keys: [:title, :description, :overview, :rating, :release, :language, :popularity, :score])

Using LLM to generate setting summary data

In this section we’ll discover what else we can generate with LLMs. Our description may be limited; however, we can extract more information from it to make the content for each movie even better.

# Instructor response model describing where a movie takes place. @llm_doc
# is sent to the LLM as part of the JSON schema to steer its output.
defmodule MovieSetting do
  use Ecto.Schema
  use Instructor

  @llm_doc """
  Output of the movie setting based on a given description.

  ## Fields:
  - description: A description about the setting of the movie in a single paragraph.
  - reason: A brief description of the reason why the model thinks the movie took place in those locations.
  - locations: Potential locations the movie takes place.
  """ 
  @primary_key false
  embedded_schema do
    field :description, :string
    field :reason, :string
    field :locations, {:array, :string}
  end
end

# Instructor configuration: talk to the same model server through its
# llama.cpp-compatible adapter.
instructor_config = [
  adapter: Instructor.Adapters.Llamacpp,
  api_url: "https://models.arrakis.upmaru.network"
]

# Asks the LLM for a structured MovieSetting based on the generated
# description and stores it under the :setting key. The {:ok, setting}
# match raises on any failed completion (let-it-crash).
predict_movie_setting = fn movie -> 
  {:ok, setting} = Instructor.chat_completion([
    model: "mistral-small-24b",
    mode: :json_schema,
    response_model: MovieSetting,
    messages: [
      %{
        role: "user",
        content: """
           Make a best estimate based on the description below on the type of setting the movie is likely to take place, whether it takes place in space, forest, jungle, tundra, desert, in the ocean, or futuristic city?
          
           
           #{movie.description}
           
          """
      }
    ]
  ], instructor_config)

  Map.put(movie, :setting, setting)
end

# Enrich every movie with its predicted setting, then preview the results.
movies = Enum.map(movies, predict_movie_setting)

movies
|> Enum.map(fn movie ->
  %{
    id: movie.id,
    title: movie.title,
    setting_locations: movie.setting.locations,
    setting_reason: movie.setting.reason,
    setting_description: movie.setting.description,
    release: movie.release_date,
    genres: Enum.map(movie.genres, & &1["name"]),
    description: movie.description,
    description_length: byte_size(movie.description)
  }
end)
|> Kino.DataTable.new(
  keys: [
    :id,
    :title,
    :setting_locations,
    :setting_reason,
    :setting_description,
    :description,
    :status,
    :release,
    :genres,
    :description_length
  ]
)

Generate questions from descriptions

We can generate more questions based on the movie description and setting description.

# Instructor response model for a generated question about a movie. @llm_doc
# is sent to the LLM as part of the JSON schema to steer its output.
defmodule MovieQuery do
  use Ecto.Schema
  use Instructor

  @llm_doc """
  Output of potential question about the movie based on the description.

  ## Fields:
  - question: Potential question based on the description.
  - reason: A brief description of the reason why the model thinks the question relates to the movie.
  """

  @primary_key false
  embedded_schema do
    field :question, :string
    field :reason, :string
  end
end

# Generates at least three structured questions per movie from its
# description and setting description, stored under :queries.
predict_movie_queries = fn movie -> 
  queries = Instructor.chat_completion([
    model: "mistral-small-24b",
    mode: :json_schema,
    response_model: {:array, MovieQuery},
    messages: [
      %{
        role: "system",
        content: "You are a curious mind that generates questions about movies. You will generate at least 3 questions about the movies."
      },
      %{
        role: "user",
        content: ~s"""
           [description and setting description about the movie]
          
           #{movie.description}

           #{movie.setting.description}
          """
      }
    ]
  ], instructor_config)

  # NOTE(review): this assumes chat_completion returns a list of {:ok, query}
  # tuples for {:array, MovieQuery} response models — confirm against the
  # Instructor version in use; a plain {:ok, list} return would crash here.
  queries = Enum.map(queries, fn {:ok, query} -> query end)

  Map.put(movie, :queries, queries)
end

# Attach generated questions to every movie, then preview them in a table.
movies = Enum.map(movies, predict_movie_queries)

movies
|> Enum.map(fn movie ->
  %{
    id: movie.id,
    title: movie.title,
    questions: Enum.map(movie.queries, & &1.question),
    questions_count: Enum.count(movie.queries),
    description: movie.description,
    setting_description: movie.setting.description,
    setting_locations: movie.setting.locations
  }
end)
|> Kino.DataTable.new(
  keys: [
    :id,
    :title,
    :questions,
    :questions_count,
    :description,
    :setting_description,
    :setting_locations
  ]
)

We’ll now build 2 new indices using cosine distance and index the description and setting description into them.

# Two fresh indices over cosine distance: one for the LLM descriptions and
# one for the setting descriptions.
space = :cosine
dimensions = 1024
# Fix: capacity was hard-coded to 50 while up to 200 movies are indexed
# below; HNSWLib errors once the index is full. Size it from the data.
max_elements = max(Enum.count(movies), 50)

{:ok, description_index} = HNSWLib.Index.new(space, dimensions, max_elements)

{:ok, setting_index} = HNSWLib.Index.new(space, dimensions, max_elements)
# Embeds a batch of LLM descriptions and adds them to `description_index`,
# keyed by movie id. Returns the batch unchanged so callers can keep piping.
generate_description_embeddings_and_index = fn batch -> 
  movie_ids = 
    Enum.map(batch, fn movie -> movie.id end)
    |> Nx.tensor()
  
  movie_llm_descriptions = 
    Enum.map(batch, fn movie -> 
      movie.description
    end)

  body = %{
    input: movie_llm_descriptions,
    model: "multilingual-e5-large",
    encoding_format: "float"
  }
  
  # Long timeout: embedding a batch can take a while.
  {:ok, %{body: response}} = Req.post(
    client, 
    url: "/v1/embeddings", 
    json: body, 
    receive_timeout: 300_000
  )
  
  %{"data" => embeddings} = response

  # Re-sort by "index" so tensor rows line up with the movie_ids positions.
  embeddings = 
    Enum.sort_by(embeddings, fn e -> 
      e["index"]
    end)
    |> Enum.map(fn e -> e["embedding"] end)
    |> Nx.tensor()

  HNSWLib.Index.add_items(description_index, embeddings, ids: movie_ids)

  batch
end

# Embeds a batch of setting descriptions and adds them to `setting_index`,
# keyed by movie id. Returns the batch unchanged. Near-duplicate of
# generate_description_embeddings_and_index; differs only in the text field
# used and the target index.
generate_setting_embeddings_and_index = fn batch -> 
  movie_ids = 
    Enum.map(batch, fn movie -> movie.id end)
    |> Nx.tensor()
  
  movie_settings = 
    Enum.map(batch, fn movie ->  
      movie.setting.description
    end)

  body = %{
    input: movie_settings,
    model: "multilingual-e5-large",
    encoding_format: "float"
  }
  
  # Long timeout: embedding a batch can take a while.
  {:ok, %{body: response}} = Req.post(
    client, 
    url: "/v1/embeddings", 
    json: body, receive_timeout: 300_000
  )
  
  %{"data" => embeddings} = response

  # Re-sort by "index" so tensor rows line up with the movie_ids positions.
  embeddings = 
    Enum.sort_by(embeddings, fn e -> 
      e["index"]
    end)
    |> Enum.map(fn e -> e["embedding"] end)
    |> Nx.tensor()

  HNSWLib.Index.add_items(setting_index, embeddings, ids: movie_ids)

  batch
end

# Index each batch of 10 movies into both indices. Each generator returns
# its batch, so the second map consumes the first map's output; finally
# flatten the batches back into a flat movie list.
movies =
  movies
  |> Enum.chunk_every(10)
  |> Enum.map(generate_description_embeddings_and_index)
  |> Enum.map(generate_setting_embeddings_and_index)
  |> List.flatten()
input = Kino.Input.textarea("Query")

We query both the indices

# Embed the user's query once and probe both indices for the 15 nearest
# movies each.
original_query = Kino.Input.read(input)

query_body = %{
  input: original_query,
  model: "multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)

%{"data" => [%{"embedding" => embedding}]} = query

query = Nx.tensor(embedding)

{:ok, description_matches, _desc_dists} = HNSWLib.Index.knn_query(description_index, query, k: 15)
{:ok, setting_matches, _desc_dists} = HNSWLib.Index.knn_query(setting_index, query, k: 15)

# Flatten the 2D label tensors into plain id lists, then resolve those ids
# back to the movie maps they identify.
description_matching_ids = List.flatten(Nx.to_list(description_matches))
setting_matching_ids = List.flatten(Nx.to_list(setting_matches))

matches_on_description =
  Enum.filter(movies, fn movie -> movie.id in description_matching_ids end)

matches_on_setting =
  Enum.filter(movies, fn movie -> movie.id in setting_matching_ids end)

Run rerank on both the indices and merge the results by averaging the 2 relevance scores.

# Rerank each candidate set against the text that produced it: LLM
# descriptions on one side, setting descriptions on the other.
matches_descriptions = Enum.map(matches_on_description, fn m -> m.description end)
matches_setting_descriptions = Enum.map(matches_on_setting, fn m -> m.setting.description end)

description_rerank_body = %{
  query: original_query,
  documents: matches_descriptions,
  model: "bge-reranker-v2-m3"
}

setting_description_rerank_body = %{
  query: original_query,
  documents: matches_setting_descriptions,
  model: "bge-reranker-v2-m3"
}

{:ok, %{body: %{"results" => description_reranking_results}}} = Req.post(client, url: "/v1/rerank", json: description_rerank_body)

{:ok, %{body: %{"results" => setting_description_reranking_results}}} = Req.post(client, url: "/v1/rerank", json: setting_description_rerank_body)

# Restore submission order in both rerank result lists so the zips below
# pair each movie with its own relevance entry, then order each pairing by
# descending relevance.
description_results = 
  Enum.sort_by(description_reranking_results, fn result -> 
    result["index"]  
  end)

setting_description_results =
  Enum.sort_by(setting_description_reranking_results, fn result -> 
    result["index"]  
  end)

description_results = 
  matches_on_description
  |> Enum.zip(description_results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)

setting_description_results =
  matches_on_setting
  |> Enum.zip(setting_description_results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)


# Merge the two rerank passes: group scored {movie, relevance} pairs by
# movie id, average the two relevance scores, and drop weak matches.
#
# NOTE(review): a movie found by only one index is still divided by 2,
# halving its score — confirm this penalty is intentional.
combined_relevance =
  description_results
  |> Enum.concat(setting_description_results)
  |> Enum.group_by(fn {m, _relevance} -> m.id end)
  |> Enum.map(fn {id, movies} ->
    {id, Enum.sum(Enum.map(movies, fn {_m, r} -> r["relevance_score"] end)) / 2}
  end)
  |> Enum.into(%{})
  |> Enum.filter(fn {_, relevance} -> relevance > 0.05 end)

# Every distinct movie seen by either index.
combined_movies =
  matches_on_description
  |> Enum.concat(matches_on_setting)
  |> Enum.uniq_by(fn m -> m.id end)

# Fix: the original built this list with `acc ++ [movie]` inside a reduce
# (quadratic append); Enum.map expresses the same transformation directly.
final_result =
  combined_relevance
  |> Enum.map(fn {id, score} ->
    combined_movies
    |> Enum.find(fn m -> m.id == id end)
    |> Map.put(:relevance, score)
  end)
  |> Enum.sort_by(fn m -> m.relevance end, :desc)
  |> Enum.map(fn movie ->
    %{title: movie.title, queries: Enum.map(movie.queries, fn q -> q.question end), setting: movie.setting.description, description: movie.description, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language, score: movie.relevance}
  end)
  |> Kino.DataTable.new(keys: [:title, :queries, :setting, :description, :overview, :rating, :release, :language, :popularity, :score])

With Zero Shot Classification

We will use the LLM to classify our query and use that extra information to query the index in a better way.

# Fresh query input for the zero-shot classification section.
input = Kino.Input.textarea("Query")
# Instructor response model that classifies a user query so we can route it
# to the most suitable index.
#
# Fix: @llm_doc documented a field named `improved_question`, but the schema
# field (and its use at the call site) is `improved_query` — the mismatch
# could mislead the LLM into emitting a key the schema rejects.
defmodule QueryClassification do
  use Ecto.Schema
  use Instructor

  @llm_doc """
  Query classification
  
  ## Fields:
  - reply: Should the reply be list of results or a recommendation?
  - about: What is the query about? Location, character or general knowledge?
  - improved_query: An improved version of the query that will provide a better answer.
  """
  @primary_key false
  embedded_schema do
    field :reply, Ecto.Enum, values: [:results, :recommendation]
    field :about, Ecto.Enum, values: [:general, :character, :location]
    field :improved_query, :string
  end
end

# Classify the raw query: what it is about, what kind of reply to produce,
# and a cleaned-up rephrasing for retrieval.
original_query = Kino.Input.read(input)

{:ok, query_classification} = Instructor.chat_completion([
    model: "mistral-small-24b",
    mode: :json_schema,
    response_model: QueryClassification,
    messages: [
      %{
        role: "user",
        content: """
           Correctly identify what the query is about and what type of reply should be made. 
           Also provide a better quality question that will give better results.
        
           
           #{original_query}
           
          """
      }
    ]
  ], instructor_config)
# Location-oriented queries search the setting index; anything else falls
# back to the description index.
index =
  if query_classification.about == :location do
    setting_index
  else
    description_index
  end

# Embed the LLM-improved query and search the routed index for the 15
# nearest movies.
query_body = %{
  input: query_classification.improved_query,
  model: "multilingual-e5-large",
  encoding_format: "float"
}

{:ok, %{body: query}} = Req.post(client, url: "/v1/embeddings", json: query_body)

%{"data" => [%{"embedding" => embedding}]} = query

query = Nx.tensor(embedding)

{:ok, labels, distance} = HNSWLib.Index.knn_query(index, query, k: 15)

matching_id = 
  Nx.to_list(labels) 
  |> List.flatten() 

matches = Enum.filter(movies, fn m -> m.id in matching_id end)
# Rerank against the same text field the chosen index was built on.
matched_text = Enum.map(matches, fn m -> 
  case query_classification.about do
    :location ->
      m.setting.description

    _ -> 
      m.description
  end
end)

# Rerank the routed matches against the user's original query and keep
# pairs scoring above 0.1.
description_rerank_body = %{
  query: original_query,
  documents: matched_text,
  model: "bge-reranker-v2-m3"
}

{:ok, %{body: %{"results" => reranking_results}}} = 
  Req.post(client, url: "/v1/rerank", json: description_rerank_body)


# Restore submission order so the zip below pairs each movie with its score.
results = Enum.sort_by(reranking_results, fn result -> 
  result["index"]  
end)

results = 
  matches
  |> Enum.zip(results)
  |> Enum.sort_by(fn {_movie, relevance} -> 
    relevance["relevance_score"] 
  end, :desc)
  |> Enum.filter(fn {_movie, relevance} -> 
    relevance["relevance_score"] > 0.1
  end)

results
|> Enum.map(fn {movie, relevance} -> 
  %{title: movie.title, setting: movie.setting.description, description: movie.description, rating: movie.vote_average, popularity: movie.popularity, overview: movie.overview, release: movie.release_date, language: movie.original_language, score: relevance["relevance_score"]}
end)
|> Kino.DataTable.new(keys: [:title, :setting, :description, :overview, :rating, :release, :language, :popularity, :score])

Rag output

We should also be able to build a RAG output with our search result. We’ll build a custom KinoReply to handle streaming response.

# Custom Kino widget that renders a streaming markdown reply. Chunks are
# appended incrementally via `append/2`; `finish/1` triggers a final full
# markdown render on the client.
defmodule KinoReply do
  use Kino.JS
  use Kino.JS.Live

  # Creates the widget, optionally seeded with a starting string.
  def new(starting_string \\ "") do
    Kino.JS.Live.new(__MODULE__, starting_string)
  end

  # Fire-and-forget append of a text chunk to the rendered output.
  def append(kino, string) do
    Kino.JS.Live.cast(kino, {:append, string})
  end
  
  # Signals the client that streaming is complete.
  def finish(kino) do
    Kino.JS.Live.cast(kino, :finish)
  end

  @impl true
  def init(string, ctx) do
    {:ok, assign(ctx, reply: string)}
  end

  # Late-joining clients receive the full reply accumulated so far.
  @impl true
  def handle_connect(ctx) do
    {:ok, ctx.assigns.reply, ctx}
  end

  # Broadcasts the chunk to connected clients and keeps a server-side copy
  # (space-joined) for future connects.
  @impl true
  def handle_cast({:append, string}, ctx) do
    broadcast_event(ctx, "append", string)

    {:noreply, assign(ctx, reply: "#{ctx.assigns.reply} #{string}")}
  end

  @impl true
  def handle_cast(:finish, ctx) do
    broadcast_event(ctx, "finish", nil)
    
    {:noreply, ctx}
  end

  # Client-side renderer: re-runs markdown-it on the accumulated HTML on
  # every append, and once more (block-level) on finish.
  asset "main.js" do
    """
    export function init(ctx, chunk) {
      ctx.importJS("https://cdn.jsdelivr.net/npm/markdown-it@14.1.0/dist/markdown-it.min.js")
      // ctx.importJS("https://cdn.jsdelivr.net/npm/marked@15.0.7/lib/marked.umd.min.js")

      ctx.root.innerHTML = chunk;
      const el = ctx.root;
      
      el.classList.add("markdown");
      el.classList.add("prose");

      ctx.handleEvent("append", (newChunk) => {
        const md = markdownit('commonmark')
        ctx.root.innerHTML = md.renderInline(ctx.root.innerHTML.concat(newChunk));
      })

      ctx.handleEvent("finish", (_n) => {
        const md = markdownit('commonmark')
        ctx.root.innerHTML = md.render(ctx.root.innerHTML);
      })
    }
    """
  end
end

We use the stream parser from here Open AI Streaming in Elixir Phoenix

defmodule StreamParser do
  @moduledoc false

  # Splits a server-sent-events payload on its "data: " markers and
  # JSON-decodes each event, dropping blanks and the "[DONE]" sentinel.
  def parse(chunk) do
    for segment <- String.split(chunk, "data: "),
        event = decode(String.trim(segment)),
        do: event
  end

  defp decode(""), do: nil
  defp decode("[DONE]"), do: nil
  defp decode(data), do: Jason.decode!(data)
end

Let’s generate the RAG output.

# Render each reranked {movie, relevance} match as a small text card for
# the RAG prompt. Enum.map_join/3 avoids the intermediate list built by the
# original `Enum.map |> Enum.join` pipeline.
results_text =
  Enum.map_join(results, "\n", fn {m, _relevance} ->
    ~s"""
    Title: #{m.title}
    Description: #{m.description}
    Setting: #{m.setting.description}
    Rating: #{m.vote_average}
    """
  end)

# Build the RAG prompt from the user's query and the matched movie cards.
prompt =
  ~s"""
  The users query: #{original_query}

  We found #{Enum.count(results)} results matching the user's requirement.

  #{results_text}

  Please respond to the user with the data of the movie and explain the reasoning why they match the query given.
  Make sure the output complies with commonmark.
  """

# Render the conversation: the user's query as markdown, then a live
# KinoReply widget that the streaming response appends into.
frame = Kino.Frame.new()
md = Kino.Markdown.new("**You**: #{original_query}")

Kino.Frame.append(frame, md)
Kino.render(frame)

reply = KinoReply.new("**MovieBOT**: ")
Kino.Frame.append(frame, reply)

# Stream the chat completion; each SSE chunk is parsed and forwarded to the
# KinoReply widget until the stream ends.
body = %{
  model: "mistral-small-24b",
  messages: [
    %{role: "user", content: prompt}
  ],
  stream: true
}

{:ok, response} =
  Req.post(client,
    url: "/v1/chat/completions",
    json: body,
    into: fn {:data, data}, context ->
      case StreamParser.parse(data) do
        [%{"choices" => [%{"delta" => %{"content" => chunk}}]}] ->
          # NOTE(review): both replacements substitute newlines with the
          # same newlines (no-ops as written) — confirm the intended
          # substitution text.
          chunk = Regex.replace(~r/\n\n/, chunk, "\n\n")
          chunk = Regex.replace(~r/\n/, chunk, "\n")
          KinoReply.append(reply, chunk)
          {:cont, context}

        _ ->
          KinoReply.finish(reply)
          {:halt, context}
      end
    end
  )

:ok