Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Ortex YOLOv2

livebooks/ortex/ortex_yolov2.livemd

Ortex YOLOv2

# Install the notebook's dependencies and configure Nx for this session.
Mix.install(
  [
    {:exla, "~> 0.8"},
    {:stb_image, "~> 0.6"},
    {:req, "~> 0.5"},
    {:kino, "~> 0.14"},
    {:ortex, "~> 0.1"}
  ],
  # Make EXLA.Backend the default backend for Nx tensors.
  config: [nx: [default_backend: EXLA.Backend]]
)

Load models

# Download the label file and keep one trimmed, non-empty class name per line.
%{body: class_names_text} =
  Req.get!("https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names")

classes =
  for line <- String.split(class_names_text, "\n"),
      name = String.trim(line),
      String.length(name) > 0,
      do: name

num_classes = length(classes)
# Local cache location for the ONNX weights.
model_path = "/tmp/yolov2.onnx"

model_url =
  "https://media.githubusercontent.com/media/onnx/models/main/validated/vision/object_detection_segmentation/yolov2-coco/model/yolov2-coco-9.onnx?download=true"

# Only download when the file is not already on disk; the response body is
# streamed into the file.
if not File.exists?(model_path) do
  Req.get!(model_url, connect_options: [timeout: 300_000], into: File.stream!(model_path))
end

# Load the model and wrap it in a serving so it can be run on batches.
model = Ortex.load(model_path)
serving = Nx.Serving.new(Ortex.Serving, model)

Anchors

# Anchor priors: one two-value row per anchor. They are later multiplied into
# the exponentiated size features to produce box_wh.
anchor_rows = [
  [0.57273, 0.677385],
  [1.87446, 2.06253],
  [3.33843, 5.47434],
  [7.88282, 3.52778],
  [9.77052, 9.16828]
]

anchors = Nx.tensor(anchor_rows)
num_anchors = Nx.axis_size(anchors, 0)

# Shaped so it broadcasts against {batch, y, x, anchor, 2} tensors.
anchors_tensor = Nx.reshape(anchors, {1, 1, 1, num_anchors, 2})

Load image

# Download the sample photo, decode it, resize it to 416x416 and convert it
# to an Nx tensor.
%{body: image_binary} =
  Req.get!("https://raw.githubusercontent.com/pjreddie/darknet/master/data/dog.jpg")

img_tensor =
  image_binary
  |> StbImage.read_binary!()
  |> StbImage.resize(416, 416)
  |> StbImage.to_nx()

Kino.Image.new(img_tensor)
nx_channels = Nx.axis_size(img_tensor, 2)

# Keep the first three channels when a fourth one is present.
# Only 3- and 4-channel images are handled; anything else raises.
rgb_tensor =
  case nx_channels do
    3 -> img_tensor
    4 -> img_tensor[[0..415, 0..415, 0..2]]
  end

# Scale pixel values by 1/255 and move the channel axis to the front.
img_tensor =
  rgb_tensor
  |> Nx.divide(255)
  |> Nx.transpose(axes: [2, 0, 1])

batch = Nx.Batch.stack([img_tensor])

Predict

# Run the serving on the batch, transfer the outputs to the default Nx backend
# and keep the first element of the output tuple.
outputs = Nx.Serving.run(serving, batch)

feats =
  outputs
  |> Nx.backend_transfer()
  |> elem(0)

Parse results

# The raw output is rank 4; its last two axes are used as the grid size.
{_batch, _channels, num_y_blocks, num_x_blocks} = Nx.shape(feats)

# Move the channel axis last, then split it into {anchor, 5 + num_classes}.
feats =
  Nx.reshape(
    Nx.transpose(feats, axes: [0, 2, 3, 1]),
    {1, num_y_blocks, num_x_blocks, num_anchors, num_classes + 5}
  )
# Per-cell grid offsets with the same layout as `feats`
# ({1, num_y_blocks, num_x_blocks, 1, 2}), ordered (column, row) in the last
# axis. Each offset is read directly from the position along the matching grid
# axis, so the result is also correct when the grid is not square. For a square
# grid the values are identical to the previous tile/transpose construction.
conv_width_index = Nx.iota({1, num_y_blocks, num_x_blocks, 1, 1}, axis: 2)
conv_height_index = Nx.iota({1, num_y_blocks, num_x_blocks, 1, 1}, axis: 1)

conv_index =
  [conv_width_index, conv_height_index]
  |> Nx.concatenate(axis: 4)
  |> Nx.as_type({:f, 32})

