Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Elixir In Action - Part 2 - Concurrent Elixir

livebook/1_Concurrency.livemd

Elixir In Action - Part 2 - Concurrent Elixir

defmodule NotebookHelpers do
  def directory(), do: (__ENV__.file |> Path.dirname())
  def file_path(filename), do: directory() <> "/files/" <> filename
end


defmodule Entry do
  defstruct [:id, :date, :title]

  def new(id, date, title), do: %Entry{id: id, date: date, title: title}
end

defmodule TodoList do
  @moduledoc """
  ## Examples
  
  iex> alias TodoList, as: TodoList
  iex> entries = [
  ...> %{date: ~D[2023-12-19], title: "Dentist"},
  ...> %{date: ~D[2023-12-20], title: "Shopping"},
  ...> %{date: ~D[2023-12-19], title: "Movies"},
  ...> ]
  iex> TodoList.new(entries) 
  %TodoList {
    next_id: 4, 
    entries: %{
      1 => %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"}, 
      2 => %Entry{id: 2, date: ~D[2023-12-20], title: "Shopping"}, 
      3 => %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
    }
  }
  """

  defstruct next_id: 1, entries: %{}

  def new(entries \\ []) do
    Enum.reduce(entries, %TodoList{}, &amp;(add_entry(&amp;2, &amp;1)))
  end

  def add_entry(%TodoList{} = todo_list, %{date: date, title: title}) do
    entry = Entry.new(todo_list.next_id, date, title)
    entries = Map.put(todo_list.entries, todo_list.next_id, entry)
    %TodoList{todo_list | next_id: entry.id + 1, entries: entries}
  end

  def entries(%TodoList{} = todo_list, date) do
    Map.values(todo_list.entries)
      |> Enum.filter(&amp;(&amp;1.date == date))
  end

  def update_entry(%TodoList{} = todo_list, entry_id, updater_fun) do    
    case Map.fetch(todo_list.entries, entry_id) do
      :error -> todo_list
      {:ok, entry} -> 
        put_in(todo_list.entries[entry_id], updater_fun.(entry))
    end
  end

  def delete_entry(%TodoList{} = todo_list, entry_id) do
    updated_entries = todo_list.entries |> Map.delete(entry_id)
    %TodoList{todo_list | entries: updated_entries}
  end
end

defmodule TodoList.CsvImporter do
  @moduledoc """
  ## Examples

  iex> TodoList.CsvImporter.import("todos.csv")
  %TodoList{
    next_id: 4,
    entries: %{
      1 => %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
      2 => %Entry{id: 2, date: ~D[2023-12-20], title: "Shopping"},
      3 => %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
    }
  }
  """
  
  def import(file_name) do
    NotebookHelpers.file_path(file_name)
      |> File.stream!(:line)
      |> Stream.map(&amp;line_to_entry/1)
      |> TodoList.new()
  end

  defp line_to_entry(line) do
    [date_string, title] = String.trim(line) 
      |> String.split(",", trim: true)
    
    date = Date.from_iso8601!(date_string)
    
    Map.new([{:date, date}, {:title, title}] )
  end
end

defimpl Collectable, for: TodoList do
  @moduledoc """
  ## Examples
  iex> alias TodoList, as: TodoList
  iex> todo_list = TodoList.CsvImporter.import("todos.csv")
  iex> Enum.into([%{date: ~D[2023-12-21], title: "MyBook"}], todo_list)
  %TodoList{
    next_id: 5,
    entries: %{
      1 => %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
      2 => %Entry{id: 2, date: ~D[2023-12-20], title: "Shopping"},
      3 => %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"},
      4 => %Entry{id: 4, date: ~D[2023-12-21], title: "MyBook"},
    }
  }
  """
  
  def into(todo_list) do
    fun = fn 
      todo_list_acc, {:cont, entry} -> TodoList.add_entry(todo_list_acc, entry)
      todo_list_acc, :done -> todo_list_acc
      _, :halt -> :ok
    end

    {todo_list, fun}
  end
