Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Untitled notebook

ortex_yolo.livemd

Untitled notebook

Mix.install([
  {:ortex, "~> 0.1.7"},
  {:image, "~> 0.38.2"},
  {:kino, "~> 0.11.0"}
])

Section

Install the Ultralytics YOLO package and export the YOLOv8n model to ONNX:

$ pip install ultralytics
$ yolo export model=yolov8n.pt format=onnx imgsz=640
# ONNX file written by the `yolo export` command.
path = "/home/kuku/MachineLearning/ultralytics/yolov8n.onnx"
model = Ortex.load(path)

# Open the sample picture and embed it in a 640x640 canvas, matching the
# `imgsz=640` the model was exported with.
image = Image.embed!(Image.open!("/home/kuku/cat_dog.jpg"), 640, 640)

# Move axis 2 (presumably the colour bands - confirm against `Image.to_nx!/1`)
# to the front, then convert to f32 and divide by 255.
transposed = image |> Image.to_nx!() |> Nx.transpose(axes: [2, 0, 1])
image_tensor = transposed |> Nx.as_type(:f32) |> Nx.divide(255)

# Prepend a batch axis of size 1.
batch = Nx.new_axis(image_tensor, 0)
# Rebuilds an image from `image_tensor`: undoes the axis transpose and the
# division by 255 applied during preprocessing. Each call builds a new image
# from the tensor.
make_img = fn ->
  restored_tensor = Nx.transpose(image_tensor, axes: [1, 2, 0])
  {:ok, restored} = Image.from_nx(Nx.multiply(restored_tensor, 255))
  restored
end