# Grid size in the same (column, row) order as conv_index, used to normalise
# grid-unit coordinates by the number of cells along each axis.
conv_tensor =
  Nx.tensor([num_x_blocks, num_y_blocks])
  |> Nx.reshape({1, 1, 1, 1, 2})
  |> Nx.as_type({:f, 32})
# Box centres: sigmoid of features 0..1, offset by the cell index and divided
# by the grid size.
box_xy =
  feats
  |> Nx.slice_along_axis(0, 2, axis: 4)
  |> Nx.sigmoid()
  |> Nx.add(conv_index)
  |> Nx.divide(conv_tensor)

# Box sizes: exp of features 2..3, scaled by the anchors and divided by the
# grid size.
box_wh =
  feats
  |> Nx.slice_along_axis(2, 2, axis: 4)
  |> Nx.exp()
  |> Nx.multiply(anchors_tensor)
  |> Nx.divide(conv_tensor)
# Convert centre/size boxes into corner form.
half_wh = Nx.divide(box_wh, 2)
box_mins = Nx.subtract(box_xy, half_wh)
box_maxes = Nx.add(box_xy, half_wh)

# Last axis holds [min_0, min_1, max_0, max_1] for every grid cell and anchor.
box_list = Nx.concatenate([box_mins, box_maxes], axis: 4)
# Feature 4 of every anchor is passed through a sigmoid to give a 0..1 score.
box_confidence =
  feats
  |> Nx.slice_along_axis(4, 1, axis: 4)
  |> Nx.sigmoid()

# Features 5.. are the raw per-class values (num_classes entries).
box_class_probs = Nx.slice_along_axis(feats, 5, num_classes, axis: 4)
# Softmax over the class axis.
# The per-box maximum is subtracted before exponentiating: the result is
# mathematically unchanged, but large raw values can no longer overflow `exp`
# to infinity (which would turn the division into NaN).
max_logit = Nx.reduce_max(box_class_probs, axes: [4], keep_axes: true)

exp =
  box_class_probs
  |> Nx.subtract(max_logit)
  |> Nx.exp()

# keep_axes leaves a size-1 class axis so the sum broadcasts in the division.
exp_sum = Nx.sum(exp, axes: [4], keep_axes: true)
box_class_probs = Nx.divide(exp, exp_sum)
# Sanity check: print the sum of the class probabilities for the first, second
# and last anchor of grid cell (0, 0).
for anchor_index <- [0, 1, -1] do
  box_class_probs[0][0][0][anchor_index]
  |> Nx.to_flat_list()
  |> Enum.sum()
  |> IO.inspect()
end
# Per-class score = box confidence * class probability.
box_scores = Nx.multiply(box_confidence, box_class_probs)

# Best class index and its score per box; the reduced axis is kept with size 1
# so the shape stays {1, num_y_blocks, num_x_blocks, num_anchors, 1}.
box_classes = Nx.argmax(box_scores, axis: -1, keep_axis: true)
box_class_scores = Nx.reduce_max(box_scores, axes: [-1], keep_axes: true)

# 1 where the best class score is above the threshold, 0 elsewhere.
score_threshold = 0.5
prediction_mask = Nx.greater(box_class_scores, score_threshold)
# One row per (cell, anchor) with 8 columns:
# 0 mask, 1 confidence, 2 class score, 3 class index, 4..7 box corners.
prediction_columns = [
  prediction_mask,
  box_confidence,
  box_class_scores,
  box_classes,
  box_list
]

joined =
  prediction_columns
  |> Nx.concatenate(axis: 4)
  |> Nx.reshape({:auto, 8})
# Row numbers of the predictions whose mask value is 1.
selected_rows =
  for {1, row} <- prediction_mask |> Nx.to_flat_list() |> Enum.with_index(), do: row

masked_index_list = Nx.tensor(selected_rows)

# Keep only those rows of the joined table.
selected_predictions = Nx.take(joined, masked_index_list)
# Turn every selected row into a plain map: columns 4..7 become the box,
# column 2 the score and column 3 the (integer) class index.
formed_predictions =
  for row <- Nx.to_batched(selected_predictions, 1) do
    [_mask, _confidence, score, class | box] = Nx.to_flat_list(row)

    %{
      box: box,
      score: score,
      class: trunc(class)
    }
  end

NMS

