Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Image Processing

lib/grupo_1/ImageProcessing.livemd

Image Processing

Mix.install([
  {:nx, "~> 0.6"},
  {:kino, "~> 0.12.0"}
])

Parallel Server Module

defmodule ParallelServer do
  @moduledoc """
  Helpers to run one task across the local node and every node connected to it.

  The calling process partitions the input, starts one worker process per node and
  a monitor process that gathers the partial results; once every worker has replied
  the results are merged and handed to a persistence callback.
  """

  @doc """
  Starts distribution on this runtime so it becomes visible in the network.

  The node is named `livebook@<hostname>` and uses the cookie `:secure`.
  The result of `Node.start/1` is not checked. Returns `Node.self/0`.
  """
  def start_node do
    # :inet.gethostname/0 returns a charlist; string interpolation converts it
    {:ok, hostname} = :inet.gethostname()
    # node names must be atoms; the name is derived from the local hostname only
    Node.start(String.to_atom("livebook@#{hostname}"))
    Node.set_cookie(:secure)
    Node.self()
  end

  @doc """
  Connects this node to `node_name` (a string such as `"livebook@host"`).

  Returns whatever `Node.connect/1` returns.
  """
  def connect_node(node_name) do
    # node names must be atoms, hence the conversion
    Node.connect(String.to_atom(node_name))
  end

  @doc """
  Executes a task distributed over the local node plus every connected node.

  Arguments, in order:

    * `original_data` - the whole input of the problem; it is also sent to every worker
    * `partition_fun` - `(original_data, n) -> enumerable` with exactly `n` elements,
      one per node, each describing the piece of work of that node
    * `merge_fun` - `(list_of_results) -> merged`; results are ordered like the partitions
    * `persistence_fun` - `(merged) -> any`; its return value is the return value of
      this function
    * `processing_fun` - `(partition, original_data) -> result`, executed on each node

  Raises `ArgumentError` when `partition_fun` does not return exactly one partition
  per node: with fewer partitions the monitor would wait forever for replies that
  never come, with more partitions part of the work would be silently dropped.
  """
  def execute_distributed_task(
        original_data,
        partition_fun,
        merge_fun,
        persistence_fun,
        processing_fun
      ) do
    # nodes including itself
    nodes = [Node.self() | Node.list()]
    n = length(nodes)

    # data specific to the computation of each node
    data_partition_list = partition_fun.(original_data, n)
    partition_count = Enum.count(data_partition_list)

    if partition_count != n do
      raise ArgumentError,
            "partition_fun must return exactly #{n} partitions, got: #{partition_count}"
    end

    caller = self()
    pending = List.duplicate(false, n)

    # The monitor collects the replies of the workers. It is linked so that it does not
    # outlive the caller if the task is aborted.
    monitor_pid = spawn_link(fn -> listen_monitor({caller, pending, pending}) end)

    nodes
    |> Enum.zip(data_partition_list)
    |> Enum.with_index(1)
    |> Enum.each(fn {{node, data_partition_element}, i} ->
      # start a process on each node that waits for its work order
      pid = Node.spawn_link(node, fn -> listen_node(processing_fun) end)
      # `i` (1-based) tells the monitor which slot this result belongs to
      send(pid, {:process_data, monitor_pid, data_partition_element, original_data, i})
    end)

    # wait until the monitor has gathered every partial result
    result =
      receive do
        {:merge_response, list_to_merge} ->
          merge_fun.(list_to_merge)
      end

    # last step, e.g. saving an image or displaying the result
    persistence_fun.(result)
  end

  @doc """
  Worker loop run on each node: waits for a single `:process_data` order, applies
  `processing_fun` to its partition and sends the result to the monitor.

  It handles exactly one message and then returns, so it does not call itself again.
  """
  def listen_node(processing_fun) do
    receive do
      {:process_data, monitor_pid, data_partition, original_data, i} ->
        IO.puts("Inicio de Ejecución desde Nodo: #{Node.self()}")
        # `processing_fun` must also be available on the node running this process
        processed_data = processing_fun.(data_partition, original_data)

        # once processed, send the result back to the monitor of the node that
        # started the distributed task
        send(monitor_pid, {:processed_data, processed_data, i})
    end
  end

  @doc """
  Monitor loop run on the node that started the task.

  The state is `{pid_origin, data_response, completition_list}`: the process waiting
  for the final answer, the results received so far, and one boolean per worker
  telling whether its result has arrived. When every entry is `true` the list of
  results is sent to `pid_origin` as `{:merge_response, results}`.
  """
  def listen_monitor({pid_origin, data_response, completition_list}) do
    receive do
      {:processed_data, processed_result, i} ->
        IO.puts("Recepción de Respuesta desde Nodo Monitor: #{Node.self()}")
        # worker indexes are 1-based, list positions are 0-based
        new_completition_list = List.replace_at(completition_list, i - 1, true)
        new_data_response = List.replace_at(data_response, i - 1, processed_result)

        if Enum.all?(new_completition_list) do
          IO.puts("Respuesta completa enviando")
          send(pid_origin, {:merge_response, new_data_response})
        else
          # still waiting for other workers
          listen_monitor({pid_origin, new_data_response, new_completition_list})
        end
    end
  end
