Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Processes

_2024_08_22_elixir/Concurrency.livemd

Processes

Section

Process

A BEAM process is a lightweight process that needs only a few KB of memory to run. BEAM processes are different from OS processes, which take more memory.

The following code simulates a long-running operation (a query that takes two seconds)

# Stand-in for a slow query: pauses the calling process for two seconds,
# then returns a string built from the query definition.
run_query = fn query_def ->
  delay_ms = 2000
  Process.sleep(delay_ms)
  "#{query_def} result"
end

Running one process will take 2s to complete

# Blocks the calling process for the two-second sleep, then returns the result string.
run_query.("query 1")

Running 5 queries one after another will take 10s to complete

# Runs the five queries one after another in the calling process and
# collects their results in order.
for index <- 1..5 do
  run_query.("query #{index}")
end

To resolve this and make the solution more scalable, we run these queries in separate processes

# Runs the query in a new process and prints the result from there;
# the calling process gets the new pid back right away.
spawn(fn ->
  "query 1"
  |> run_query.()
  |> IO.puts()
end)

Creating a helper lambda for concurrent processes

# Helper that starts one process per query; the spawned process runs the
# query and prints its result.
async_query = fn query_def ->
  spawn(fn ->
    query_def
    |> run_query.()
    |> IO.puts()
  end)
end

Now calling async_query will still take 2 seconds to complete, but the calculation is non-blocking because it happens in a separate process

# Returns the pid of the spawned process; the result is printed by that process later.
async_query.("query 1")

Now calling async_query 5 times will result in the calculation still taking only 2 seconds. This is because each call creates a separate process, and all of them run at the same time.

# Starts five query processes, one per index.
Enum.each(1..5, fn index -> async_query.("query #{index}") end)

Message Passing

Sending messages to a process can be done by using the send function and providing the PID (Process ID)

# Sends the string to the current process itself, so it lands in this process's mailbox.
send(self(), "a message")

Using receive do to pattern match the incoming message to predefined cases.

receive do
  # Matches exactly the string "a message".
  "a message" -> IO.puts("hello world")
  # Any other message is bound to `message` and inspected.
  message -> IO.inspect(message)
end
 
# The second element of the tuple is bound to `id` by the pattern in the receive clause.
send(self(), {:message, 1})
receive do
  {:message, id} -> IO.puts("received message #{id}")
end

If a pattern is not matched or no messages are received, the function waits indefinitely and blocks the process. We avoid this by using `after`, as follows: the process waits for the specified amount of time and then executes the other clause (here, IO.puts).

receive do
  message -> IO.inspect(message)
after
  # Runs only when no message has arrived within 5000 ms.
  5000 -> IO.puts("message not received")
end

COLLECTING QUERY RESULTS

Defining the code to run in separate processes, this code will return a string to the calling process, in this case the livebook process.

# Starts a process that runs the query and sends the result back to
# whoever called this helper, tagged as {:query_result, result}.
async_query = fn query_def ->
  # Captured before spawning: inside the new process, self() would be the worker.
  parent = self()

  spawn(fn ->
    send(parent, {:query_result, run_query.(query_def)})
  end)
end

Calling async_query 5 times; the resulting messages are stored in the mailbox until consumed.

# Kick off the five queries; each one reports back with a message.
Enum.each(1..5, fn index -> async_query.("query #{index}") end)

# Takes one {:query_result, _} message from the mailbox and returns its payload.
get_result = fn ->
  receive do
    {:query_result, payload} -> payload
  end
end

Now getting all messages into a list.

# One get_result call per started query.
results = for _ <- 1..5, do: get_result.()

The same could be achieved with piping

# Start all queries first, then collect one result per started query.
1..5
|> Enum.map(fn index -> async_query.("query #{index}") end)
|> Enum.map(fn _pid -> get_result.() end)

Stateful server processes

These processes keep some kind of data and are long-lived; they do this by listening for messages and using tail recursion.

