Sponsored by AppSignal
Would you like to see your link here? Contact us
Notesclub

Pinecone

livebooks/evision/pinecone.livemd

Pinecone

Mix.install(
  [
    {:kino, "~> 0.12"},
    {:evision, github: "cocoa-xu/evision", branch: "main"},
    {:pinecone, "~> 0.1"},
    {:jason, "~> 1.4"}
  ],
  system_env: [
    {"EVISION_PREFER_PRECOMPILED", "false"}
  ]
)

Prepare images

image_files = Path.wildcard("/home/livebook/evision/test-images/*.{jpg,png}")
images =
  image_files
  |> Enum.map(fn image_file ->
    Evision.imread(image_file)
  end)

Kino.Layout.grid(images, columns: 4)

Facial feature extraction

recognizer =
  Evision.Zoo.FaceRecognition.SFace.init(:default_model,
    backend: Evision.Constant.cv_DNN_BACKEND_OPENCV(),
    target: Evision.Constant.cv_DNN_TARGET_CPU(),
    distance_type: :cosine_similarity,
    cosine_threshold: 0.363,
    l2_norm_threshold: 1.128
  )

detector =
  Evision.Zoo.FaceDetection.YuNet.init(:default_model,
    backend: Evision.Constant.cv_DNN_BACKEND_OPENCV(),
    target: Evision.Constant.cv_DNN_TARGET_CPU(),
    nms_threshold: 0.3,
    conf_threshold: 0.8,
    top_k: 5
  )
[feature_list, visualized_list] =
  images
  |> Enum.reduce([[], []], fn image, [feature_acc, visualized_acc] ->
    results = Evision.Zoo.FaceDetection.YuNet.infer(detector, image)

    bbox = Evision.Mat.to_nx(results, Nx.BinaryBackend)[0][0..-2//1]

    feature =
      recognizer
      |> Evision.Zoo.FaceRecognition.SFace.infer(image, bbox)
      |> Evision.Mat.to_nx()
      |> Evision.Mat.from_nx()

    visualized = Evision.Zoo.FaceDetection.YuNet.visualize(image, results[0])

    [[feature | feature_acc], [visualized | visualized_acc]]
  end)
  |> Enum.map(&Enum.reverse/1)

Kino.Layout.grid(visualized_list, columns: 4)
feature_list
feature_list
|> hd()
|> Evision.Mat.to_nx()
Evision.Zoo.FaceRecognition.SFace.match_feature(
  recognizer,
  Enum.at(feature_list, 8),
  Enum.at(feature_list, 15)
)
Evision.Zoo.FaceRecognition.SFace.match_feature(
  recognizer,
  Enum.at(feature_list, 8),
  Enum.at(feature_list, 0)
)
vectors =
  feature_list
  |> Enum.zip(image_files)
  |> Enum.map(fn {feature, image_file} ->
    values =
      feature
      |> Evision.Mat.to_nx(Nx.BinaryBackend)
      |> Nx.flatten()
      |> Nx.to_list()

    %{
      values: values,
      id: image_file
    }
  end)

Create Index

api_key_input = Kino.Input.password("API_KEY")
environment_input = Kino.Input.text("ENVIRONMENT")
api_key = Kino.Input.read(api_key_input)
environment = Kino.Input.read(environment_input)
defmodule Pinecone.Controller do
  def new(opts) do
    environment = Keyword.get(opts, :environment)
    api_key = Keyword.get(opts, :api_key)

    middleware = [
      {Tesla.Middleware.BaseUrl, "https://controller.#{environment}.pinecone.io"},
      Tesla.Middleware.JSON,
      {Tesla.Middleware.Headers, [{"api-key", api_key}]}
    ]

    Tesla.client(middleware)
  end

  def create_index(client, opts) do
    params = Enum.into(opts, %{})

    client
    |> Tesla.post("/databases", params)
    |> handle_response()
  end

  def list_indexes(client) do
    client
    |> Tesla.get("/databases")
    |> handle_response()
  end

  def describe_index(client, name) do
    client
    |> Tesla.get("/databases/#{name}")
    |> handle_response()
  end

  def delete_index(client, name) do
    client
    |> Tesla.delete("/databases/#{name}")
    |> handle_response()
  end

  def handle_response(resp, opts \\ [])

  def handle_response({:error, _} = err, _opts), do: err

  def handle_response({:ok, %Tesla.Env{status: status, body: body}}, _opts) when status <= 400 do
    {:ok, body}
  end

  def handle_response({:ok, resp}, _opts), do: {:error, resp}
end
controller =
  Pinecone.Controller.new(
    environment: environment,
    api_key: api_key
  )

Kino.nothing()
index_name = "face-search"
Pinecone.Controller.create_index(controller,
  name: index_name,
  dimension: 128,
  metric: "cosine",
  pod_type: "s1"
)
controller
|> Pinecone.Controller.list_indexes()
|> elem(1)
controller
|> Pinecone.Controller.describe_index(index_name)
|> elem(1)
project =
  controller
  |> Pinecone.Controller.describe_index(index_name)
  |> elem(1)
  |> Map.get("status")
  |> Map.get("host")
  |> String.split(".")
  |> Enum.at(0)
  |> String.replace("#{index_name}-", "")

Upsert vectors

client =
  Pinecone.Client.new(
    environment: environment,
    api_key: api_key,
    project: project,
    index: index_name
  )

Kino.nothing()
Pinecone.Vector.upsert(client, %{vectors: vectors})

Search vector

vector =
  vectors
  |> Enum.at(8)
  |> Map.get(:values)
matches =
  client
  |> Pinecone.Vector.query(%{topK: 10, vector: vector})
  |> elem(1)
  |> Map.get("matches")
matches
|> Enum.map(fn match ->
  [
    Kino.Markdown.new("#{match["score"]}"),
    Evision.imread(match["id"])
  ]
  |> Kino.Layout.grid()
end)
|> Kino.Layout.grid(columns: 4)

Delete index

Pinecone.Controller.delete_index(controller, index_name)