Powered by AppSignal & Oban Pro

Comparação de MergeSort Paralelo com Sequencial

notebook.livemd

Comparação de MergeSort Paralelo com Sequencial

Modelo de atores

O modelo de concorrência do Elixir é o modelo de atores.

Esse é um modelo que trata o ator como o primitivo universal da computação simultânea.

Em resposta a uma mensagem que recebe, um ator pode: tomar decisões locais, criar mais atores, enviar mais mensagens e determinar como responder à próxima mensagem recebida.

Os atores podem modificar seu próprio estado privado, mas só podem afetar uns aos outros indiretamente por meio de mensagens (eliminando a necessidade de sincronização baseada em bloqueio).

Dessa forma, utilizar programação concorrente se torna mais fácil, uma vez que cada tarefa (que substitui o threading tradicional) não compartilha estado e elas se comunicam através de mensagens.

Merge Sort Paralelo

Uma vez que o merge sort não altera o array inplace (alterando diretamente na memória o array que está sendo ordenado), diferente de outros como o QuickSort, a utilização desse algortimo pode tomar uma vantagem dividindo os sortings de forma concorrente diferente da sua implementação clássica de forma sequencial.

Como funciona?

Na execução clássica, as chamadas recursivas vão dividindo e ordenando, mas isso ocorre de forma sequencial.

De forma paralela, podemos executar as chamadas de forma paralela. Cada chamada recursiva spawna um processo (ator) filho atrelado ao processo pai que o criou. A medida que os processos filhos forem terminando, os resultados vão subindo a árvore de processos enviando os resultados via mensagem para quem as criou.

De forma visual:

Implementação

No Elixir podemos definir varias funções com o mesmo nome, e dependendo do padrão do argumento, realizamos algo diferente. Isso é uma forma alternativa de fazer muitas branches com if/else e tornar a função difícil de ler.

Por exemplo, no módulo a seguir Mergex.Merger se a função merge receber dois arrays vazios, ela retorna um array vazio.

Se ela receber uma lista não vazia no primeiro argumento, mas uma vazia no segundo, retorna a lista. O inverso também é verdadeiro.

Ela também pode desestruturar os argumentos, de forma que `def merge([head tail], second_list)`.
Nesse caso, `[head tail],headé o primeiro item da lista etail` é o resto da lista.

Também é possível adicionar uma cláusula guarda que verifica alguma condição antes de executar a função como um when head <= hd(second_list). A função hd é uma função nativa para recuperar o head de um array.

list = [1, 2, 3]
[head | tail] = list

IO.inspect(head, label: "Head")
IO.inspect(hd(list), label: "Head with hd")
IO.inspect(tail, label: "Tail")
defmodule Mergex.Merger do
  def merge([], []), do: []
  def merge(not_empty_list, []), do: not_empty_list
  def merge([], not_empty_list), do: not_empty_list

  def merge([head | tail], second_list) when head <= hd(second_list) do
    [head | merge(tail, second_list)]
  end

  def merge(first_list, [head | tail]), do: [head | merge(first_list, tail)]
end

Para implementar o módulo em paralelo e sequencial, podemos fazer uso do mesmo artifício de casamento de padrões na função sort.

# lib/mergex/sequential.ex

defmodule Mergex.Sequential do
  alias Mergex.{Merger}

  def sort([]), do: []
  def sort(list = [_]), do: list

  def sort(list) do
    list
    |> Enum.split(div(length(list), 2))
    |> sort_and_merge
  end

  defp sort_and_merge({right, left}) do
    Merger.merge(sort(right), sort(left))
  end
end
# lib/mergex/parallel.ex

defmodule Mergex.Parallel do
  alias Mergex.{Merger, Sequential}

  def sort(list, max_parallel \\ 1)
  def sort([], _), do: []
  def sort(list = [_], _), do: list

  '''
    Essa é a primeira função a ser chamada,
    ela recebe a lista, um valor máximo de paralelização
    e qual o nível de paralelização a ser chamado.
    Ela chama uma função com a aridade (numero de params) 
    igual a 3: sort/3
  '''

  def sort(list, max_parallel),
    do: sort(list, max_parallel, 1)

  def sort([], _, _), do: []
  def sort(list = [_], _, _), do: list

  '''
    A função sort/3 
  '''

  def sort(list, max_parallel, current_level) do
    '''
      o |> pega o resultado da função anterior e passa
      como argumento para a proxima funçao na linha
    '''

    list
    |> Enum.split(div(length(list), 2))
    |> sort_and_merge(max_parallel, current_level + 1)
  end

  '''
    Essa é a parte mais importante.
    Caso o nivel atual de paralelismo for maior ou igual 
    que o máximo,s ão criadas duas tasks 
    com cada metade da lista e é chamado o sort Sequencial.
  '''

  defp sort_and_merge(
         {right, left},
         max_parallel,
         current_level
       )
       when current_level >= max_parallel do
    task_right = Task.async(fn -> Sequential.sort(right) end)
    task_left = Task.async(fn -> Sequential.sort(left) end)

    handle_response(task_right, task_left)
  end

  '''
    Caso contrário, as metades criam novas tasks e chamam o sort
    recursivamente
  '''

  defp sort_and_merge(
         {right, left},
         max_parallel,
         current_level
       ) do
    task_right = Task.async(fn -> sort(right, max_parallel, current_level) end)
    task_left = Task.async(fn -> sort(left, max_parallel, current_level) end)

    handle_response(task_right, task_left)
  end

  defp handle_response(task_right, task_left) do
    Merger.merge(Task.await(task_right, :infinity), Task.await(task_left, :infinity))
  end
end

Esse é um exemplo simples de como usar as funções.

# Parse de inputs
{array_size, _} = IO.gets("Tamanho do array") |> Integer.parse()
{parallel_max, _} = IO.gets("Max paralelismo") |> Integer.parse()

# Define a lista

list = 1..array_size |> Enum.to_list() |> Enum.shuffle()
# Realiza sort paralelo
Mergex.Parallel.sort(list, parallel_max)
Benchee.run(%{
  "parallel_4" => fn -> Mergex.Parallel.sort(list, 4) end,
  "sequential" => fn -> Mergex.Sequential.sort(list) end
  # "parallel_8" => fn -> Mergex.Parallel.sort(list, 8) end,
  # "parallel_input" => fn -> Mergex.Parallel.sort(list, parallel_max) end,
})