defmodule DatabaseServer do
  @moduledoc """
  A long-lived server process that keeps a pretend database connection as its
  state and answers query requests by sending messages back to the requester.
  """

  @doc "Spawns the server process and returns its pid."
  def start do
    spawn(fn -> loop(:rand.uniform(1000)) end)
  end

  @doc """
  Asks the server to run `query_def`. Returns immediately; the result arrives
  later as a `{:query_result, result}` message in the caller's mailbox.
  """
  def run_async(server_pid, query_def) do
    request = {:run_query, self(), query_def}
    send(server_pid, request)
  end

  @doc """
  Takes one query result from the caller's mailbox, or returns
  `{:error, :timeout}` when none arrives within five seconds.
  """
  def get_result do
    receive do
      {:query_result, result} -> result
    after
      5000 -> {:error, :timeout}
    end
  end

  # Handles one request, then recurses with the same connection so the state
  # survives between requests.
  defp loop(connection) do
    receive do
      {:run_query, from_pid, query_def} ->
        send(from_pid, {:query_result, run_query(connection, query_def)})
    end

    loop(connection)
  end

  # Simulated slow query that tags the result with the connection it ran on.
  defp run_query(connection, query_def) do
    Process.sleep(2000)
    "Connection #{connection}: #{query_def} result"
  end
end
# Start one server and run two queries against it; each result is taken from
# this process's mailbox with get_result/0.
server_pid = DatabaseServer.start()
DatabaseServer.run_async(server_pid, "query 1")
DatabaseServer.get_result()
DatabaseServer.run_async(server_pid, "query 2")
DatabaseServer.get_result()

The state is kept between the requests (Connection id)

Changing the state is just as simple: pass a different state to the loop, for example a state that depends on the received message.

Example

defmodule Calculator do
  @moduledoc """
  Server process that holds a single number and updates it in response to
  arithmetic messages.
  """

  @doc "Spawns a calculator whose value starts at 0 and returns its pid."
  def start do
    spawn(fn -> loop(0) end)
  end

  @doc "Asks the server for its current value and waits for the reply."
  def value(server_pid) do
    send(server_pid, {:value, self()})

    receive do
      {:response, result} -> result
    end
  end

  @doc "Adds `value` to the server's number (fire-and-forget)."
  def add(server_pid, value) do
    send(server_pid, {:add, value})
  end

  @doc "Subtracts `value` from the server's number (fire-and-forget)."
  def sub(server_pid, value) do
    send(server_pid, {:sub, value})
  end

  @doc "Multiplies the server's number by `value` (fire-and-forget)."
  def mul(server_pid, value) do
    send(server_pid, {:mul, value})
  end

  @doc "Divides the server's number by `value` (fire-and-forget)."
  def div(server_pid, value) do
    send(server_pid, {:div, value})
  end

  # Takes the next message, computes the new value and recurses with it.
  defp loop(current_value) do
    receive do
      message ->
        current_value
        |> process_message(message)
        |> loop()
    end
  end

  # Each clause returns the value the server should hold after the message.
  defp process_message(current_value, {:value, caller}) do
    send(caller, {:response, current_value})
    current_value
  end

  defp process_message(current_value, {:add, value}), do: current_value + value
  defp process_message(current_value, {:sub, value}), do: current_value - value
  defp process_message(current_value, {:mul, value}), do: current_value * value
  defp process_message(current_value, {:div, value}), do: current_value / value

  # Anything else is reported and leaves the value untouched.
  defp process_message(current_value, invalid_request) do
    IO.puts("invalid request #{inspect invalid_request}")
    current_value
  end
end
# The calculator starts at 0: adding 5 and then multiplying by 10 leaves 50.
server = Calculator.start()
Calculator.add(server,5)
Calculator.mul(server, 10)
Calculator.value(server)

Implementing the todolist with processes

