
lib/grupo_1/TextProcessing.livemd

Text Processing

Mix.install([
  {:nx, "~> 0.6"},
  {:kino, "~> 0.12.0"}
])

Parallel Server Module

defmodule ParallelServer do
  @doc """
  Starts node to be visible in network
  """
  def start_node do
    # set name of node based on hostname
    {:ok, hostname} = :inet.gethostname()
    # start node
    Node.start(String.to_atom("livebook@#{hostname}"))
    # set cookie
    Node.set_cookie(:secure)
    # return name of node
    Node.self()
  end

  @doc """
  Create connections between nodes
  """
  def connect_node(node_name) do
    # connect node to other node
    Node.connect(String.to_atom(node_name))
  end

  @doc """
  Execute a distributed task, expects a partition_fun, function that from data create a list
  with information necessary to do the task individually, original_data, that is the whole
  data of the problem, a processing_fun that is in charge of process the data for each node,
  and finally a merge_function that after each node compute data is in charge of merge the information
  """
  def execute_distributed_task(
        original_data,
        partition_fun,
        merge_fun,
        persistence_fun,
        processing_fun
      ) do
    IO.puts("hola00")
    # nodes including itself
    nodes = [Node.self() | Node.list()]
    n = length(nodes)
    # initial state
    state = {self(), Enum.map(1..n, fn _ -> false end), Enum.map(1..n, fn _ -> false end)}
    # starts monitor PID that will monitor different responses from different nodes
    monitor_pid = spawn(fn -> listen_monitor(state) end)
    # list with data specific for computation in each node
    data_partition_list = partition_fun.(original_data, n)
    # for every node, including this one
    [nodes, data_partition_list, 1..n]
    |> Enum.zip()
    |> Enum.each(fn {node, data_partition_element, i} ->
      # spawn a listening process on each node
      pid = Node.spawn_link(node, fn -> listen_node(processing_fun) end)
      # tell each node to start processing its chunk of the data
      send(pid, {:process_data, monitor_pid, data_partition_element, original_data, i})
    end)

    # after messaging every node, wait here for the monitor to send back the list of partial results
    result =
      receive do
        {:merge_response, list_to_merge} ->
          merge_fun.(list_to_merge)
      end

    # persist the merged result (e.g. save it to disk, render images, etc.)
    IO.inspect(result)
    persistence_fun.(result)
  end

  @doc """
  From the node listen to the monitor orders this just happens once so it does not call listen_node
  at the end
  """
  def listen_node(processing_fun) do
    # this process is initialized from the monitor
    receive do
      {:process_data, monitor_pid, data_partition, original_data, i} ->
        IO.puts("Inicio de Ejecución desde Nodo: #{Node.self()}")
        # the "processing_fun" should be declared also in the other node
        processed_data = processing_fun.(data_partition, original_data)

        # una vez procesada se envia la información al nodo monitor (que inicio la tarea distribuida)
        send(monitor_pid, {:processed_data, processed_data, i})
    end
  end

  @doc """
  From the node that start the task he will be listening to the different nodes expecting to get responses for
  each node
  """
  def listen_monitor({pid_origin, data_response, completion_list}) do
    receive do
      {:processed_data, processed_result, i} ->
        # receive a result from one node
        IO.puts("Response received by monitor node: #{Node.self()}")
        # mark position i as completed to keep track of which results have arrived
        new_completion_list = List.replace_at(completion_list, i - 1, true)
        # store the node's result in the response list
        new_data_response = List.replace_at(data_response, i - 1, processed_result)
        IO.inspect({new_completion_list, new_data_response})
        # once every node has answered, send the full list back to the original PID that is waiting
        if Enum.all?(new_completion_list, fn x -> x == true end) do
          IO.puts("All responses received, sending merged data")
          send(pid_origin, {:merge_response, new_data_response})
        else
          # otherwise keep listening for the remaining responses
          listen_monitor({pid_origin, new_data_response, new_completion_list})
        end
    end
  end
end
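
For reference, on a single node the distributed flow reduces to partitioning, mapping the processing function over the chunks, and merging. The sketch below is only illustrative (the local_equivalent name is ours) and uses nothing beyond the callbacks passed to execute_distributed_task:

# illustrative sketch: local, single-node equivalent of the distributed flow
local_equivalent = fn original_data, partition_fun, merge_fun, persistence_fun, processing_fun ->
  n = 1

  original_data
  |> partition_fun.(n)
  |> Enum.map(fn chunk -> processing_fun.(chunk, original_data) end)
  |> merge_fun.()
  |> persistence_fun.()
end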

defmodule TestFunctions do
  @doc """
  Test function to partition data
  """
  def test_partition_fun(_data, n) do
    Enum.map(1..n, fn x -> "data #{x}" end)
  end

  @doc """
  Test processing fun
  """
  def test_processing_fun(data, _original_data) do
    # e.g. "| #{data} processed by #{Node.self()} |"
    data
  end

  @doc """
  Test merge fun
  """
  def test_merge_fun(list) do
    Enum.reduce(list, "", fn str, acc -> acc <> str end)
  end

  @doc """
  Test persistence fun
  """
  def test_persistence_fun(result) do
    IO.inspect(result, label: "result")
  end
end
ParallelServer.start_node()
Node.list()
ParallelServer.connect_node("livebook@nerves-cb66.local")
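
With at least one other node connected, the server can be exercised end to end with the stub functions above. A minimal sketch, assuming the connected node also has ParallelServer and TestFunctions loaded and that the input string is arbitrary:

# run the distributed pipeline with the test stubs
# (the remote node must have ParallelServer and TestFunctions available)
ParallelServer.execute_distributed_task(
  "any data",
  &TestFunctions.test_partition_fun/2,
  &TestFunctions.test_merge_fun/1,
  &TestFunctions.test_persistence_fun/1,
  &TestFunctions.test_processing_fun/2
)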

Text Processing Module

# render a textarea for the text to analyze
text_input = Kino.Input.textarea("Uploaded Text")
# read its current value (re-evaluate after pasting the text)
content = Kino.Input.read(text_input)
defmodule Count do
  def read(file) do
    {:ok, string} = File.read("lib/#{file}")
    string
  end

  def processing_fun(list, _) do
    list
    |> Enum.filter(fn element ->
      Regex.match?(~r/\A\d+\z|\A[a-zA-Z]+\z|\A[a-zA-Z]+'[a-zA-Z]+\z/, element)
    end)
    |> Enum.reduce(%{}, fn palabra, conteo -> Map.update(conteo, palabra, 1, &(&1 + 1)) end)
  end

  def partition_fun(string, n) do
    # normalize the text: lowercase and drop everything but letters, digits, apostrophes and whitespace
    cleaned =
      string
      |> String.downcase()
      |> (fn cadena -> Regex.replace(~r/[^a-zA-Z0-9'\s]/, cadena, "") end).()

    # split the cleaned text into words before distributing them across nodes
    words = String.split(cleaned, ~r/\s+/, trim: true)

    total_words = length(words)
    words_per_list = div(total_words, n)
    leftover_words = rem(total_words, n)

    distribute_words(words, words_per_list, leftover_words)
  end

  defp distribute_words(words, words_per_list, leftover_words) do
    # bump the chunk size by one when the words do not divide evenly among the nodes
    Enum.chunk_every(words, words_per_list + if(leftover_words > 0, do: 1, else: 0))
  end

  def merge_fun(dictionaries) do
    Enum.reduce(dictionaries, %{}, fn dict, acc ->
      Enum.reduce(dict, acc, fn {key, value}, acc_dict ->
        Map.update(acc_dict, key, value, &amp;(&amp;1 + value))
      end)
    end)
  end

  def persistence_fun(dict) do
    IO.inspect(dict)
  end

  def run_program(text) do
    {elapsed_time, _result} =
      :timer.tc(fn ->
        ParallelServer.execute_distributed_task(
          text,
          &amp;Count.partition_fun/2,
          &amp;Count.merge_fun/1,
          &amp;Count.persistence_fun/1,
          &amp;Count.processing_fun/2
        )
      end)

    IO.puts("El tiempo total de ejecución es #{elapsed_time / 1_000_000} segundos.")
  end
end
Count.run_program(content)
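
The same pipeline can also be fed from a file instead of the Kino textarea. A minimal sketch, assuming a plain-text file exists under lib/ (the name sample.txt is only illustrative):

# hypothetical file name; Count.read/1 loads "lib/sample.txt"
text = Count.read("sample.txt")
Count.run_program(text)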