Evision.DNN Example - Generic Object Detection Task
# set `EVISION_PREFER_PRECOMPILED` to `false`
# if you prefer `:evision` to be compiled from source
# note that to compile from source, you may need at least 1GB RAM
# System.put_env("EVISION_PREFER_PRECOMPILED", "false")
Mix.install([
{:evision, "~> 0.2"},
{:req, "~> 0.5"},
{:kino, "~> 0.7"}
], system_env: [
# optional, defaults to `true`
# set `EVISION_PREFER_PRECOMPILED` to `false`
# if you prefer `:evision` to be compiled from source
# note that to compile from source, you may need at least 1GB RAM
{"EVISION_PREFER_PRECOMPILED", true},
# optional, defaults to `true`
# set `EVISION_ENABLE_CONTRIB` to `false`
# if you don't need modules from `opencv_contrib`
{"EVISION_ENABLE_CONTRIB", true},
# optional, defaults to `false`
# set `EVISION_ENABLE_CUDA` to `true`
# if you wish to use CUDA related functions
# note that `EVISION_ENABLE_CONTRIB` also has to be `true`
# because cuda related modules come from the `opencv_contrib` repo
{"EVISION_ENABLE_CUDA", false},
# required when
# - `EVISION_ENABLE_CUDA` is `true`
# - and `EVISION_PREFER_PRECOMPILED` is `true`
#
# set `EVISION_CUDA_VERSION` to the version that matches
# your local CUDA runtime version
#
# current available versions are
# - 118
# - 121
{"EVISION_CUDA_VERSION", "118"},
# required for Windows users when
# - `EVISION_ENABLE_CUDA` is `true`
# set `EVISION_CUDA_RUNTIME_DIR` to the directory that contains
# CUDA runtime libraries
{"EVISION_CUDA_RUNTIME_DIR", "C:/PATH/TO/CUDA/RUNTIME"}
])
:ok
Define Some Helper Functions
defmodule Helper do
def download!(url, save_as, overwrite? \\ false) do
# download only when the file does not exist yet,
# unless an overwrite is explicitly requested
if overwrite? or not File.exists?(save_as) do
Req.get!(url, http_errors: :raise, into: File.stream!(save_as), cache: not overwrite?)
end
:ok
end
end
{:module, Helper, <<70, 79, 82, 49, 0, 0, 10, ...>>, {:download!, 3}}
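For example, the call below fetches the COCO class-name list that is also used later in this notebook; since `overwrite?` defaults to `false`, re-running the cell is a no-op once the file exists:
Helper.download!(
"https://raw.githubusercontent.com/cocoa-xu/evision/main/test/testdata/models/coco_names.txt",
"coco_names.txt"
)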
Write the Generic DetectionModel Module
alias Evision, as: Cv
# change to the file's directory
# or somewhere you have write permission
File.cd!(__DIR__)
defmodule DetectionModel do
def visualise_pred(mat, _labels, []), do: {:ok, mat}
def visualise_pred(mat, labels, [translated_out | outs]) do
{:ok, mat} = _visualise_pred(mat, labels, translated_out)
visualise_pred(mat, labels, outs)
end
defp _visualise_pred(mat, _labels, []), do: {:ok, mat}
defp _visualise_pred(mat, labels, [{class_id, confidence, l, t, r, b} | outs]) do
confidence = "#{Float.round(confidence, 2)}"
label = Enum.at(labels, class_id)
text = "#{label}: #{confidence}"
mat = Cv.rectangle(mat, {l, t}, {r, b}, {255, 0, 0})
{{label_width, label_height}, baseline} =
Cv.getTextSize(text, Cv.Constant.cv_FONT_HERSHEY_SIMPLEX(), 0.5, 1)
label_width = trunc(label_width)
label_height = trunc(label_height)
top = max(t, label_height)
mat =
Cv.rectangle(mat, {l, top - label_height}, {l + label_width, top + baseline}, {255, 255, 255})
mat = Cv.putText(mat, text, {l, top}, Cv.Constant.cv_FONT_HERSHEY_SIMPLEX(), 0.5, {0, 0, 255})
_visualise_pred(mat, labels, outs)
end
def postprocess(mat, detections, net, confidence_threshold) do
out_layers = Cv.DNN.Net.getUnconnectedOutLayers(net)
out_layer = Cv.DNN.Net.getLayer(net, Enum.at(out_layers, 0))
out_layer_type = Cv.DNN.Layer.get_type(out_layer) |> IO.iodata_to_binary()
_postprocess(mat, detections, net, confidence_threshold, out_layer_type, [])
end
defp _postprocess(_mat, [], _net, _confidence_threshold, <<"DetectionOutput">>, acc),
do: {:ok, Enum.reverse(acc)}
defp _postprocess(
%Evision.Mat{shape: {h, w, _}} = mat,
[outs | detections],
net,
confidence_threshold,
<<"DetectionOutput">>,
acc
) do
data = Cv.Mat.to_binary(outs)
{:ok, translated_outs} = _translate_outs(confidence_threshold, data, h, w, [])
_postprocess(mat, detections, net, confidence_threshold, "DetectionOutput", [
translated_outs | acc
])
end
defp _translate_outs(_confidence_threshold, <<>>, _h, _w, acc), do: {:ok, acc}
defp _translate_outs(
confidence_threshold,
<<_batch_id::float-size(32)-little, class_id::float-size(32)-little,
confidence::float-size(32)-little, left::float-size(32)-little,
top::float-size(32)-little, right::float-size(32)-little,
bottom::float-size(32)-little, rest::binary>>,
h,
w,
acc
) do
if confidence > confidence_threshold do
[class_id, l, t, r, b] =
Enum.map([class_id, left, top, right, bottom], fn f -> trunc(f) end)
width = r - l + 1
height = b - t + 1
[l, t, r, b] =
if width <= 2 or height <= 2 do
Enum.map([left * w, top * h, right * w, bottom * h], fn f -> trunc(f) end)
else
[l, t, r, b]
end
_translate_outs(confidence_threshold, rest, h, w, [
{class_id - 1, confidence, l, t, r, b} | acc
])
else
_translate_outs(confidence_threshold, rest, h, w, acc)
end
end
def get_labels(class_label_file) do
class_label_file
|> File.read!()
|> String.split("\n")
end
def predict(mat, model, out_names, opts \\ []) do
blob = Cv.DNN.blobFromImage(mat, opts)
model = Cv.DNN.Net.setInput(model, blob, name: "", scalefactor: 1.0, mean: [0, 0, 0])
start_time = :os.system_time(:millisecond)
detections = Cv.DNN.Net.forward(model, outBlobNames: out_names)
end_time = :os.system_time(:millisecond)
IO.puts("Inference time=>#{end_time - start_time} ms")
{:ok, mat, detections}
end
def predict_file(image_file, model, out_names, opts \\ []) do
predict(Cv.imread(image_file), model, out_names, opts)
end
def get_model(params, config, framework \\ "") do
net =
Cv.DNN.readNet(params,
config: config,
framework: framework
)
out_names = Cv.DNN.Net.getUnconnectedOutLayersNames(net)
{:ok, net, out_names}
end
end
{:module, DetectionModel, <<70, 79, 82, 49, 0, 0, 34, ...>>, {:get_model, 3}}
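Each detection emitted by a `DetectionOutput` layer is a row of seven little-endian float32 values, `[batch_id, class_id, confidence, left, top, right, bottom]`, which is exactly the binary pattern `_translate_outs/5` matches on. A minimal sketch with made-up values, for illustration only:
# one hypothetical detection row: batch 0, class 1, confidence ~0.9,
# box (0.1, 0.2)-(0.8, 0.9) in normalized coordinates
row = for f <- [0.0, 1.0, 0.9, 0.1, 0.2, 0.8, 0.9], into: <<>>, do: <<f::float-size(32)-little>>
# the same pattern `_translate_outs/5` uses to peel off one detection
<<_batch_id::float-size(32)-little, class_id::float-size(32)-little,
confidence::float-size(32)-little, _box::binary>> = row
{trunc(class_id), Float.round(confidence, 2)}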
Example Detect Model: SSD MobileNetv2
Basic steps (a combined sketch follows this list):

- Download the model weights and config file, as well as a list of class names.
- `DetectionModel.get_model`.
- `DetectionModel.get_labels`.
- `DetectionModel.predict`, specifying some preprocessing parameters.
- `DetectionModel.postprocess`. This translates neural network outputs to data that is easier to read/use.
- `DetectionModel.visualise_pred` if you want to see the result.
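Put together, a minimal sketch of the pipeline above. The model, config and label paths here are placeholders, and the preprocessing options (size, mean, swapRB) are model-specific; the values shown are the ones the SSD MobileNetV2 example below uses:
# hypothetical paths; substitute the files for your model
{:ok, net, out_names} = DetectionModel.get_model("weights.pb", "config.pbtxt")
labels = DetectionModel.get_labels("labels.txt")
{:ok, mat, detections} =
DetectionModel.predict_file("image.jpg", net, out_names,
scalefactor: 1,
swapRB: true,
mean: [0, 0, 0],
size: [300, 300]
)
{:ok, outs} = DetectionModel.postprocess(mat, detections, net, 0.5)
{:ok, mat} = DetectionModel.visualise_pred(mat, labels, outs)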
defmodule SSDMobileNetV2 do
defp download_model() do
Helper.download!(
"https://raw.githubusercontent.com/opencv/opencv_extra/master/testdata/dnn/ssd_mobilenet_v2_coco_2018_03_29.pbtxt",
"ssd_mobilenet_v2_coco_2018_03_29.pbtxt"
)
Helper.download!(
"https://raw.githubusercontent.com/cocoa-xu/evision/main/test/testdata/models/coco_names.txt",
"coco_names.txt"
)
graph_pb = "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb"
if !File.exists?(graph_pb) do
Helper.download!(
"http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v2_coco_2018_03_29.tar.gz",
"ssd_mobilenet_v2_coco_2018_03_29.tar.gz"
)
"ssd_mobilenet_v2_coco_2018_03_29.tar.gz"
|> File.read!()
|> :zlib.gunzip()
|> then(&:erl_tar.extract({:binary, &1}, [:memory, :compressed]))
|> elem(1)
|> Enum.map(fn {filename, content} -> {List.to_string(filename), content} end)
|> Enum.reject(
&(elem(&1, 0) != "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb")
)
|> Enum.at(0)
|> then(fn {_filename, content} ->
File.mkdir_p!("ssd_mobilenet_v2_coco_2018_03_29")
File.write!(graph_pb, content)
:ok
end)
else
:ok
end
end
def get_detection_model() do
:ok = download_model()
graph_pb = "ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb"
{:ok, net, out_names} =
DetectionModel.get_model(
graph_pb,
"ssd_mobilenet_v2_coco_2018_03_29.pbtxt"
)
labels = DetectionModel.get_labels("coco_names.txt")
{net, out_names, labels}
end
def predict_file_and_show(filename, confidence_threshold \\ 0.5) when is_binary(filename) do
{net, out_names, labels} = get_detection_model()
{:ok, mat, detections} =
DetectionModel.predict_file(filename, net, out_names,
scalefactor: 1,
swapRB: true,
mean: [0, 0, 0],
size: [300, 300]
)
{:ok, translated_outs} =
DetectionModel.postprocess(mat, detections, net, confidence_threshold)
{:ok, mat} = DetectionModel.visualise_pred(mat, labels, translated_outs)
mat
end
def predict_and_show(net, out_names, labels, mat, confidence_threshold \\ 0.5)
when is_struct(mat, Evision.Mat) do
{:ok, mat, detections} =
DetectionModel.predict(mat, net, out_names,
scalefactor: 1,
swapRB: true,
mean: [0, 0, 0],
size: [300, 300]
)
{:ok, translated_outs} =
DetectionModel.postprocess(mat, detections, net, confidence_threshold)
{:ok, mat} = DetectionModel.visualise_pred(mat, labels, translated_outs)
mat
end
end
{:module, SSDMobileNetV2, <<70, 79, 82, 49, 0, 0, 22, ...>>, {:predict_and_show, 5}}
Detect Objects in an Image
Helper.download!(
"https://raw.githubusercontent.com/cocoa-xu/evision/main/test/dnn_detection_test.jpg",
"dnn_detection_test.jpg"
)
%Evision.Mat{} = mat = SSDMobileNetV2.predict_file_and_show("dnn_detection_test.jpg")
Inference time=>35 ms
%Evision.Mat{
channels: 3,
dims: 2,
type: {:u, 8},
raw_type: 16,
shape: {586, 872, 3},
ref: #Reference<0.1834280076.3317825552.216669>
}
Cv.imencode(".png", mat)
|> Kino.Image.new(:png)
Detect Objects in a Video Stream
# if you have a camera available
# uncomment the line below
# video = Cv.VideoCapture.videoCapture(0)
# or the OpenCV library you compiled can decode video files
# uncomment the line below
# video = Cv.VideoCapture.videoCapture("/path/to/your/video/file")
nil
defmodule VideoDetection do
def detect(video, widget, max_frames \\ 30 * 60)
when is_struct(video, Evision.VideoCapture) and is_integer(max_frames) do
{net, out_names, labels} = SSDMobileNetV2.get_detection_model()
frame_read = Cv.VideoCapture.read(video)
_detect(net, out_names, labels, frame_read, video, widget, max_frames)
end
defp _detect(_, _, _, _, _, _, 0), do: :ok
# `Cv.VideoCapture.read/1` returns `false` once no more frames can be read
defp _detect(_, _, _, false, _, _, _), do: :ok
defp _detect(net, out_names, labels, frame, video, widget, left_frames)
when left_frames != 0 do
mat = SSDMobileNetV2.predict_and_show(net, out_names, labels, frame)
Cv.imencode(".png", mat)
|> Kino.Image.new(:png)
|> then(&Kino.Frame.render(widget, &1))
frame_read = Cv.VideoCapture.read(video)
if left_frames > 0 do
_detect(net, out_names, labels, frame_read, video, widget, left_frames - 1)
else
_detect(net, out_names, labels, frame_read, video, widget, left_frames)
end
end
end
{:module, VideoDetection, <<70, 79, 82, 49, 0, 0, 12, ...>>, {:_detect, 7}}
Run the detection
widget = Kino.Frame.new() |> Kino.render()
# read 1,800 frames at most
# change the number to negative values to
# detect objects until OpenCV cannot read
# new frame from the video stream
VideoDetection.detect(video, widget, 1800)
"Inference time=>48 ms"