end

5.2.1 Creating processes

long_running_task = fn title -> 
  Process.sleep(1000)
  "#{title} result" 
end

1..5
|> Enum.each(fn num -> 
  spawn(fn -> 
    long_running_task.(num) |> IO.puts()
  end)
end)

5.2.2 Message passing

parent_pid = self()

run_query_async = fn num -> 
  spawn(fn -> 
    Process.sleep(2000)
    send(parent_pid, {:query_result, "#{num} result"})
  end)
end

receive_result = fn -> 
  receive do
    {:query_result, result} -> result
  end
end

1..5
  |> Enum.map(&amp;run_query_async.(&amp;1))
  |> Enum.map(fn _ -> receive_result.() end)

5.3.1 - Server processes

defmodule DatabaseServer do
  def start() do
    spawn(fn -> loop() end)
  end

  def send_query(server_pid, query) do
    send(server_pid, {:query, self(), query})
  end

  def get_result() do
    receive do
      {:result, result} -> result
    after 
      5000 -> :error
    end
  end

  defp loop() do
    {client_pid, result} = receive do
      {:query, client_pid, query} -> 
        Process.sleep(2000)
        {client_pid, "#{query} result"}
    end

    send(client_pid, {:result, result})
    loop()
  end
end

# pooled servers -> concurrent processing
pool = 1..100 
  |> Enum.map(fn num -> {num, DatabaseServer.start()} end)
  |> Map.new()

1..20
  |> Enum.map(fn num -> 
    server_pid = Map.fetch!(pool, :rand.uniform(100))
    DatabaseServer.send_query(server_pid, num)
  end)
  |> Enum.map(fn _ -> DatabaseServer.get_result() end)

5.3.2 Keeping a process state

defmodule StatefulDatabaseServer do
  def start() do
    spawn(fn -> loop(:rand.uniform(1000)) end)
  end

  def send_query(server_pid, query) do
    send(server_pid, {:query, self(), query})
  end

  def get_result() do
    receive do
      {:result, result} -> result
    after 
      5000 -> :error
    end
  end

  defp loop(connection_id) do
    {client_pid, result} = receive do
      {:query, client_pid, query} -> 
        Process.sleep(2000)
        {client_pid, "Connection #{connection_id}: #{query} result"}
    end

    send(client_pid, {:result, result})
    loop(connection_id)
  end
end

pool_size = 100
# pooled servers -> concurrent processing
pool = 1..pool_size 
  |> Enum.map(fn num -> {num, StatefulDatabaseServer.start()} end)
  |> Map.new()

1..20
  |> Enum.map(fn num -> 
    server_pid = Map.fetch!(pool, :rand.uniform(pool_size))
    StatefulDatabaseServer.send_query(server_pid, num)
  end)
  |> Enum.map(fn _ -> StatefulDatabaseServer.get_result() end)

5.3.3 Mutable state

defmodule CalculatorServer do  
  @initial_state 0
  
  def start() do
    spawn(fn -> loop(@initial_state) end)
  end

  def add(calculator_pid, num), do: send(calculator_pid, {:add, num})
  def sub(calculator_pid, num), do: send(calculator_pid, {:sub, num})
  def mul(calculator_pid, num), do: send(calculator_pid, {:mul, num})
  def div(calculator_pid, num), do: send(calculator_pid, {:div, num})
  def value(calculator_pid) do
    send(calculator_pid, {:value, self()})
    receive do
      {:result, result} -> result
    after
      5000 -> {:error, :timeout}
    end
  end

  defp loop(state) do
    new_state = receive do
      message -> process_message(message, state)
    end

    loop(new_state)
  end

  defp process_message({:add, num}, state), do: state + num
  defp process_message({:sub, num}, state), do: state - num
  defp process_message({:mul, num}, state), do: state * num
  defp process_message({:div, num}, state), do: state / num
  defp process_message({:value, client_pid}, state) do
    send(client_pid, {:result, state})
    state
  end