defmodule TodoList1 do
  @moduledoc """
  A to-do list kept as a map of entries keyed by an auto-incremented id.
  """

  defstruct next_id: 1, entries: %{}

  @doc "Builds a list from an enumerable of entry maps, assigning ids in order."
  def new(entries \\ []) do
    Enum.reduce(entries, %TodoList1{}, &add_entry(&2, &1))
  end

  @doc "Stores `entry` under the next free id and advances the id counter."
  def add_entry(todo_list, entry) do
    entry = Map.put(entry, :id, todo_list.next_id)
    new_entries = Map.put(todo_list.entries, todo_list.next_id, entry)
    %TodoList1{todo_list | entries: new_entries, next_id: todo_list.next_id + 1}
  end

  @doc "Returns every entry whose `:date` equals `date`."
  def entries(todo_list, date) do
    todo_list.entries
    |> Map.values()
    |> Enum.filter(fn entry -> entry.date == date end)
  end

  @doc """
  Replaces the entry stored under `entry_id` with `updater_fun.(entry)`.

  Returns the list unchanged when no such entry exists. The updated entry
  always keeps `entry_id` as both its key and its `:id`.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      :error ->
        todo_list

      {:ok, old_entry} ->
        # Pin the id: an updater that changed `:id` used to leave the stale
        # entry under the old key and insert a second one under the new key.
        new_entry = old_entry |> updater_fun.() |> Map.put(:id, entry_id)
        new_entries = Map.put(todo_list.entries, entry_id, new_entry)
        %TodoList1{todo_list | entries: new_entries}
    end
  end

  @doc "Removes the entry stored under `entry_id`, if any."
  def delete_entry(todo_list, entry_id) do
    %TodoList1{todo_list | entries: Map.delete(todo_list.entries, entry_id)}
  end
end
defmodule TaskServer do
  @moduledoc """
  Server process that owns a `TodoList1` and exposes it through messages.
  """

  @doc "Spawns the server with an empty to-do list and returns its pid."
  def start, do: spawn(fn -> loop(TodoList1.new()) end)

  @doc "Asks the server to add `new_entry` (fire-and-forget)."
  def add_entry(todo_server, new_entry) do
    send(todo_server, {:add_entry, new_entry})
  end

  @doc """
  Asks the server for the entries on `date` and waits for the reply.
  Returns `{:error, :timeout}` when no reply arrives within five seconds.
  """
  def entries(todo_server, date) do
    send(todo_server, {:entries, self(), date})

    receive do
      {:todo_entries, entries} -> entries
    after
      5000 -> {:error, :timeout}
    end
  end

  # Takes the next message, computes the new list and recurses with it.
  defp loop(todo_list) do
    new_todo_list =
      receive do
        message -> process_message(todo_list, message)
      end

    loop(new_todo_list)
  end

  # Each clause returns the list the server should hold after the message.
  defp process_message(todo_list, {:add_entry, new_entry}) do
    TodoList1.add_entry(todo_list, new_entry)
  end

  defp process_message(todo_list, {:entries, caller, date}) do
    send(caller, {:todo_entries, TodoList1.entries(todo_list, date)})
    todo_list
  end

  # Catch-all: without it an unknown message raised FunctionClauseError,
  # which killed the server process and discarded the whole list.
  defp process_message(todo_list, _unknown_message), do: todo_list
end

When working with server processes, it's important to always implement a catch-all clause, so that unmatched messages do not accumulate in the mailbox and potentially degrade performance.

# Skeleton of a server loop with a catch-all clause. `do_something/1` and
# `warn_about_unknown_message/1` are placeholders for the server's own functions.
defp loop do
  receive do
    {:message, msg} -> do_something(msg)
    # Catch-all: consumes every message the clauses above did not match.
    other -> warn_about_unknown_message(other)
  end

  loop()
end

Generic server processes

defmodule ServerProcess do
  @moduledoc """
  Generic server process: owns the receive loop and state handling, and
  delegates the actual request handling to a callback module.

  The callback module must export `init/0` (initial state), `handle_call/2`
  (returning `{response, new_state}`) and `handle_cast/2` (returning the new
  state).
  """

  @doc "Spawns a server driven by `callback_module` and returns its pid."
  def start(callback_module) do
    spawn(fn ->
      # Explicit parentheses: invoking a function as `module.init` without
      # them is deprecated.
      initial_state = callback_module.init()
      loop(callback_module, initial_state)
    end)
  end

  @doc "Sends `request` to the server without waiting for a reply."
  def cast(server_pid, request), do: send(server_pid, {:cast, request})

  @doc "Sends `request` to the server and blocks until its response arrives."
  def call(server_pid, request) do
    send(server_pid, {:call, request, self()})

    receive do
      {:response, response} -> response
    end
  end

  defp loop(callback_module, current_state) do
    receive do
      {:call, request, caller} ->
        {response, new_state} = callback_module.handle_call(request, current_state)
        send(caller, {:response, response})
        loop(callback_module, new_state)

      {:cast, request} ->
        new_state = callback_module.handle_cast(request, current_state)
        loop(callback_module, new_state)

      _unknown_message ->
        # Drop anything that is neither a call nor a cast so unmatched
        # messages cannot pile up in the mailbox.
        loop(callback_module, current_state)
    end
  end
end

defmodule KeyValueStore do
  @moduledoc """
  Key-value store built on `ServerProcess`: this module is both the client
  interface and the callback module holding the map.
  """

  # --- client interface ---

  def start do
    ServerProcess.start(__MODULE__)
  end

  # Synchronous write: waits for the server's :ok response.
  def put(pid, key, value) do
    ServerProcess.call(pid, {:put, key, value})
  end

  def get(pid, key) do
    ServerProcess.call(pid, {:get, key})
  end

  # Asynchronous write: same request as put/3, sent as a cast.
  def cast(pid, key, value) do
    ServerProcess.cast(pid, {:put, key, value})
  end

  # --- callbacks used by ServerProcess ---

  def init do
    %{}
  end

  def handle_call({:put, key, value}, state) do
    updated = Map.put(state, key, value)
    {:ok, updated}
  end

  def handle_call({:get, key}, state) do
    {Map.get(state, key), state}
  end

  def handle_cast({:put, key, value}, state) do
    Map.put(state, key, value)
  end
end
 # First through the generic ServerProcess interface, then through the
 # KeyValueStore wrappers (a cast-based write followed by a read).
 pid = ServerProcess.start(KeyValueStore)
ServerProcess.call(pid, {:put, :some_key, :some_value})
ServerProcess.call(pid, {:get, :some_key})
pid = KeyValueStore.start()
KeyValueStore.cast(pid, :some_key, :some_value)
 KeyValueStore.get(pid, :some_key)
defmodule TodoServer do
  @moduledoc """
  To-do server built on `ServerProcess`, keeping a `TodoList` as its state.
  """

  # --- client interface ---

  def start do
    ServerProcess.start(__MODULE__)
  end

  def add_entry(todo_server, new_entry),
    do: ServerProcess.cast(todo_server, {:add_entry, new_entry})

  def entries(todo_server, date),
    do: ServerProcess.call(todo_server, {:entries, date})

  # --- callbacks used by ServerProcess ---

  def init do
    TodoList.new()
  end

  def handle_cast({:add_entry, new_entry}, todo_list),
    do: TodoList.add_entry(todo_list, new_entry)

  # Reads never change the list, so it is handed back as the new state.
  def handle_call({:entries, date}, todo_list) do
    matching = TodoList.entries(todo_list, date)
    {matching, todo_list}
  end
end

defmodule TodoList do
  @moduledoc """
  A to-do list kept as a map of entries keyed by an auto-incremented id.
  """

  defstruct next_id: 1, entries: %{}

  @doc "Builds a list from an enumerable of entry maps, assigning ids in order."
  def new(entries \\ []) do
    Enum.reduce(
      entries,
      %TodoList{},
      &add_entry(&2, &1)
    )
  end

  @doc "Stores `entry` under the next free id and advances the id counter."
  def add_entry(todo_list, entry) do
    entry = Map.put(entry, :id, todo_list.next_id)
    new_entries = Map.put(todo_list.entries, todo_list.next_id, entry)

    %TodoList{todo_list | entries: new_entries, next_id: todo_list.next_id + 1}
  end

  @doc "Returns every entry whose `:date` equals `date`."
  def entries(todo_list, date) do
    todo_list.entries
    |> Map.values()
    |> Enum.filter(fn entry -> entry.date == date end)
  end

  @doc """
  Replaces the entry stored under `entry_id` with `updater_fun.(entry)`.

  Returns the list unchanged when no such entry exists. The updated entry
  always keeps `entry_id` as both its key and its `:id`.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      :error ->
        todo_list

      {:ok, old_entry} ->
        # Pin the id: an updater that changed `:id` used to leave the stale
        # entry under the old key and insert a second one under the new key.
        new_entry = old_entry |> updater_fun.() |> Map.put(:id, entry_id)
        new_entries = Map.put(todo_list.entries, entry_id, new_entry)
        %TodoList{todo_list | entries: new_entries}
    end
  end

  @doc "Removes the entry stored under `entry_id`, if any."
  def delete_entry(todo_list, entry_id) do
    %TodoList{todo_list | entries: Map.delete(todo_list.entries, entry_id)}
  end
