Text Processing
Mix.install([
{:nx, "~> 0.6"},
{:kino, "~> 0.12.0"}
])
Parallel Server Module
defmodule ParallelServer do
@doc """
Starts node to be visible in network
"""
def start_node do
# set name of node based on hostname
{:ok, hostname} = :inet.gethostname()
# start node
Node.start(String.to_atom("livebook@#{hostname}"))
# set cookie
Node.set_cookie(:secure)
# return name of node
Node.self()
end
@doc """
Create connections between nodes
"""
def connect_node(node_name) do
# connect node to other node
Node.connect(String.to_atom(node_name))
end
@doc """
Execute a distributed task, expects a partition_fun, function that from data create a list
with information necessary to do the task individually, original_data, that is the whole
data of the problem, a processing_fun that is in charge of process the data for each node,
and finally a merge_function that after each node compute data is in charge of merge the information
"""
def execute_distributed_task(
original_data,
partition_fun,
merge_fun,
persistence_fun,
processing_fun
) do
IO.puts("hola00")
# nodes including itself
nodes = [Node.self() | Node.list()]
n = length(nodes)
# initial state
state = {self(), Enum.map(1..n, fn _ -> false end), Enum.map(1..n, fn _ -> false end)}
# starts monitor PID that will monitor different responses from different nodes
monitor_pid = spawn(fn -> listen_monitor(state) end)
# list with data specific for computation in each node
data_partition_list = partition_fun.(original_data, n)
# for all nodes including itself
[nodes, data_partition_list, 1..n]
|> Enum.zip()
|> Enum.map(fn {node, data_partition_element, i} ->
# starts a process in each node to be prepared for listen
pid = Node.spawn_link(node, fn -> listen_node(processing_fun) end)
# sends a message to each node to start processing of data
send(pid, {:process_data, monitor_pid, data_partition_element, original_data, i})
end)
# after send all the differents pids stays waiting for the final partitioned data to be merges
result =
receive do
{:merge_response, list_to_merge} ->
merge_fun.(list_to_merge)
end
# if some data need to be save images or something
IO.inspect(result)
persistence_fun.(result)
end
@doc """
From the node listen to the monitor orders this just happens once so it does not call listen_node
at the end
"""
def listen_node(processing_fun) do
# this process is initialized from the monitor
receive do
{:process_data, monitor_pid, data_partition, original_data, i} ->
IO.puts("Inicio de Ejecución desde Nodo: #{Node.self()}")
# the "processing_fun" should be declared also in the other node
processed_data = processing_fun.(data_partition, original_data)
# una vez procesada se envia la información al nodo monitor (que inicio la tarea distribuida)
send(monitor_pid, {:processed_data, processed_data, i})
end
end
@doc """
From the node that start the task he will be listening to the different nodes expecting to get responses for
each node
"""
def listen_monitor({pid_origin, data_response, completition_list}) do
receive do
{:processed_data, processed_result, i} ->
# receive information from one node
IO.puts("Recepción de Respuesta desde Nodo Monitor: #{Node.self()}")
# adds ra true into completition list to mantain record of result completition
new_completition_list = List.replace_at(completition_list, i - 1, true)
# adds data response to a list
new_data_response = List.replace_at(data_response, i - 1, processed_result)
IO.inspect({new_completition_list, new_data_response})
# if all data is completed returns response to original PID who is waiting for response
if Enum.all?(new_completition_list, fn x -> x == true end) do
IO.puts("Respuesta completa enviando")
send(pid_origin, {:merge_response, new_data_response})
else
# if is not completed call itself to wait for other responses
listen_monitor({pid_origin, new_data_response, new_completition_list})
end
end
end
end
defmodule TestFunctions do
@doc """
Test function to partition data
"""
def test_partition_fun(data, n) do
data_partition_list = Enum.map(1..n, fn x -> "data #{x}" end)
end
@doc """
Test processing fun
"""
def test_processing_fun(data, original_data) do
# "| #{data} processed by #{Node.self()} |"
data
end
@doc """
Test merge fun
"""
def test_merge_fun(list) do
Enum.reduce(list, "", fn str, acc -> acc <> str end)
end
@doc """
Test persistence fun
"""
def test_persistence_fun(result) do
IO.puts("result")
IO.inspect(result)
end
end
ParallelServer.start_node()
Node.list()
ParallelServer.connect_node("livebook@nerves-cb66.local")
Text Processing Module
text_input = Kino.Input.textarea("Uploaded Text")
content = Kino.Input.read(text_input)
defmodule Count do
def read(file) do
{:ok, string} = File.read("lib/#{file}")
string
end
def processing_fun(list, _) do
list
|> Enum.filter(fn element ->
Regex.match?(~r/\A\d+\z|\A[a-zA-Z]+\z|\A[a-zA-Z]+'[a-zA-Z]+\z/, element)
end)
|> Enum.reduce(%{}, fn palabra, conteo -> Map.update(conteo, palabra, 1, &(&1 + 1)) end)
end
def partition_fun(string, n) do
string
|> String.downcase()
|> (fn cadena -> Regex.replace(~r/[^a-zA-Z0-9'\s]/, cadena, "") end).()
words = String.split(string, ~r/\s+/, trim: true)
total_words = length(words)
words_per_list = div(total_words, n)
leftover_words = rem(total_words, n)
lists = distribute_words(words, words_per_list, leftover_words)
lists
end
defp distribute_words(words, words_per_list, leftover_words) do
Enum.chunk_every(words, words_per_list + if(leftover_words > 0, do: 1, else: 0))
end
def merge_fun(dictionaries) do
Enum.reduce(dictionaries, %{}, fn dict, acc ->
Enum.reduce(dict, acc, fn {key, value}, acc_dict ->
Map.update(acc_dict, key, value, &(&1 + value))
end)
end)
end
def persistence_fun(dict) do
IO.inspect(dict)
end
def run_program(text) do
{elapsed_time, _result} =
:timer.tc(fn ->
ParallelServer.execute_distributed_task(
text,
&Count.partition_fun/2,
&Count.merge_fun/1,
&Count.persistence_fun/1,
&Count.processing_fun/2
)
end)
IO.puts("El tiempo total de ejecución es #{elapsed_time / 1_000_000} segundos.")
end
end
img_content = ImgManager.run_program(content)