# Intersection-over-union of one box against every row of a box table.
#
#   a      - a single box, indexed as [xmin, ymin, xmax, ymax]
#   b      - rank-2 tensor with one box per row, same column order
#   a_area - area of `a`
#   b_area - areas of the rows of `b`
#
# Returns one IoU value per row of `b`.
iou_nx = fn a, b, a_area, b_area ->
  num_b = Nx.shape(b) |> elem(0)
  # xmin
  abx_mn = Nx.max(a[0], b[[0..(num_b - 1), 0]])
  # ymin
  aby_mn = Nx.max(a[1], b[[0..(num_b - 1), 1]])
  # xmax
  abx_mx = Nx.min(a[2], b[[0..(num_b - 1), 2]])
  # ymax
  aby_mx = Nx.min(a[3], b[[0..(num_b - 1), 3]])

  # Clamp at 0: for disjoint boxes both differences can be negative, and their
  # product would otherwise be a positive (bogus) intersection area.
  w = abx_mx |> Nx.subtract(abx_mn) |> Nx.max(0)
  h = aby_mx |> Nx.subtract(aby_mn) |> Nx.max(0)
  intersect = Nx.multiply(w, h)

  Nx.divide(intersect, Nx.subtract(Nx.add(a_area, b_area), intersect))
end
# Collect the boxes and scores of the thresholded predictions as tensors,
# in the same order as formed_predictions.
box_list = Nx.tensor(for prediction <- formed_predictions, do: prediction.box)

score_list = Nx.tensor(for prediction <- formed_predictions, do: prediction.score)

nms_threshold = 0.7
# Groups boxes by overlap and keeps the best-scoring box of each group.
#
#   bboxes        - rank-2 tensor, one box per row (columns 0..3 are used)
#   scores        - one score per box, in the same order
#   iou_threshold - boxes whose IoU is above this value are treated as duplicates
#
# Returns the list of row indices to keep.
nms = fn bboxes, scores, iou_threshold ->
  last_row = Nx.axis_size(bboxes, 0) - 1
  column = fn col -> bboxes[[0..last_row, col]] end

  widths = Nx.subtract(column.(2), column.(0))
  heights = Nx.subtract(column.(3), column.(1))
  areas = Nx.multiply(widths, heights)

  # For every box, a 0/1 list marking which boxes overlap it beyond the threshold.
  overlap_masks =
    for {box, index} <- bboxes |> Nx.to_batched(1) |> Enum.with_index() do
      box[0]
      |> iou_nx.(bboxes, areas[index], areas)
      |> Nx.greater(iou_threshold)
      |> Nx.to_flat_list()
    end

  overlap_masks
  |> Enum.uniq()
  |> Enum.map(fn mask ->
    # Indices of the boxes that belong to this overlap group.
    candidates = for {1, position} <- Enum.with_index(mask), do: position

    best =
      scores
      |> Nx.take(Nx.tensor(candidates))
      |> Nx.argmax()
      |> Nx.to_number()

    Enum.at(candidates, best)
  end)
  |> Enum.uniq()
end
# Run NMS and look up the surviving predictions by index.
index_list = nms.(box_list, score_list, nms_threshold)
selected_predictions = for index <- index_list, do: Enum.at(formed_predictions, index)

Visualize results

# Reload the photo without resizing so crops are taken at its own resolution.
%{body: original_binary} =
  Req.get!("https://raw.githubusercontent.com/pjreddie/darknet/master/data/dog.jpg")

img_tensor =
  original_binary
  |> StbImage.read_binary!()
  |> StbImage.to_nx()

{height, width, _channels} = Nx.shape(img_tensor)
# Converts a fractional coordinate to a pixel index inside [0, size - 1].
# A predicted box can reach past the image border; without clamping the slice
# below would receive a negative or out-of-range bound.
to_pixel = fn fraction, size ->
  fraction
  |> Kernel.*(size)
  |> trunc()
  |> max(0)
  |> min(size - 1)
end

# Print the class name of every kept prediction, crop its box out of the
# image and show all crops in a grid.
croped_list =
  selected_predictions
  |> Enum.map(fn prediction ->
    classes
    |> Enum.at(prediction.class)
    |> IO.inspect()

    # Box values are fractions of the image size: [left, top, right, bottom].
    [x_min, y_min, x_max, y_max] = prediction.box
    left = to_pixel.(x_min, width)
    top = to_pixel.(y_min, height)
    right = to_pixel.(x_max, width)
    bottom = to_pixel.(y_max, height)

    img_tensor[[top..bottom, left..right, 0..2]]
    |> Kino.Image.new()
  end)
  |> Kino.Layout.grid()