end

calculator_pid = CalculatorServer.start()
CalculatorServer.value(calculator_pid)
CalculatorServer.add(calculator_pid, 10)
CalculatorServer.sub(calculator_pid, 5)
CalculatorServer.mul(calculator_pid, 3)
CalculatorServer.div(calculator_pid, 5)
CalculatorServer.value(calculator_pid)

5.3.4 Complex states

defmodule TodoServer do
  def start() do
    spawn(fn -> loop(TodoList.new()) end)
  end

  def add_entry(todo_server_pid, entry) do
    send(todo_server_pid, {:add, entry})
  end

  def delete_entry(todo_server_pid, entry_id) do
    send(todo_server_pid, {:delete, entry_id})
  end

  def update_entry(todo_server_pid, entry_id, update_fun) do
    send(todo_server_pid, {:update, entry_id, update_fun})
  end

  def entries(todo_server_pid, date) do
    send(todo_server_pid, {:entries, self(), date})
    receive do
      {:result, result} -> result
    after 
      5000 -> :error
    end
  end

  defp loop(todo_list) do
    updated_todo_list = receive do
      message -> process_message(message, todo_list)
    end
    loop(updated_todo_list)
  end

  defp process_message({:add, entry}, todo_list) do
    TodoList.add_entry(todo_list, entry)
  end
  defp process_message({:delete, entry_id}, todo_list) do
    TodoList.delete_entry(todo_list, entry_id)
  end
  defp process_message({:update, entry_id, update_fun}, todo_list) do
    TodoList.update_entry(todo_list, entry_id, update_fun)
  end
  defp process_message({:entries, client_pid, date}, todo_list) do
    entries = TodoList.entries(todo_list, date)
    send(client_pid, {:result, entries})
    todo_list
  end
end

todo_server = TodoServer.start()
TodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Dentist"}
 )

TodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-20], title: "Shopping"}
 )

TodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Movies"}
 )

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = TodoServer.entries(todo_server, ~D[2023-12-19])

TodoServer.update_entry(todo_server, 1, fn entry -> %Entry{entry | title: "DentistUpdated"} end)

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "DentistUpdated"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = TodoServer.entries(todo_server, ~D[2023-12-19])

