Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Chapter 15

programming-elixir-1.6/chapter-15.livemd

Chapter 15

Mix.install([
  {:kino, "~> 0.8.1"},
  {:vega_lite, "~> 0.1.6"},
  {:kino_vega_lite, "~> 0.1.7"}
])

alias VegaLite, as: Vl

Exercise: WorkingWithMultipleProcesses-1

defmodule Chain do
  def counter(next_pid) do
    receive do
      n -> send(next_pid, n + 1)
    end
  end

  def create_processes(n) do
    code_to_run = fn _, send_to ->
      spawn(Chain, :counter, [send_to])
    end

    last = Enum.reduce(1..n, self(), code_to_run)
    send(last, 0)

    receive do
      final_answer when is_integer(final_answer) ->
        "Result is #{final_answer}"
    end
  end

  def run(n) do
    :timer.tc(Chain, :create_processes, [n])
    |> IO.inspect()
  end
end
number_of_processes = [10, 100, 1_000, 10_000, 100_000]

times =
  number_of_processes
  |> Enum.map(&Chain.run/1)
  |> Enum.map(&elem(&1, 0))
  |> Enum.map(fn nanoseconds -> nanoseconds / 1000 end)

data =
  Enum.zip(number_of_processes, times)
  |> Enum.map(fn {n, time} -> %{"Number of processes" => n, "time (ms)" => time} end)

Kino.DataTable.new(data) |> Kino.render()

Vl.new(width: 400, height: 400)
|> Vl.data_from_values(data)
|> Vl.mark(:point)
|> Vl.encode_field(:x, "Number of processes", type: :quantitative, scale: [type: "log"])
|> Vl.encode_field(:y, "time (ms)", type: :quantitative, scale: [type: "log"])

Exercise: WorkingWithMultipleProcesses-2

echo = fn token ->
  send_to = self()
  spawn(fn -> send(send_to, token) end)
end

echo.("fred")
echo.("betty")

receive do
  token -> IO.puts(token)
end

receive do
  token -> IO.puts(token)
end

In theory, the order should be deterministic, FIFO. Messages sent first should be received first. In practice, seems also true.

Exercise: WorkingWithMultipleProcesses-3

defmodule MySpawn do
  def run(code_for_child) do
    code_for_child.(self())
    :timer.sleep(500)
    print_all_message()
  end

  defp print_all_message do
    print_message(1)
  end

  defp print_message(i) do
    receive do
      message ->
        IO.puts("Got message (##{i}): #{inspect(message)}")
        print_message(i + 1)
    after
      1000 -> IO.puts("No more message")
    end
  end
end
code_for_child = fn parent ->
  spawn_link(fn -> send(parent, "Hello from child") end)
end

MySpawn.run(code_for_child)

It doesn’t matter that you weren’t waiting for the notification from the child when it exited. Notifications are simply placed in your inbox and you can read them at any time (before being flushed).

Exercise: WorkingWithMultipleProcesses-4

code_for_child = fn parent ->
  spawn_link(fn ->
    send(parent, "Hello from child")
    raise "something wrong"
  end)
end

MySpawn.run(code_for_child)

The process died before the message was received.

Exercise: WorkingWithMultipleProcesses-5

code_for_child = fn parent ->
  spawn_monitor(fn -> send(parent, "Hello from child") end)
end

MySpawn.run(code_for_child)
code_for_child = fn parent ->
  spawn_monitor(fn ->
    send(parent, "Hello from child")
    raise "something wrong"
  end)
end

MySpawn.run(code_for_child)

Exercise: WorkingWithMultipleProcesses-6

self returns PID of the calling process. Calling self in an anonymous function passed to spawn_link returns the PID of the process created by spawn_link. Therefore, assigned the value of self to the variable me at the top of the method, in the process of receiving messages.

Exercise: WorkingWithMultipleProcesses-7

defmodule Parallel do
  def pmap(collection, fun) do
    me = self()

    collection
    |> Enum.map(fn elem ->
      spawn_link(fn ->
        random_delay()
        send(me, {self(), fun.(elem)})
      end)
    end)
    |> Enum.map(fn pid ->
      receive do
        {^pid, result} -> result
      end
    end)
  end

  defp random_delay do
    :timer.sleep(:rand.uniform(10))
  end
end

Parallel.pmap(1..10, &(&1 * &1))

Exercise: WorkingWithMultipleProcesses-8

defmodule FibSolver do
  def fib(scheduler) do
    send(scheduler, {:ready, self()})

    receive do
      {:fib, n, client} ->
        send(client, {:answer, n, fib_calc(n), self()})
        fib(scheduler)

      {:shutdown} ->
        exit(:normal)
    end
  end

  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n - 1) + fib_calc(n - 2)
end

defmodule Scheduler do
  def run(num_processes, module, func, to_calculate) do
    1..num_processes
    |> Enum.map(fn _ -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when queue != [] ->
        [next | tail] = queue
        send(pid, {:fib, next, self()})
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        send(pid, {:shutdown})

        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)
        end

      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [{number, result} | results])
    end
  end
end

to_process = List.duplicate(37, 20)

1..10
|> Enum.each(fn num_processes ->
  {time, result} =
    :timer.tc(
      Scheduler,
      :run,
      [num_processes, FibSolver, :fib, to_process]
    )

  if num_processes == 1 do
    IO.puts(inspect(result))
    IO.puts("\n #   time (s)")
  end

  :io.format("~2B.    ~.2f~n", [num_processes, time / 1_000_000.0])
end)

Exercise: WorkingWithMultipleProcesses-9

defmodule CatFinder do
  def task(scheduler) do
    send(scheduler, {:ready, self()})

    receive do
      {:do, file, client} ->
        send(client, {:done, file, _find(file), self()})
        task(scheduler)

      {:shutdown} ->
        exit(:normal)
    end
  end

  defp _find(file) do
    file
    |> File.read!()
    |> String.split("cat")
    |> length()
    |> Kernel.-(1)
  end
end

defmodule GenScheduler do
  def run(num_processes, module, resource) do
    1..num_processes
    |> Enum.map(fn _ -> spawn(module, :task, [self()]) end)
    |> schedule_processes(resource, [])
  end

  defp schedule_processes(processes, resource, results) do
    receive do
      {:ready, pid} when resource != [] ->
        [next | tail] = resource
        send(pid, {:do, next, self()})
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        send(pid, :shutdown)

        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), resource, results)
        else
          Enum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)
        end

      {:done, given, result, _pid} ->
        schedule_processes(processes, resource, [{given, result} | results])
    end
  end
end
input = Kino.Input.text("Directory")
directory = Kino.Input.read(input)
File.cd!(directory)
to_process = directory |> File.ls!() |> Enum.filter(&amp;File.regular?/1)

1..10
|> Enum.each(fn num_processes ->
  {time, result} =
    :timer.tc(
      GenScheduler,
      :run,
      [num_processes, CatFinder, to_process]
    )

  if num_processes == 1 do
    IO.puts(inspect(result))
    IO.puts("\n #   time (s)")
  end

  :io.format("~2B.    ~.2f~n", [num_processes, time / 1_000_000.0])
end)