end

defmodule TestFunctions do
  @moduledoc """
  Minimal string-based callbacks with the arities that
  `ParallelServer.execute_distributed_task/5` calls, handy to try the distributed
  flow without real data.
  """

  @doc """
  Test partition function: ignores the data and returns `n` labels,
  `"data 1"` to `"data n"` (an empty list when `n` is 0).
  """
  def test_partition_fun(_data, n) do
    # `//1` keeps the range ascending, so n = 0 yields [] instead of counting down
    Enum.map(1..n//1, fn x -> "data #{x}" end)
  end

  @doc """
  Test processing function: returns its partition unchanged.
  """
  def test_processing_fun(data, _original_data) do
    # "| #{data} processed by #{Node.self()} |"
    data
  end

  @doc """
  Test merge function: concatenates the partial results, in order, into one string.
  """
  def test_merge_fun(list) do
    Enum.join(list)
  end

  @doc """
  Test persistence function: prints the result and returns it.
  """
  def test_persistence_fun(result) do
    IO.puts("result")
    IO.inspect(result)
  end
end
# Turn this runtime into a named node (livebook@<hostname>) so other nodes can reach it.
ParallelServer.start_node()
# Show the nodes this one is currently connected to.
Node.list()
# Connect to a peer node. The name below is specific to the author's network;
# replace it with the name printed by start_node/0 on the other machine.
ParallelServer.connect_node("livebook@nerves-cb66.local")

Image Processing Module

# Upload widget for the image to rotate.
image_input = Kino.Input.image("Uploaded Image")

# Read the upload; the match requires the :rgb pixel format.
%{file_ref: file_ref, format: :rgb, height: height, width: width} =
  Kino.Input.read(image_input)

# Raw pixel bytes of the uploaded file.
content = File.read!(Kino.Input.file_path(file_ref))

# One unsigned byte per channel, laid out as {rows, columns, 3}.
image_tensor = content |> Nx.from_binary(:u8) |> Nx.reshape({height, width, 3})

Nx.shape(image_tensor)
defmodule ImgManager do
  @moduledoc """
  Rotates an image tensor in parallel: the output image is split into vertical strips,
  each strip is computed independently and the strips are concatenated back together.
  """

  @doc """
  Splits the output image into `n` vertical strips, one per node/process.

  Takes `{img_tensor, angle}` where `img_tensor` has shape `{height, width, channels}`.
  Returns a list of `n` maps; each holds the inclusive row range `i_0..i_f`, the
  inclusive column range `j_0..j_f`, the `angle` and the image size. Columns are
  distributed as evenly as possible, so strip widths differ by at most one.

  Raises `ArgumentError` when `n` is smaller than 1 or larger than the image width,
  because at least one strip would then be empty.
  """
  def img_partition_fun({img_tensor, angle}, n) do
    {height, width, _} = Nx.shape(img_tensor)

    if n < 1 or n > width do
      raise ArgumentError, "cannot split #{width} columns into #{n} non-empty strips"
    end

    # every strip gets `base` columns and the first `extra` strips get one more
    base = div(width, n)
    extra = rem(width, n)

    Enum.map(1..n, fn x ->
      # columns taken by the previous strips
      j_0 = (x - 1) * base + min(x - 1, extra)
      strip_width = if x <= extra, do: base + 1, else: base

      %{
        j_0: j_0,
        # ranges are inclusive, so the last index is start + size - 1
        j_f: j_0 + strip_width - 1,
        i_0: 0,
        i_f: height - 1,
        angle: angle,
        img_height: height,
        img_width: width
      }
    end)
  end

  @doc """
  Computes one rectangular strip of the rotated image.

  `data` is one element produced by `img_partition_fun/2`; `par` is
  `{img_tensor, angle}`, of which only the tensor is used (the angle is read from
  `data`). The angle is in radians.

  Pixels are addressed in a Cartesian frame whose origin is the bottom-left pixel
  (`x = column`, `y = img_height - row - 1`). Every output pixel is rotated by
  `-angle` to find the source pixel it comes from (nearest neighbour); output pixels
  whose source falls outside the image are black.

  Returns a `:u8` tensor of shape `{i_f - i_0 + 1, j_f - j_0 + 1, 3}`.
  """
  def img_processing_fun(data, par) do
    {img_tensor, _} = par

    # accept a map as well as a keyword list, then assert every key is present
    %{
      i_0: i_0,
      i_f: i_f,
      j_0: j_0,
      j_f: j_f,
      angle: angle,
      img_height: img_height,
      img_width: img_width
    } = Map.new(data)

    # the source is read as raw bytes; the stride comes from the tensor itself
    {_, src_width, channels} = Nx.shape(img_tensor)
    source = img_tensor |> Nx.as_type(:u8) |> Nx.to_binary()

    # the rotation is the same for every pixel, so compute it once
    cos_t = :math.cos(-angle)
    sin_t = :math.sin(-angle)

    # Build the strip row by row as one binary instead of updating a tensor per pixel,
    # which would copy the whole strip on every update.
    pixels =
      for i_new <- i_0..i_f//1, j_new <- j_0..j_f//1, into: <<>> do
        x_new = j_new
        y_new = img_height - i_new - 1

        # x_old = x_new * cos(-theta) - y_new * sin(-theta)
        x_old = x_new * cos_t - y_new * sin_t
        # y_old = x_new * sin(-theta) + y_new * cos(-theta)
        y_old = x_new * sin_t + y_new * cos_t

        # nearest source pixel; the row conversion is the inverse of the one used
        # for y_new, so a zero angle maps every pixel onto itself
        j_old = round(x_old)
        i_old = img_height - 1 - round(y_old)

        if j_old < 0 or j_old >= img_width or i_old < 0 or i_old >= img_height do
          # outside the original image: black pixel
          <<0, 0, 0>>
        else
          # first three channels (red, green, blue) of the source pixel
          binary_part(source, (i_old * src_width + j_old) * channels, 3)
        end
      end

    pixels
    |> Nx.from_binary(:u8)
    |> Nx.reshape({i_f - i_0 + 1, j_f - j_0 + 1, 3})
  end

  @doc """
  Merges the strips, given in left-to-right order, into a single image tensor.
  """
  def img_merge_fun(list_parts) do
    # strips share the same rows, so they are joined along the column axis
    Nx.concatenate(list_parts, axis: 1)
  end

  @doc """
  Persistence step: returns the merged tensor unchanged so the caller can display it.
  """
  def img_persistence_fun(result) do
    # File.write("files/img_rotated.png",result)
    result
  end

  @doc """
  Rotates `tensor` by `angle` using every connected node, prints the elapsed time
  and returns the rotated image tensor.
  """
  def run_program(tensor, angle) do
    {elapsed_time, result} =
      :timer.tc(fn ->
        # callbacks are passed as remote captures (Module.fun/arity)
        ParallelServer.execute_distributed_task(
          {tensor, angle},
          &ImgManager.img_partition_fun/2,
          &ImgManager.img_merge_fun/1,
          &ImgManager.img_persistence_fun/1,
          &ImgManager.img_processing_fun/2
        )
      end)

    # :timer.tc/1 reports microseconds
    IO.puts("El tiempo total de ejecución es #{elapsed_time / 1_000_000} segundos.")
    result
  end
end
# Rotate the uploaded image by an angle of 0.5 using this node and every connected node.
img_content = ImgManager.run_program(image_tensor, 0.5)
# Render the resulting tensor as an image in the notebook.
Kino.Image.new(img_content)