TodoServer.delete_entry(todo_server, 1)
[
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = TodoServer.entries(todo_server, ~D[2023-12-19])

:ok

5.3.5 Registered Processes (Singleton)

defmodule SingletonTodoServer do
  def start() do
    pid = spawn(fn -> loop(TodoList.new()) end)
    Process.register(pid, :todo_server)
  end

  def add_entry(entry) do
    send(:todo_server, {:add, entry})
  end

  def update_entry(entry_id, update_fun) do
    send(:todo_server, {:update, entry_id, update_fun})
  end

  def delete_entry(entry_id) do
    send(:todo_server, {:delete, entry_id})
  end

  def entries(date) do
    send(:todo_server, {:entries, self(), date})
    receive do
      {:result, result} -> result
    end
  end

  defp loop(todo_list) do
    new_todo_list = receive do
      message -> process_message(todo_list, message)
    after 
      500 -> :error
    end

    loop(new_todo_list)
  end

  defp process_message(todo_list, {:add, entry}) do
    TodoList.add_entry(todo_list, entry)
  end

  defp process_message(todo_list, {:update, entry_id, update_fun}) do
    TodoList.update_entry(todo_list, entry_id, update_fun)
  end

  defp process_message(todo_list, {:delete, entry_id}) do
    TodoList.delete_entry(todo_list, entry_id)
  end

  defp process_message(todo_list, {:entries, client_pid, date}) do
    entries = TodoList.entries(todo_list, date)
    send(client_pid, {:result, entries})
    todo_list
  end
end


SingletonTodoServer.start()
SingletonTodoServer.add_entry(%{date: ~D[2023-12-19], title: "Dentist"})
SingletonTodoServer.add_entry(%{date: ~D[2023-12-20], title: "Shopping"})
SingletonTodoServer.add_entry(%{date: ~D[2023-12-19], title: "Movies"})

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = SingletonTodoServer.entries(~D[2023-12-19])

SingletonTodoServer.update_entry(1, fn entry -> %Entry{entry | title: "DentistUpdated"} end)

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "DentistUpdated"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = SingletonTodoServer.entries(~D[2023-12-19])

SingletonTodoServer.delete_entry(1)
[
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = SingletonTodoServer.entries(~D[2023-12-19])

:ok

6. GenServer - 6.1.2 Implementing the generic code

defmodule MyGenServer do
  def start(callback_module) do
    spawn(fn -> loop(callback_module, callback_module.init()) end)
  end

  def call(server_pid, message) do
    send(server_pid, {:call, message, self()})
    receive do
      response -> {:response, response}
    end
  end

  def cast(server_pid, message) do
    send(server_pid, {:cast, message})
    :ok
  end

  defp loop(callback_module, state) do
    new_state = receive do
      
      {:call, message, client_id} -> 
        {response, new_state} = callback_module.handle_call(message, state)
        send(client_id, response)
        new_state
        
      {:cast, message} -> 
        callback_module.handle_cast(message, state)
    end
    loop(callback_module, new_state)
  end
end

defmodule MyKeyValueStore do
  def start do
    MyGenServer.start(__MODULE__)
  end

  def put(pid, key, value) do
    MyGenServer.call(pid, {:put, key, value})
  end

  def get(pid, key) do
    MyGenServer.call(pid, {:get, key})
  end

  def reset(pid) do
    MyGenServer.cast(pid, :reset)
  end
  
  def init(), do: %{}
  def handle_call({:put, key, value}, map) do
    {:ok, Map.put(map, key, value)}
  end

  def handle_call({:get, key}, map) do
    {Map.get(map, key), map}
  end

  def handle_cast(:reset, _map) do
    %{}
  end
end


pid = MyKeyValueStore.start()
MyKeyValueStore.put(pid, :some_key, :some_value) |> IO.inspect()
MyKeyValueStore.get(pid, :some_key) |> IO.inspect()
MyKeyValueStore.reset(pid) |> IO.inspect()
MyKeyValueStore.get(pid, :some_key) |> IO.inspect()
MyKeyValueStore.put(pid, :some_key, :some_value) |> IO.inspect()
MyKeyValueStore.get(pid, :some_key) |> IO.inspect()

6.1.5 Exercise: Refactoring the to-do server

defmodule SimpleTodoServer do
  def start(), do: MyGenServer.start(__MODULE__)

  def add_entry(todo_server, entry) do
    MyGenServer.cast(todo_server, {:add, entry})
  end

  def update_entry(todo_server, entry_id, updater_fun) do
    MyGenServer.cast(todo_server, {:update, entry_id, updater_fun})
  end

  def delete_entry(todo_server, entry_id) do
    MyGenServer.cast(todo_server, {:delete, entry_id})
  end

  def entries(todo_server, date) do
    MyGenServer.call(todo_server, {:entries, date})
  end

  def init(), do: TodoList.new()

  def handle_cast({:add, entry}, todo_list) do
    TodoList.add_entry(todo_list, entry)
  end

  def handle_cast({:update, entry_id, updater_fun}, todo_list) do
    TodoList.update_entry(todo_list, entry_id, updater_fun)
  end

  def handle_cast({:delete, entry_id}, todo_list) do
    TodoList.delete_entry(todo_list, entry_id)
  end

  def handle_call({:entries, date}, todo_list) do
    {TodoList.entries(todo_list, date), todo_list}
  end
end

todo_server = SimpleTodoServer.start()
SimpleTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Dentist"}
 )

SimpleTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-20], title: "Shopping"}
 )

SimpleTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Movies"}
 )

{:response, [
  %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
]} = SimpleTodoServer.entries(todo_server, ~D[2023-12-19])

SimpleTodoServer.update_entry(todo_server, 1, fn entry -> %Entry{entry | title: "DentistUpdated"} end)