end

Using GenServer

GenServer serves the same purpose as ServerProcess, and it hides some of the boilerplate code

defmodule KeyValueStore1 do
  @moduledoc """
  Key-value store implemented as a GenServer whose state is a plain map.
  """

  use GenServer

  # --- client interface ---

  @doc "Starts the store; returns `{:ok, pid}` on success."
  def start do
    GenServer.start(__MODULE__, nil)
  end

  @doc "Stores `value` under `key` without waiting for the server."
  def put(pid, key, value) do
    GenServer.cast(pid, {:put, key, value})
  end

  @doc "Returns the value stored under `key`, or `nil` when there is none."
  def get(pid, key) do
    GenServer.call(pid, {:get, key})
  end

  # --- GenServer callbacks ---
  # Each one is marked with @impl so the compiler checks it against the behaviour.

  @impl GenServer
  def init(_) do
    {:ok, %{}}
  end

  @impl GenServer
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  @impl GenServer
  def handle_call({:get, key}, _, state) do
    {:reply, Map.get(state, key), state}
  end
end
 # put/3 is a cast and get/2 a call to the same server, so the read below
 # is handled after the write.
 {:ok, pid} = KeyValueStore1.start()
KeyValueStore1.put(pid, :some_key_1, "Hellow world")
 KeyValueStore1.get(pid, :some_key_1)

