Chapter 15
Mix.install([
{:kino, "~> 0.8.1"},
{:vega_lite, "~> 0.1.6"},
{:kino_vega_lite, "~> 0.1.7"}
])
alias VegaLite, as: Vl
Exercise: WorkingWithMultipleProcesses-1
defmodule Chain do
def counter(next_pid) do
receive do
n -> send(next_pid, n + 1)
end
end
def create_processes(n) do
code_to_run = fn _, send_to ->
spawn(Chain, :counter, [send_to])
end
last = Enum.reduce(1..n, self(), code_to_run)
send(last, 0)
receive do
final_answer when is_integer(final_answer) ->
"Result is #{final_answer}"
end
end
def run(n) do
:timer.tc(Chain, :create_processes, [n])
|> IO.inspect()
end
end
number_of_processes = [10, 100, 1_000, 10_000, 100_000]
times =
number_of_processes
|> Enum.map(&Chain.run/1)
|> Enum.map(&elem(&1, 0))
|> Enum.map(fn nanoseconds -> nanoseconds / 1000 end)
data =
Enum.zip(number_of_processes, times)
|> Enum.map(fn {n, time} -> %{"Number of processes" => n, "time (ms)" => time} end)
Kino.DataTable.new(data) |> Kino.render()
Vl.new(width: 400, height: 400)
|> Vl.data_from_values(data)
|> Vl.mark(:point)
|> Vl.encode_field(:x, "Number of processes", type: :quantitative, scale: [type: "log"])
|> Vl.encode_field(:y, "time (ms)", type: :quantitative, scale: [type: "log"])
Exercise: WorkingWithMultipleProcesses-2
echo = fn token ->
send_to = self()
spawn(fn -> send(send_to, token) end)
end
echo.("fred")
echo.("betty")
receive do
token -> IO.puts(token)
end
receive do
token -> IO.puts(token)
end
In theory, the order should be deterministic, FIFO. Messages sent first should be received first. In practice, seems also true.
Exercise: WorkingWithMultipleProcesses-3
defmodule MySpawn do
def run(code_for_child) do
code_for_child.(self())
:timer.sleep(500)
print_all_message()
end
defp print_all_message do
print_message(1)
end
defp print_message(i) do
receive do
message ->
IO.puts("Got message (##{i}): #{inspect(message)}")
print_message(i + 1)
after
1000 -> IO.puts("No more message")
end
end
end
code_for_child = fn parent ->
spawn_link(fn -> send(parent, "Hello from child") end)
end
MySpawn.run(code_for_child)
It doesn’t matter that you weren’t waiting for the notification from the child when it exited. Notifications are simply placed in your inbox and you can read them at any time (before being flushed).
Exercise: WorkingWithMultipleProcesses-4
code_for_child = fn parent ->
spawn_link(fn ->
send(parent, "Hello from child")
raise "something wrong"
end)
end
MySpawn.run(code_for_child)
The process died before the message was received.
Exercise: WorkingWithMultipleProcesses-5
code_for_child = fn parent ->
spawn_monitor(fn -> send(parent, "Hello from child") end)
end
MySpawn.run(code_for_child)
code_for_child = fn parent ->
spawn_monitor(fn ->
send(parent, "Hello from child")
raise "something wrong"
end)
end
MySpawn.run(code_for_child)
Exercise: WorkingWithMultipleProcesses-6
self
returns PID of the calling process. Calling self
in an anonymous function passed to spawn_link
returns the PID of the process created by spawn_link
. Therefore, assigned the value of self
to the variable me
at the top of the method, in the process of receiving messages.
Exercise: WorkingWithMultipleProcesses-7
defmodule Parallel do
def pmap(collection, fun) do
me = self()
collection
|> Enum.map(fn elem ->
spawn_link(fn ->
random_delay()
send(me, {self(), fun.(elem)})
end)
end)
|> Enum.map(fn pid ->
receive do
{^pid, result} -> result
end
end)
end
defp random_delay do
:timer.sleep(:rand.uniform(10))
end
end
Parallel.pmap(1..10, &(&1 * &1))
Exercise: WorkingWithMultipleProcesses-8
defmodule FibSolver do
def fib(scheduler) do
send(scheduler, {:ready, self()})
receive do
{:fib, n, client} ->
send(client, {:answer, n, fib_calc(n), self()})
fib(scheduler)
{:shutdown} ->
exit(:normal)
end
end
defp fib_calc(0), do: 0
defp fib_calc(1), do: 1
defp fib_calc(n), do: fib_calc(n - 1) + fib_calc(n - 2)
end
defmodule Scheduler do
def run(num_processes, module, func, to_calculate) do
1..num_processes
|> Enum.map(fn _ -> spawn(module, func, [self()]) end)
|> schedule_processes(to_calculate, [])
end
defp schedule_processes(processes, queue, results) do
receive do
{:ready, pid} when queue != [] ->
[next | tail] = queue
send(pid, {:fib, next, self()})
schedule_processes(processes, tail, results)
{:ready, pid} ->
send(pid, {:shutdown})
if length(processes) > 1 do
schedule_processes(List.delete(processes, pid), queue, results)
else
Enum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)
end
{:answer, number, result, _pid} ->
schedule_processes(processes, queue, [{number, result} | results])
end
end
end
to_process = List.duplicate(37, 20)
1..10
|> Enum.each(fn num_processes ->
{time, result} =
:timer.tc(
Scheduler,
:run,
[num_processes, FibSolver, :fib, to_process]
)
if num_processes == 1 do
IO.puts(inspect(result))
IO.puts("\n # time (s)")
end
:io.format("~2B. ~.2f~n", [num_processes, time / 1_000_000.0])
end)
Exercise: WorkingWithMultipleProcesses-9
defmodule CatFinder do
def task(scheduler) do
send(scheduler, {:ready, self()})
receive do
{:do, file, client} ->
send(client, {:done, file, _find(file), self()})
task(scheduler)
{:shutdown} ->
exit(:normal)
end
end
defp _find(file) do
file
|> File.read!()
|> String.split("cat")
|> length()
|> Kernel.-(1)
end
end
defmodule GenScheduler do
def run(num_processes, module, resource) do
1..num_processes
|> Enum.map(fn _ -> spawn(module, :task, [self()]) end)
|> schedule_processes(resource, [])
end
defp schedule_processes(processes, resource, results) do
receive do
{:ready, pid} when resource != [] ->
[next | tail] = resource
send(pid, {:do, next, self()})
schedule_processes(processes, tail, results)
{:ready, pid} ->
send(pid, :shutdown)
if length(processes) > 1 do
schedule_processes(List.delete(processes, pid), resource, results)
else
Enum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)
end
{:done, given, result, _pid} ->
schedule_processes(processes, resource, [{given, result} | results])
end
end
end
input = Kino.Input.text("Directory")
directory = Kino.Input.read(input)
File.cd!(directory)
to_process = directory |> File.ls!() |> Enum.filter(&File.regular?/1)
1..10
|> Enum.each(fn num_processes ->
{time, result} =
:timer.tc(
GenScheduler,
:run,
[num_processes, CatFinder, to_process]
)
if num_processes == 1 do
IO.puts(inspect(result))
IO.puts("\n # time (s)")
end
:io.format("~2B. ~.2f~n", [num_processes, time / 1_000_000.0])
end)