{:response, [
  %Entry{id: 1, date: ~D[2023-12-19], title: "DentistUpdated"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
]} = SimpleTodoServer.entries(todo_server, ~D[2023-12-19])

SimpleTodoServer.delete_entry(todo_server, 1)
{:response, [
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
]} = SimpleTodoServer.entries(todo_server, ~D[2023-12-19])

:ok

6.2.3 Handling requests


defmodule KeyValueStore do
  use GenServer

  def start(map \\ nil), do: GenServer.start(__MODULE__, map)

  def put(pid, key, value), do: GenServer.cast(pid, {:put, key, value})

  def get(pid, key), do: GenServer.call(pid, {:get, key})

  def init(map), do: {:ok, map || Map.new()}

  def handle_cast({:put, key, value}, map) do
    {:noreply, Map.put(map, key, value)}
  end

  def handle_call({:get, key}, metadata, map) do
    IO.inspect(metadata)
    {:reply, Map.get(map, key), map}
  end
end

{:ok, pid} = KeyValueStore.start()
KeyValueStore.put(pid, :some_key, :some_value)
:some_value = KeyValueStore.get(pid, :some_key)
:ok

6.2.4 Handling plain messages

defmodule CustomMessageServer do
  use GenServer

  def start(), do: GenServer.start(__MODULE__, nil)
  def hello(pid), do: send(pid, :hello)

  @impl GenServer
  def init(_), do: {:ok, nil}

  @impl GenServer
  def handle_info(:hello, state) do
    IO.puts("Hello world!")
    {:noreply, state}
  end
end

{:ok, pid} = CustomMessageServer.start()
CustomMessageServer.hello(pid)
CustomMessageServer.hello(pid)
CustomMessageServer.hello(pid)
:ok

6.2.7 OTP-compliant processes

defmodule RealTodoServer do
  use GenServer
  
  def start(), do: GenServer.start(__MODULE__, nil)

  def add_entry(todo_server, entry) do
    GenServer.cast(todo_server, {:add, entry})
  end

  def update_entry(todo_server, entry_id, updater_fun) do
    GenServer.cast(todo_server, {:update, entry_id, updater_fun})
  end

  def delete_entry(todo_server, entry_id) do
    GenServer.cast(todo_server, {:delete, entry_id})
  end

  def entries(todo_server, date) do
    GenServer.call(todo_server, {:entries, date})
  end

  def init(_), do: {:ok, TodoList.new()}

  def handle_cast({:add, entry}, todo_list) do
    {:noreply, TodoList.add_entry(todo_list, entry)}
  end

  def handle_cast({:update, entry_id, updater_fun}, todo_list) do
    {:noreply, TodoList.update_entry(todo_list, entry_id, updater_fun)}
  end

  def handle_cast({:delete, entry_id}, todo_list) do
    {:noreply, TodoList.delete_entry(todo_list, entry_id)}
  end

  def handle_call({:entries, date}, _, todo_list) do
    {:reply, TodoList.entries(todo_list, date), todo_list}
  end
end

{:ok, todo_server} = RealTodoServer.start()
RealTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Dentist"}
 )

RealTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-20], title: "Shopping"}
 )

RealTodoServer.add_entry(
   todo_server,
   %{date: ~D[2023-12-19], title: "Movies"}
 )

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "Dentist"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = RealTodoServer.entries(todo_server, ~D[2023-12-19])

RealTodoServer.update_entry(todo_server, 1, fn entry -> %Entry{entry | title: "DentistUpdated"} end)

[
  %Entry{id: 1, date: ~D[2023-12-19], title: "DentistUpdated"},
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = RealTodoServer.entries(todo_server, ~D[2023-12-19])

RealTodoServer.delete_entry(todo_server, 1)
[
  %Entry{id: 3, date: ~D[2023-12-19], title: "Movies"}
] = RealTodoServer.entries(todo_server, ~D[2023-12-19])

:ok