# Image Processing
Mix.install([
{:nx, "~> 0.6"},
{:kino, "~> 0.12.0"}
])
## Parallel Server Module
defmodule ParallelServer do
  @moduledoc """
  Helpers to run one task split across the local node and every node connected to it.

  The calling process partitions the data, starts one worker process per partition
  (one per node), and a monitor process gathers the workers' answers before handing
  them back to the caller for merging.
  """

  @doc """
  Starts distribution so this node is visible in the network.

  The node is named `livebook@<hostname>` and uses the cookie `:secure`.
  The result of `Node.start/1` is not checked. Returns `Node.self/0`.
  """
  def start_node do
    {:ok, hostname} = :inet.gethostname()
    # Node names have to be atoms; the name is built from the local hostname.
    Node.start(String.to_atom("livebook@#{hostname}"))
    Node.set_cookie(:secure)
    Node.self()
  end

  @doc """
  Connects this node to the node called `node_name` (a string such as `"name@host"`).

  Returns whatever `Node.connect/1` returns.
  """
  def connect_node(node_name) do
    Node.connect(String.to_atom(node_name))
  end

  @doc """
  Runs a task distributed over the local node and all connected nodes.

  Arguments, in order:

    * `original_data` - the whole data of the problem; it is sent to every worker.
    * `partition_fun` - `fn original_data, n -> list end`, where `n` is the number of
      available nodes. Each list element describes the piece of work for one node.
    * `merge_fun` - `fn list_of_results -> merged end`; the results are in partition order.
    * `persistence_fun` - `fn merged -> any end`; its return value is the return value of
      this function.
    * `processing_fun` - `fn partition, original_data -> result end`, executed on each node.

  Nodes and partitions are paired in order. If `partition_fun` returns fewer partitions
  than there are nodes, only that many workers are started; if it returns none,
  `merge_fun` is called with `[]`. Partitions beyond the number of nodes are ignored.
  """
  def execute_distributed_task(
        original_data,
        partition_fun,
        merge_fun,
        persistence_fun,
        processing_fun
      ) do
    # nodes including itself
    nodes = [Node.self() | Node.list()]
    partitions = partition_fun.(original_data, length(nodes))

    # Pair every node with its partition and a 1-based slot index. Zipping stops at the
    # shorter list, so the number of jobs is the number of answers that will arrive.
    jobs = nodes |> Enum.zip(partitions) |> Enum.with_index(1)

    result = merge_fun.(collect_results(jobs, original_data, processing_fun))

    IO.inspect(result)
    persistence_fun.(result)
  end

  # Starts one worker per job plus the monitor, then blocks until the monitor reports
  # the complete, ordered list of results. Nothing to wait for when there are no jobs.
  defp collect_results([], _original_data, _processing_fun), do: []

  defp collect_results(jobs, original_data, processing_fun) do
    # self() must be captured here: inside the spawned fun it would be the monitor itself.
    caller = self()
    empty_slots = List.duplicate(false, length(jobs))
    monitor_pid = spawn(fn -> listen_monitor({caller, empty_slots, empty_slots}) end)

    Enum.each(jobs, fn {{node, partition}, index} ->
      # start a process on the node, ready to listen
      pid = Node.spawn_link(node, fn -> listen_node(processing_fun) end)
      # tell it to start processing its share of the data
      send(pid, {:process_data, monitor_pid, partition, original_data, index})
    end)

    receive do
      {:merge_response, parts} -> parts
    end
  end

  @doc """
  Worker loop run on each node.

  Waits for a single `{:process_data, monitor_pid, data_partition, original_data, i}`
  message, applies `processing_fun` to the partition and the original data, and sends
  `{:processed_data, result, i}` to `monitor_pid`. It handles exactly one message and
  then returns, so it does not call itself again.
  """
  def listen_node(processing_fun) do
    receive do
      {:process_data, monitor_pid, data_partition, original_data, i} ->
        IO.puts("Inicio de Ejecución desde Nodo: #{Node.self()}")
        # "processing_fun" must also be available on the remote node
        processed_data = processing_fun.(data_partition, original_data)
        # once processed, report back to the monitor of the node that started the task
        send(monitor_pid, {:processed_data, processed_data, i})
    end
  end

  @doc """
  Monitor loop run on the node that started the task.

  The state is `{pid_origin, data_response, completition_list}`: the process waiting for
  the final answer, the results received so far, and one boolean per slot telling whether
  that slot has been answered. Every `{:processed_data, result, i}` message fills slot `i`
  (1-based). Once every slot is filled, `{:merge_response, results}` is sent to
  `pid_origin` and the loop ends.
  """
  def listen_monitor({pid_origin, data_response, completition_list}) do
    receive do
      {:processed_data, processed_result, i} ->
        IO.puts("Recepción de Respuesta desde Nodo Monitor: #{Node.self()}")
        # mark slot i as completed and store its result
        new_completition_list = List.replace_at(completition_list, i - 1, true)
        new_data_response = List.replace_at(data_response, i - 1, processed_result)
        IO.inspect({new_completition_list, new_data_response})

        if Enum.all?(new_completition_list, fn done -> done == true end) do
          # everything arrived: hand the ordered results to the waiting process
          IO.puts("Respuesta completa enviando")
          send(pid_origin, {:merge_response, new_data_response})
        else
          # still missing answers: keep waiting
          listen_monitor({pid_origin, new_data_response, new_completition_list})
        end
    end
  end