make_img.()
defmodule Util do
  @moduledoc """
  Post-processing helpers for YOLO predictions: confidence filtering,
  non-maximum suppression (NMS) and drawing of the resulting boxes.

  A box is a list `[cx, cy, w, h | rest]` where `cx`/`cy` is the box centre and
  `w`/`h` are its width and height. The functions in this module append the
  box probability as the fifth element.
  """

  @typedoc "A box in centre/size form, optionally followed by its probability."
  @type box :: [number()]

  # Class names, in the same order as the class columns of the prediction tensor.
  @classes [
    "person",
    "bicycle",
    "car",
    "motorcycle",
    "airplane",
    "bus",
    "train",
    "truck",
    "boat",
    "traffic light",
    "fire hydrant",
    "stop sign",
    "parking meter",
    "bench",
    "bird",
    "cat",
    "dog",
    "horse",
    "sheep",
    "cow",
    "elephant",
    "bear",
    "zebra",
    "giraffe",
    "backpack",
    "umbrella",
    "handbag",
    "tie",
    "suitcase",
    "frisbee",
    "skis",
    "snowboard",
    "sports ball",
    "kite",
    "baseball bat",
    "baseball glove",
    "skateboard",
    "surfboard",
    "tennis racket",
    "bottle",
    "wine glass",
    "cup",
    "fork",
    "knife",
    "spoon",
    "bowl",
    "banana",
    "apple",
    "sandwich",
    "orange",
    "broccoli",
    "carrot",
    "hot dog",
    "pizza",
    "donut",
    "cake",
    "chair",
    "couch",
    "potted plant",
    "bed",
    "dining table",
    "toilet",
    "tv",
    "laptop",
    "mouse",
    "remote",
    "keyboard",
    "cell phone",
    "microwave",
    "oven",
    "toaster",
    "sink",
    "refrigerator",
    "book",
    "clock",
    "vase",
    "scissors",
    "teddy bear",
    "hair drier",
    "toothbrush"
  ]

  # Boxes narrower or shorter than this are skipped by `draw_bbox_labels/2`.
  @min_label_side 5

  # Vertical distance between the top edge of a box and the top of its label.
  @label_offset 25

  @doc """
  Draws a red, unfilled rectangle on `image` for every box in `bboxes`.

  `bboxes` is a list of box lists, as returned by `nms/3`. Returns the image
  with all rectangles drawn.
  """
  @spec draw_bboxes([[box()]], Vix.Vips.Image.t()) :: Vix.Vips.Image.t()
  def draw_bboxes(bboxes, image) do
    bboxes
    |> Enum.concat()
    |> Enum.reduce(image, &draw_rect/2)
  end

  @doc """
  Draws every box in `bboxes` on `image` together with its class name.

  `bboxes` is a list of box lists as returned by `nms/3`; the n-th list is
  labelled with the n-th class name. Boxes narrower or shorter than
  #{@min_label_side} are skipped entirely. The label position is clamped to
  the dimensions of `image`.
  """
  @spec draw_bbox_labels([[box()]], Vix.Vips.Image.t()) :: Vix.Vips.Image.t()
  def draw_bbox_labels(bboxes, image) do
    # Drawing never changes the image size, so the bounds are read once.
    bounds = {Image.width(image), Image.height(image)}

    bboxes
    |> Enum.zip(@classes)
    |> Enum.reduce(image, fn {boxes, class_name}, image ->
      Enum.reduce(boxes, image, &draw_labelled_box(&1, &2, class_name, bounds))
    end)
  end

  @doc """
  Runs non-maximum suppression separately for every class column of `boxes`.

  `boxes` is a rank-2 tensor whose first four columns are `cx, cy, w, h` and
  whose remaining columns hold one probability per class. For each class, rows
  with a probability greater than `prob_thresh` are kept and then de-duplicated
  with `do_nms/3` using `iou_thresh`.

  Returns one list per class column; each list holds the surviving
  `[cx, cy, w, h, prob]` boxes in ascending probability order.
  """
  @spec nms(Nx.Tensor.t(), number(), number()) :: [[box()]]
  def nms(boxes, prob_thresh \\ 0.8, iou_thresh \\ 0.8) do
    {_anchors, data} = Nx.shape(boxes)

    # The coordinate columns are the same for every class, so slice them once.
    coords = Nx.slice_along_axis(boxes, 0, 4, axis: 1)

    # `//1` keeps the range empty (instead of counting down) when the tensor
    # has no class columns.
    Enum.map(0..(data - 5)//1, fn idx ->
      probs =
        boxes
        |> Nx.slice_along_axis(4 + idx, 1, axis: 1)
        |> Nx.reshape({:auto})

      argsort = Nx.argsort(probs, direction: :desc)

      boxes_ordered = Nx.take(coords, argsort)
      probs_ordered = Nx.new_axis(Nx.take(probs, argsort), 1)

      # Rows are sorted by descending probability, so rows are converted lazily
      # and conversion stops at the first row that is not above the threshold.
      [boxes_ordered, probs_ordered]
      |> Nx.concatenate(axis: 1)
      |> Nx.to_batched(1)
      |> Stream.map(&Nx.to_flat_list/1)
      |> Enum.take_while(fn [_, _, _, _, prob] -> prob > prob_thresh end)
      |> do_nms([], iou_thresh)
    end)
  end

  @doc """
  Greedy non-maximum suppression over a list of boxes.

  Takes the first box, drops every remaining box whose IoU with it is greater
  than `iou_thresh`, and repeats on what is left. Kept boxes are prepended to
  `results`, so the returned list is in reverse order of selection.
  """
  @spec do_nms([box()], [box()], number()) :: [box()]
  def do_nms([], results, _iou_thresh), do: results

  def do_nms([box1 | rest], results, iou_thresh) do
    survivors = Enum.reject(rest, fn box2 -> iou(box1, box2) > iou_thresh end)
    do_nms(survivors, [box1 | results], iou_thresh)
  end

  @doc """
  Returns the intersection-over-union of two boxes given in centre/size form.

  Returns `0.0` when the union has no area (both boxes are degenerate).

  ## Examples

      iex> Util.iou([0, 0, 2, 2], [0, 0, 2, 4])
      0.5

  """
  @spec iou(box(), box()) :: float()
  def iou([x1, y1, w1, h1 | _], [x2, y2, w2, h2 | _]) do
    left = max(x1 - w1 / 2, x2 - w2 / 2)
    top = max(y1 - h1 / 2, y2 - h2 / 2)
    right = min(x1 + w1 / 2, x2 + w2 / 2)
    bottom = min(y1 + h1 / 2, y2 + h2 / 2)

    # A negative extent means the boxes do not overlap on that axis.
    intersection_area = max(0, right - left) * max(0, bottom - top)
    union_area = w1 * h1 + w2 * h2 - intersection_area

    if union_area > 0, do: intersection_area / union_area, else: 0.0
  end

  @doc """
  Returns the boxes whose best class probability is greater than `thresh`.

  `bboxes` is a rank-2 tensor whose first four columns are `cx, cy, w, h` and
  whose remaining columns are class probabilities. The result is a list of
  `[cx, cy, w, h, max_prob]` lists sorted by descending probability.
  """
  @spec filter_predictions(Nx.Tensor.t(), number()) :: [box()]
  def filter_predictions(bboxes, thresh \\ 0.5) do
    {_anchors, data} = Nx.shape(bboxes)

    boxes = Nx.slice_along_axis(bboxes, 0, 4, axis: 1)
    probs = Nx.slice_along_axis(bboxes, 4, data - 4, axis: 1)
    max_prob = Nx.reduce_max(probs, axes: [1])
    sorted_idxs = Nx.argsort(max_prob, direction: :desc)

    [boxes, Nx.new_axis(max_prob, 1)]
    |> Nx.concatenate(axis: 1)
    |> Nx.take(sorted_idxs)
    |> Nx.to_list()
    |> Enum.take_while(fn [_, _, _, _, prob] -> prob > thresh end)
  end

  # Draws the red outline of a single box; an empty box leaves the image as is.
  defp draw_rect([], image), do: image

  defp draw_rect([cx, cy, w, h | _], image) do
    Image.Draw.rect!(
      image,
      round(cx - w / 2),
      round(cy - h / 2),
      round(w),
      round(h),
      fill: false,
      color: :red
    )
  end

  # Draws one box plus its class label, skipping empty and tiny boxes.
  defp draw_labelled_box([], image, _class_name, _bounds), do: image

  defp draw_labelled_box([_cx, _cy, w, h | _], image, _class_name, _bounds)
       when w < @min_label_side or h < @min_label_side do
    image
  end

  defp draw_labelled_box([cx, cy, w, h | _] = box, image, class_name, {max_x, max_y}) do
    {text_image, _alpha} =
      class_name
      |> Image.Text.text!(
        font_size: 24,
        padding: [4, 0],
        background_fill_color: :blue,
        text_fill_color: :white
      )
      |> Image.split_alpha()

    label_x = clamp(round(cx - w / 2), 0, max_x)
    label_y = clamp(round(cy - h / 2 - @label_offset), 0, max_y)

    box
    |> draw_rect(image)
    |> Image.Draw.image!(text_image, label_x, label_y)
  end

  # Restricts `value` to the inclusive range `low..high`.
  defp clamp(value, low, high), do: value |> max(low) |> min(high)
end
# Run the model on the single-image batch; the run returns a one-element tuple.
{pred} = Ortex.run(model, batch)

# Take the first (only) batch entry, move it to the default Nx backend and swap
# its two axes so that `Util.nms/3` sees one row per candidate box.
candidates =
  pred[0]
  |> Nx.backend_transfer()
  |> Nx.transpose(axes: [1, 0])

# Keep boxes above 0.1 probability, suppress overlaps above 0.1 IoU, then draw
# the survivors with their class labels on a fresh copy of the input image.
kept = Util.nms(candidates, 0.1, 0.1)
Util.draw_bbox_labels(kept, make_img.())