Compile-time check: `@impl` ensures that the declared functions satisfy the contract of the behaviour module, in this case GenServer. A warning is shown at compile time when a function does not match a callback of the behaviour.

defmodule EchoServer do
  use GenServer
  
  # Deliberately written with two parameters while being marked as a GenServer
  # callback; the other GenServer modules in this notebook define handle_call
  # with three parameters. Per the accompanying text, this mismatch is what
  # triggers the compile-time warning, so the arity is left as is.
  @impl GenServer
  def handle_call(some_request, server_state) do
    {:reply, some_request, server_state}
  end
end

Exercise

Implementing Todo Server with GenServer

defmodule TodoGenServer do
  @moduledoc """
  To-do server implemented as a GenServer, keeping a `TodoList2` as its state.
  """

  use GenServer

  # --- client interface ---

  def start do
    GenServer.start(__MODULE__, nil)
  end

  def add_entry(todo_server, new_entry),
    do: GenServer.cast(todo_server, {:add_entry, new_entry})

  def entries(todo_server, date),
    do: GenServer.call(todo_server, {:entries, date})

  # --- GenServer callbacks ---

  @impl GenServer
  def init(_) do
    {:ok, TodoList2.new()}
  end

  @impl GenServer
  def handle_cast({:add_entry, new_entry}, todo_list),
    do: {:noreply, TodoList2.add_entry(todo_list, new_entry)}

  # Reads never change the list, so it is handed back as the new state.
  @impl GenServer
  def handle_call({:entries, date}, _from, todo_list) do
    found = TodoList2.entries(todo_list, date)
    {:reply, found, todo_list}
  end
end

defmodule TodoList2 do
  @moduledoc """
  A to-do list kept as a map of entries keyed by an auto-incremented id.
  """

  defstruct next_id: 1, entries: %{}

  @doc "Builds a list from an enumerable of entry maps, assigning ids in order."
  def new(entries \\ []) do
    Enum.reduce(
      entries,
      %TodoList2{},
      &add_entry(&2, &1)
    )
  end

  @doc "Stores `entry` under the next free id and advances the id counter."
  def add_entry(todo_list, entry) do
    entry = Map.put(entry, :id, todo_list.next_id)
    new_entries = Map.put(todo_list.entries, todo_list.next_id, entry)

    %TodoList2{todo_list | entries: new_entries, next_id: todo_list.next_id + 1}
  end

  @doc "Returns every entry whose `:date` equals `date`."
  def entries(todo_list, date) do
    todo_list.entries
    |> Map.values()
    |> Enum.filter(fn entry -> entry.date == date end)
  end

  @doc """
  Replaces the entry stored under `entry_id` with `updater_fun.(entry)`.

  Returns the list unchanged when no such entry exists. The updated entry
  always keeps `entry_id` as both its key and its `:id`.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      :error ->
        todo_list

      {:ok, old_entry} ->
        # Pin the id: an updater that changed `:id` used to leave the stale
        # entry under the old key and insert a second one under the new key.
        new_entry = old_entry |> updater_fun.() |> Map.put(:id, entry_id)
        new_entries = Map.put(todo_list.entries, entry_id, new_entry)
        %TodoList2{todo_list | entries: new_entries}
    end
  end

  @doc "Removes the entry stored under `entry_id`, if any."
  def delete_entry(todo_list, entry_id) do
    %TodoList2{todo_list | entries: Map.delete(todo_list.entries, entry_id)}
  end
end
{:ok, pid} = TodoGenServer.start()
# Date.new/3 returns {:ok, date}, so the entry used to carry a tuple instead of
# a Date; the ~D sigil yields the Date struct itself. The client functions are
# used instead of raw GenServer calls so the message format stays inside the module.
TodoGenServer.add_entry(pid, %{date: ~D[2024-05-03], title: "Gayaaaaaa"})
TodoGenServer.entries(pid, ~D[2024-05-03])