end
defmodule TestFunctions do
  @moduledoc """
  Trivial partition / processing / merge / persistence functions used to try out
  `ParallelServer.execute_distributed_task/5` with plain strings.
  """

  @doc """
  Test partition function.

  Ignores the data and returns `n` labels, `"data 1"` through `"data n"`.
  Returns `[]` when `n` is 0.
  """
  def test_partition_fun(_data, n) do
    # The explicit step keeps the range ascending, so n = 0 produces no labels.
    Enum.map(1..n//1, fn x -> "data #{x}" end)
  end

  @doc """
  Test processing function.

  Returns the partition element unchanged; the original data is ignored.
  """
  def test_processing_fun(data, _original_data) do
    # "| #{data} processed by #{Node.self()} |"
    data
  end

  @doc """
  Test merge function.

  Concatenates the partial results, in order, into a single string.
  """
  def test_merge_fun(list) do
    Enum.join(list)
  end

  @doc """
  Test persistence function.

  Prints the literal line `result` followed by the inspected value, and returns the value.
  """
  def test_persistence_fun(result) do
    IO.puts("result")
    IO.inspect(result)
  end
end
# Start distribution on this runtime; the call evaluates to the local node name.
ParallelServer.start_node()
# Show the nodes this node is currently connected to.
Node.list()
# Connect to a peer node by name. The host name is hard-coded for one specific
# device; replace it with the name of the node you want to reach.
ParallelServer.connect_node("livebook@nerves-cb66.local")
## Image Processing Module
# Upload widget for the image to rotate.
image_input = Kino.Input.image("Uploaded Image")

# Read the upload. The match only succeeds for the :rgb format and binds the image
# size together with the reference to the stored file.
%{file_ref: file_ref, format: :rgb, height: height, width: width} = Kino.Input.read(image_input)

# Raw bytes of the uploaded file.
content = File.read!(Kino.Input.file_path(file_ref))

# Interpret the bytes as unsigned 8-bit values laid out as {height, width, 3}.
image_tensor = content |> Nx.from_binary(:u8) |> Nx.reshape({height, width, 3})

Nx.shape(image_tensor)
defmodule ImgManager do
  @moduledoc """
  Rotates an image by cutting it into vertical strips that are computed independently
  through `ParallelServer.execute_distributed_task/5` and then stitched back together.
  """

  @doc """
  Splits the work on an image into `n` vertical strips, one per node/process.

  Takes `{img_tensor, angle}`, where `img_tensor` has shape `{height, width, channels}`,
  and returns a list of `n` maps with these keys:

    * `:i_0`, `:i_f` - first and last row of the strip (inclusive); always the full height
    * `:j_0`, `:j_f` - first and last column of the strip (inclusive)
    * `:angle` - the rotation angle, passed through unchanged
    * `:img_height`, `:img_width` - size of the whole image

  Columns are spread as evenly as possible: the first `rem(width, n)` strips are one
  column wider than the rest, so the strips cover every column exactly once.

  Raises `ArgumentError` when `n` is smaller than 1 or the image has fewer columns than
  `n`, because a strip without columns cannot be represented as a tensor.
  """
  def img_partition_fun({img_tensor, angle}, n) do
    {height, width, _channels} = Nx.shape(img_tensor)

    if n < 1 or width < n do
      raise ArgumentError,
            "cannot split an image #{width} columns wide into #{inspect(n)} non-empty strips"
    end

    base_width = div(width, n)
    wider_strips = rem(width, n)

    Enum.map(1..n, fn x ->
      # Every strip before this one contributed base_width columns, plus one extra
      # column for each of the leading "wider" strips.
      j_0 = (x - 1) * base_width + min(x - 1, wider_strips)
      strip_width = if x <= wider_strips, do: base_width + 1, else: base_width

      %{
        j_0: j_0,
        j_f: j_0 + strip_width - 1,
        i_0: 0,
        # last valid row index; the range is inclusive
        i_f: height - 1,
        angle: angle,
        img_height: height,
        img_width: width
      }
    end)
  end

  @doc """
  Computes one rectangular section of the rotated image.

  `data` is one of the maps produced by `img_partition_fun/2`; `par` is the
  `{img_tensor, angle}` tuple holding the original image (the angle is taken from `data`).

  For every output pixel `(i, j)` inside the section, the pixel is mapped to Cartesian
  coordinates (`x = j`, `y = img_height - 1 - i`), rotated by `-angle` radians around the
  origin (the bottom-left corner) and the nearest source pixel is copied. Pixels whose
  source falls outside the original image are black.

  Returns a tensor of shape `{i_f - i_0 + 1, j_f - j_0 + 1, 3}` with the same element
  type as `img_tensor`. The image must have at least 3 channels; only the first three
  are copied.
  """
  def img_processing_fun(data, par) do
    {img_tensor, _angle} = par

    %{
      i_0: i_0,
      i_f: i_f,
      j_0: j_0,
      j_f: j_f,
      angle: angle,
      img_height: img_height,
      img_width: img_width
    } = data

    {_src_height, src_width, channels} = Nx.shape(img_tensor)

    if channels < 3 do
      raise ArgumentError, "expected an image with at least 3 channels, got #{channels}"
    end

    # Work on the raw bytes: reading a source pixel is then a constant-time slice and
    # the output is built in a single pass instead of copying a tensor per channel.
    type = Nx.type(img_tensor)
    {_kind, bits} = type
    elem_bytes = div(bits, 8)
    rgb_bytes = 3 * elem_bytes
    pixel_stride = channels * elem_bytes
    source = Nx.to_binary(img_tensor)
    black = :binary.copy(<<0>>, rgb_bytes)

    # The rotation is the same for every pixel, so compute it once.
    cos_t = :math.cos(-angle)
    sin_t = :math.sin(-angle)

    # Rows outermost, columns next, channels innermost: the row-major layout that
    # the reshape below expects.
    pixels =
      for i_new <- i_0..i_f//1, j_new <- j_0..j_f//1, into: <<>> do
        x_new = j_new
        y_new = img_height - i_new - 1

        # x_old = x_new * cos(-theta) - y_new * sin(-theta)
        x_old = x_new * cos_t - y_new * sin_t
        # y_old = x_new * sin(-theta) + y_new * cos(-theta)
        y_old = x_new * sin_t + y_new * cos_t

        # nearest source pixel; the row conversion is the exact inverse of y_new above
        j_old = round(x_old)
        i_old = img_height - 1 - round(y_old)

        if j_old < 0 or j_old >= img_width or i_old < 0 or i_old >= img_height do
          black
        else
          binary_part(source, (i_old * src_width + j_old) * pixel_stride, rgb_bytes)
        end
      end

    pixels
    |> Nx.from_binary(type)
    |> Nx.reshape({i_f - i_0 + 1, j_f - j_0 + 1, 3})
  end

  @doc """
  Merges the processed sections into a single image.

  The sections are vertical strips in left-to-right order, so they are concatenated
  along the column axis.
  """
  def img_merge_fun(list_parts) do
    Nx.concatenate(list_parts, axis: 1)
  end

  @doc """
  Persistence step for the rotated image.

  Nothing is written anywhere; the merged tensor is returned unchanged so it can be
  displayed by the caller.
  """
  def img_persistence_fun(result) do
    # File.write("files/img_rotated.png",result)
    result
  end

  @doc """
  Rotates `tensor` by `angle` (radians) using every available node.

  Prints the total elapsed time in seconds and returns the rotated image tensor.
  """
  def run_program(tensor, angle) do
    {elapsed_time, result} =
      :timer.tc(fn ->
        ParallelServer.execute_distributed_task(
          {tensor, angle},
          &ImgManager.img_partition_fun/2,
          &ImgManager.img_merge_fun/1,
          &ImgManager.img_persistence_fun/1,
          &ImgManager.img_processing_fun/2
        )
      end)

    IO.puts("El tiempo total de ejecución es #{elapsed_time / 1_000_000} segundos.")
    result
  end
end
# Rotate the uploaded image. 0.5 is the rotation angle; the rotation code passes it
# to cos/sin, so it is expressed in radians.
img_content = ImgManager.run_program(image_tensor, 0.5)
# Render the resulting tensor as an image in the notebook.
Kino.Image.new(img_content)