Processes
Section
Process
A BEAM process is a lightweight process that takes only a few KB of memory to run. BEAM processes are different from OS processes, which take more memory.
The following code simulates a long-running process.
# Simulates a long-running query: blocks the calling process for two seconds,
# then returns a string describing the result of the given query.
run_query = fn query ->
  two_seconds = 2000
  Process.sleep(two_seconds)
  "#{query} result"
end
Running one process will take 2s to complete
# Runs a single query in the current process. The call blocks until run_query
# returns its result string.
run_query.("query 1")
Running 5 queries sequentially will take 10s to complete
# Runs five queries one after another in the current process and collects the
# result strings in a list. Each call has to finish before the next one starts.
for index <- 1..5 do
  run_query.("query #{index}")
end
To resolve this and make the solution more scalable, we run these queries in separate processes.
# Runs the query in a newly spawned process so the caller is not blocked.
# The spawned process prints the result itself once the query has finished.
spawn(fn -> IO.puts(run_query.("query 1")) end)
Creating a helper lambda for running a query in a concurrent process
# Spawns a new process that runs the given query and prints its result.
# The lambda itself returns right away with the pid of the spawned process.
async_query = fn query_def ->
  print_result = fn -> IO.puts(run_query.(query_def)) end
  spawn(print_result)
end
Now calling async_query will still take 2 seconds to complete, but the calculation is non-blocking because it happens in a separate process.
# Returns the pid of the spawned process; the query result is printed by that
# process, not returned to the caller.
async_query.("query 1")
Now calling async_query 5 times will result in the calculation still taking only 2 seconds. This is because each call creates a separate process, and they all run at the same time.
# Starts five queries, each in its own process, without waiting for any of them.
Enum.each(1..5, fn index -> async_query.("query #{index}") end)
Message Passing
Sending messages to a process can be done by using the send function and providing the PID (Process ID)
# Sends the string "a message" to the current process, i.e. into its own mailbox.
send(self(), "a message")
Using receive do to pattern match the incoming message to predefined cases.
# Takes one message from the mailbox. The exact string "a message" prints a
# greeting; any other message is inspected instead.
receive do
  "a message" ->
    IO.puts("hello world")

  other ->
    IO.inspect(other)
end
# Sends a tagged tuple to the current process, then destructures it in the
# receive pattern to pull out the id.
send(self(), {:message, 1})

receive do
  {:message, message_id} ->
    IO.puts("received message #{message_id}")
end
If no pattern matches or no message is received, receive waits indefinitely and blocks the process. We avoid this by using after, as follows. The process waits for the specified amount of time and then executes the code in the after clause (here it's IO.puts).
# Waits up to five seconds for any message. If one arrives it is inspected;
# otherwise the after clause prints a notice.
receive do
  incoming ->
    IO.inspect(incoming)
after
  5000 ->
    IO.puts("message not received")
end
COLLECTING QUERY RESULTS
Defining the code to run in separate processes. This code sends a string back to the calling process, in this case the Livebook process.
# Spawns a process that runs the query and sends the result back to the caller
# as {:query_result, result}. Returns the pid of the spawned process.
async_query = fn query_def ->
  # self() must be captured here: inside the spawned function it would return
  # the pid of the spawned process instead of the caller's.
  reply_to = self()

  spawn(fn ->
    send(reply_to, {:query_result, run_query.(query_def)})
  end)
end
Calling async_query 5 times; each resulting message is stored in the message queue until consumed.
# Starts five queries; each one reports back with a {:query_result, ...} message.
Enum.each(1..5, fn index -> async_query.("query #{index}") end)
# Takes the next {:query_result, result} message out of the current process's
# mailbox and returns the result. Blocks until such a message is available.
get_result = fn ->
  receive do
    {:query_result, query_result} ->
      query_result
  end
end
Now getting all messages into a list.
# Collects five query results from the mailbox into a list.
results = for _ <- 1..5, do: get_result.()
The same could be achieved with piping
# Starts one query per number, then collects one result per spawned process.
# The pids produced by the first step are only used to count the results.
1..5
|> Enum.map(fn index -> async_query.("query #{index}") end)
|> Enum.map(fn _pid -> get_result.() end)
Stateful server processes
These processes keep some kind of data and are long-lived. They do this by listening for messages and using tail recursion.
defmodule DatabaseServer do
  @moduledoc """
  A long-lived server process that keeps a "connection" as its state and runs
  queries on behalf of client processes.

  The connection is just a random integer chosen when the server starts; it is
  included in every query result.
  """

  @doc """
  Starts the server process and returns its pid.
  """
  def start do
    spawn(fn ->
      connection = :rand.uniform(1000)
      loop(connection)
    end)
  end

  @doc """
  Asks the server at `server_pid` to run `query_def`.

  Only sends the request; the result arrives later as a `{:query_result, result}`
  message in the caller's mailbox and can be fetched with `get_result/0`.
  """
  def run_async(server_pid, query_def) do
    send(server_pid, {:run_query, self(), query_def})
  end

  @doc """
  Returns the next query result from the caller's mailbox, or
  `{:error, :timeout}` if none arrives within five seconds.
  """
  def get_result do
    receive do
      {:query_result, result} -> result
    after
      5000 -> {:error, :timeout}
    end
  end

  # Server loop: handles one message, then recurses with the same connection.
  defp loop(connection) do
    receive do
      {:run_query, from_pid, query_def} ->
        query_result = run_query(connection, query_def)
        send(from_pid, {:query_result, query_result})

      # Catch-all: without this clause, messages that match nothing would stay
      # in the mailbox forever.
      unexpected ->
        IO.puts("invalid request #{inspect(unexpected)}")
    end

    loop(connection)
  end

  # Simulates a slow query executed over the given connection.
  defp run_query(connection, query_def) do
    Process.sleep(2000)
    "Connection #{connection}: #{query_def} result"
  end
end
# Start one server process; both requests below are sent to it.
server_pid = DatabaseServer.start()
# run_async only sends the request. get_result then waits in the current
# process for the {:query_result, ...} reply, so each pair must stay in order.
DatabaseServer.run_async(server_pid, "query 1")
DatabaseServer.get_result()
DatabaseServer.run_async(server_pid, "query 2")
DatabaseServer.get_result()
The state is kept between the requests (Connection id)
Changing the state is just as simple: pass a different state to the loop, for example a different state depending on the received message.
Example
defmodule Calculator do
  @moduledoc """
  A server process that keeps a running numeric value, starting at 0.

  Arithmetic requests (`add/2`, `sub/2`, `mul/2`, `div/2`) are fire-and-forget
  messages that replace the server's state. `value/2` is a synchronous request
  that returns the current state.
  """

  @doc """
  Starts the calculator process with an initial value of 0 and returns its pid.
  """
  def start, do: spawn(fn -> loop(0) end)

  @doc """
  Returns the current value held by the server.

  Waits up to `timeout` milliseconds (default 5000) for the reply and returns
  `{:error, :timeout}` if none arrives. Without the timeout the caller would
  block forever when the server process is no longer running, for example
  after it crashed on a division by zero.
  """
  def value(server_pid, timeout \\ 5000) do
    send(server_pid, {:value, self()})

    receive do
      {:response, value} -> value
    after
      timeout -> {:error, :timeout}
    end
  end

  @doc "Asks the server to add `value` to its current value."
  def add(server_pid, value), do: send(server_pid, {:add, value})

  @doc "Asks the server to subtract `value` from its current value."
  def sub(server_pid, value), do: send(server_pid, {:sub, value})

  @doc "Asks the server to multiply its current value by `value`."
  def mul(server_pid, value), do: send(server_pid, {:mul, value})

  @doc "Asks the server to divide its current value by `value`."
  def div(server_pid, value), do: send(server_pid, {:div, value})

  # Server loop: computes the next state from one message, then recurses.
  defp loop(current_value) do
    new_value =
      receive do
        {:value, caller} ->
          send(caller, {:response, current_value})
          current_value

        {:add, value} ->
          current_value + value

        {:sub, value} ->
          current_value - value

        {:mul, value} ->
          current_value * value

        {:div, value} ->
          current_value / value

        # Unknown requests are reported and leave the state untouched.
        invalid_request ->
          IO.puts("invalid request #{inspect(invalid_request)}")
          current_value
      end

    loop(new_value)
  end
end
# Start a calculator, queue two operations, then read back the value.
# The server handles messages in the order they were sent.
server = Calculator.start()
Calculator.add(server, 5)
Calculator.mul(server, 10)
Calculator.value(server)
Implementing the todolist with processes
defmodule TodoList1 do
  @moduledoc """
  A to-do list: a struct holding a map of entries keyed by id and the id that
  the next added entry will receive.
  """

  defstruct next_id: 1, entries: %{}

  @doc """
  Builds a to-do list from the given entries, adding them in order.
  """
  def new(entries \\ []) do
    Enum.reduce(entries, %__MODULE__{}, fn entry, list -> add_entry(list, entry) end)
  end

  @doc """
  Adds `entry` to the list under the next free id. The id is also stored in
  the entry itself under `:id`.
  """
  def add_entry(todo_list, entry) do
    id = todo_list.next_id
    stamped_entry = Map.put(entry, :id, id)

    %__MODULE__{
      todo_list
      | entries: Map.put(todo_list.entries, id, stamped_entry),
        next_id: id + 1
    }
  end

  @doc """
  Returns all entries whose `:date` equals `date`.
  """
  def entries(todo_list, date) do
    on_date? = fn entry -> entry.date == date end
    Enum.filter(Map.values(todo_list.entries), on_date?)
  end

  @doc """
  Applies `updater_fun` to the entry with `entry_id` and stores the result
  under the id of the returned entry. Returns the list unchanged when no such
  entry exists.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      {:ok, old_entry} ->
        new_entry = updater_fun.(old_entry)
        %__MODULE__{todo_list | entries: Map.put(todo_list.entries, new_entry.id, new_entry)}

      :error ->
        todo_list
    end
  end

  @doc """
  Removes the entry with `entry_id` from the list, if present.
  """
  def delete_entry(todo_list, entry_id) do
    remaining = Map.delete(todo_list.entries, entry_id)
    %__MODULE__{todo_list | entries: remaining}
  end
end
defmodule TaskServer do
  @moduledoc """
  A server process that holds a `TodoList1` as its state and exposes it through
  message passing.
  """

  @doc """
  Starts the server with an empty to-do list and returns its pid.
  """
  def start, do: spawn(fn -> loop(TodoList1.new()) end)

  @doc """
  Asks the server to add `new_entry` to its list. Does not wait for a reply.
  """
  def add_entry(todo_server, new_entry) do
    send(todo_server, {:add_entry, new_entry})
  end

  @doc """
  Returns the entries for `date`, or `{:error, :timeout}` if the server does
  not reply within five seconds.
  """
  def entries(todo_server, date) do
    send(todo_server, {:entries, self(), date})

    receive do
      {:todo_entries, entries} -> entries
    after
      5000 -> {:error, :timeout}
    end
  end

  # Server loop: turns one message into the next state, then recurses.
  defp loop(todo_list) do
    new_todo_list =
      receive do
        message -> process_message(todo_list, message)
      end

    loop(new_todo_list)
  end

  defp process_message(todo_list, {:add_entry, new_entry}) do
    TodoList1.add_entry(todo_list, new_entry)
  end

  defp process_message(todo_list, {:entries, caller, date}) do
    send(caller, {:todo_entries, TodoList1.entries(todo_list, date)})
    todo_list
  end

  # Catch-all: an unknown message would otherwise raise FunctionClauseError and
  # terminate the server, losing the whole list. Report it and keep the state.
  defp process_message(todo_list, invalid_request) do
    IO.puts("invalid request #{inspect(invalid_request)}")
    todo_list
  end
end
When working with server processes, it’s important to always implement a catch-all case to avoid messages accumulating in the mailbox and potentially degrading performance.
# Skeleton of a server loop with a catch-all clause.
# NOTE(review): do_something/1 and warn_about_unknown_message/1 are not defined
# in the visible part of this notebook; presumably they are placeholders.
defp loop do
  receive do
    {:message, msg} -> do_something(msg)
    # Catch-all: consumes any message the clause above did not match.
    other -> warn_about_unknown_message(other)
  end

  loop()
end
Generic server processes
defmodule ServerProcess do
  @moduledoc """
  A generic server process. The state handling is delegated to a callback
  module that must export:

    * `init/0` - returns the initial state
    * `handle_call/2` - takes `(request, state)`, returns `{response, new_state}`
    * `handle_cast/2` - takes `(request, state)`, returns `new_state`
  """

  @doc """
  Spawns a server process driven by `callback_module` and returns its pid.
  """
  def start(callback_module) do
    spawn(fn ->
      # Explicit parentheses: calling a zero-arity function on a module
      # without them is deprecated.
      initial_state = callback_module.init()
      loop(callback_module, initial_state)
    end)
  end

  @doc """
  Sends `request` to the server and blocks until the response arrives.
  """
  def call(server_pid, request) do
    send(server_pid, {:call, request, self()})

    receive do
      {:response, response} -> response
    end
  end

  @doc """
  Sends `request` to the server without waiting for a response.
  """
  def cast(server_pid, request), do: send(server_pid, {:cast, request})

  # Server loop: dispatches each message to the callback module and recurses
  # with the state it returns.
  defp loop(callback_module, current_state) do
    receive do
      {:call, request, caller} ->
        {response, new_state} = callback_module.handle_call(request, current_state)
        send(caller, {:response, response})
        loop(callback_module, new_state)

      {:cast, request} ->
        new_state = callback_module.handle_cast(request, current_state)
        loop(callback_module, new_state)

      # Catch-all: drop messages that are neither calls nor casts so they do
      # not accumulate in the mailbox; the state is left unchanged.
      unexpected ->
        IO.puts("invalid request #{inspect(unexpected)}")
        loop(callback_module, current_state)
    end
  end
end
defmodule KeyValueStore do
  @moduledoc """
  A key-value store built on top of `ServerProcess`. The state is a plain map.
  """

  # --- Client API ---

  @doc "Starts a store process and returns its pid."
  def start do
    ServerProcess.start(__MODULE__)
  end

  @doc "Stores `value` under `key` and waits for the server's reply."
  def put(pid, key, value) do
    ServerProcess.call(pid, {:put, key, value})
  end

  @doc "Stores `value` under `key` without waiting for a reply."
  def cast(pid, key, value) do
    ServerProcess.cast(pid, {:put, key, value})
  end

  @doc "Returns the value stored under `key`, or `nil` when the key is absent."
  def get(pid, key) do
    ServerProcess.call(pid, {:get, key})
  end

  # --- Callbacks invoked by ServerProcess ---

  @doc "Returns the initial state: an empty map."
  def init do
    %{}
  end

  @doc "Handles synchronous requests; returns `{response, new_state}`."
  def handle_call({:put, key, value}, state) do
    updated = Map.put(state, key, value)
    {:ok, updated}
  end

  def handle_call({:get, key}, state) do
    {Map.get(state, key), state}
  end

  @doc "Handles asynchronous requests; returns the new state."
  def handle_cast({:put, key, value}, state) do
    Map.put(state, key, value)
  end
end
# Talk to the generic server directly, passing the raw request tuples.
pid = ServerProcess.start(KeyValueStore)
ServerProcess.call(pid, {:put, :some_key, :some_value})
ServerProcess.call(pid, {:get, :some_key})
# The same interaction through KeyValueStore's wrapper functions.
# `pid` is rebound here to a second, separate server process.
pid = KeyValueStore.start()
KeyValueStore.cast(pid, :some_key, :some_value)
KeyValueStore.get(pid, :some_key)
defmodule TodoServer do
  @moduledoc """
  A to-do server built on top of `ServerProcess`, keeping a `TodoList` as state.
  """

  # --- Client API ---

  @doc "Starts a to-do server process and returns its pid."
  def start do
    ServerProcess.start(__MODULE__)
  end

  @doc "Asks the server to add `new_entry`; does not wait for a reply."
  def add_entry(todo_server, new_entry) do
    request = {:add_entry, new_entry}
    ServerProcess.cast(todo_server, request)
  end

  @doc "Returns the entries stored for `date`."
  def entries(todo_server, date) do
    request = {:entries, date}
    ServerProcess.call(todo_server, request)
  end

  # --- Callbacks invoked by ServerProcess ---

  @doc "Returns the initial state: an empty to-do list."
  def init do
    TodoList.new()
  end

  @doc "Adds the entry to the list and returns the new state."
  def handle_cast({:add_entry, new_entry}, todo_list),
    do: TodoList.add_entry(todo_list, new_entry)

  @doc "Looks up the entries for a date; the state is returned unchanged."
  def handle_call({:entries, date}, todo_list) do
    found = TodoList.entries(todo_list, date)
    {found, todo_list}
  end
end
defmodule TodoList do
  @moduledoc """
  A to-do list: a struct holding a map of entries keyed by id and the id that
  the next added entry will receive.
  """

  defstruct next_id: 1, entries: %{}

  @doc """
  Builds a to-do list from the given entries, adding them in order.
  """
  def new(entries \\ []) do
    empty = %__MODULE__{}
    Enum.reduce(entries, empty, fn entry, list -> add_entry(list, entry) end)
  end

  @doc """
  Adds `entry` to the list under the next free id. The id is also stored in
  the entry itself under `:id`.
  """
  def add_entry(todo_list, entry) do
    id = todo_list.next_id
    entry_with_id = Map.put(entry, :id, id)
    entries = Map.put(todo_list.entries, id, entry_with_id)

    %__MODULE__{todo_list | entries: entries, next_id: id + 1}
  end

  @doc """
  Returns all entries whose `:date` equals `date`.
  """
  def entries(todo_list, date) do
    all_entries = Map.values(todo_list.entries)
    Enum.filter(all_entries, &(&1.date == date))
  end

  @doc """
  Applies `updater_fun` to the entry with `entry_id` and stores the result
  under the id of the returned entry. Returns the list unchanged when no such
  entry exists.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      {:ok, current} ->
        replacement = updater_fun.(current)
        entries = Map.put(todo_list.entries, replacement.id, replacement)
        %__MODULE__{todo_list | entries: entries}

      :error ->
        todo_list
    end
  end

  @doc """
  Removes the entry with `entry_id` from the list, if present.
  """
  def delete_entry(todo_list, entry_id) do
    entries = Map.delete(todo_list.entries, entry_id)
    %__MODULE__{todo_list | entries: entries}
  end
end
Using GenServer
GenServer serves the same purpose as ServerProcess: it hides some boilerplate code.
defmodule KeyValueStore1 do
  @moduledoc """
  A key-value store implemented as a `GenServer`. The state is a plain map.
  """

  use GenServer

  # --- Client API ---

  @doc """
  Starts the store process. Returns whatever `GenServer.start/2` returns,
  `{:ok, pid}` on success.
  """
  def start do
    GenServer.start(__MODULE__, nil)
  end

  @doc """
  Stores `value` under `key`. Asynchronous: returns without waiting for the
  server to process the request.
  """
  def put(pid, key, value) do
    GenServer.cast(pid, {:put, key, value})
  end

  @doc """
  Returns the value stored under `key`, or `nil` when the key is absent.
  """
  def get(pid, key) do
    GenServer.call(pid, {:get, key})
  end

  # --- GenServer callbacks ---
  # @impl lets the compiler check each callback against the GenServer
  # behaviour, so a wrong name or arity produces a compile-time warning.

  @impl GenServer
  def init(_) do
    {:ok, %{}}
  end

  @impl GenServer
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  @impl GenServer
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end
# start/0 returns {:ok, pid}; the match fails loudly if the server did not start.
{:ok, pid} = KeyValueStore1.start()
# put is a cast and get is a call on the same pid.
KeyValueStore1.put(pid, :some_key_1, "Hellow world")
KeyValueStore1.get(pid, :some_key_1)
Compile-time check: ensures that the declared functions satisfy the contract of the behaviour in use, in this case GenServer. It shows a warning at compile time saying that the function is not correctly implemented.
# Example module for the @impl compile-time check.
defmodule EchoServer do
use GenServer
# NOTE(review): this handle_call takes two arguments, while the other GenServer
# modules in this notebook define handle_call with three (request, from, state).
# Together with @impl GenServer this looks like a deliberate mismatch meant to
# trigger the compile-time warning described in the surrounding text. Confirm
# before "fixing" it.
@impl GenServer
def handle_call(some_request, server_state) do
{:reply, some_request, server_state}
end
end
Exercise
Implementing Todo Server with GenServer
defmodule TodoGenServer do
  @moduledoc """
  A to-do server implemented as a `GenServer`, keeping a `TodoList2` as state.
  """

  use GenServer

  # --- Client API ---

  @doc "Starts the server. Returns the result of `GenServer.start/2`."
  def start do
    GenServer.start(__MODULE__, nil)
  end

  @doc "Asks the server to add `new_entry`; does not wait for a reply."
  def add_entry(todo_server, new_entry) do
    request = {:add_entry, new_entry}
    GenServer.cast(todo_server, request)
  end

  @doc "Returns the entries stored for `date`."
  def entries(todo_server, date) do
    request = {:entries, date}
    GenServer.call(todo_server, request)
  end

  # --- GenServer callbacks ---

  @impl GenServer
  def init(_) do
    {:ok, TodoList2.new()}
  end

  @impl GenServer
  def handle_cast({:add_entry, new_entry}, todo_list),
    do: {:noreply, TodoList2.add_entry(todo_list, new_entry)}

  @impl GenServer
  def handle_call({:entries, date}, _from, todo_list) do
    found = TodoList2.entries(todo_list, date)
    {:reply, found, todo_list}
  end
end
defmodule TodoList2 do
  @moduledoc """
  A to-do list: a struct holding a map of entries keyed by id and the id that
  the next added entry will receive.
  """

  defstruct next_id: 1, entries: %{}

  @doc """
  Builds a to-do list from the given entries, adding them in order.
  """
  def new(entries \\ []) do
    Enum.reduce(entries, %__MODULE__{}, fn entry, acc -> add_entry(acc, entry) end)
  end

  @doc """
  Adds `entry` to the list under the next free id. The id is also stored in
  the entry itself under `:id`.
  """
  def add_entry(todo_list, entry) do
    assigned_id = todo_list.next_id
    numbered = Map.put(entry, :id, assigned_id)

    %__MODULE__{
      todo_list
      | entries: Map.put(todo_list.entries, assigned_id, numbered),
        next_id: assigned_id + 1
    }
  end

  @doc """
  Returns all entries whose `:date` equals `date`.
  """
  def entries(todo_list, date) do
    matches_date? = fn entry -> entry.date == date end

    todo_list.entries
    |> Map.values()
    |> Enum.filter(matches_date?)
  end

  @doc """
  Applies `updater_fun` to the entry with `entry_id` and stores the result
  under the id of the returned entry. Returns the list unchanged when no such
  entry exists.
  """
  def update_entry(todo_list, entry_id, updater_fun) do
    case Map.fetch(todo_list.entries, entry_id) do
      {:ok, existing} ->
        changed = updater_fun.(existing)
        %__MODULE__{todo_list | entries: Map.put(todo_list.entries, changed.id, changed)}

      :error ->
        todo_list
    end
  end

  @doc """
  Removes the entry with `entry_id` from the list, if present.
  """
  def delete_entry(todo_list, entry_id) do
    kept = Map.delete(todo_list.entries, entry_id)
    %__MODULE__{todo_list | entries: kept}
  end
end
# Start the server, add one entry, then read back the entries for that date.
{:ok, pid} = TodoGenServer.start()

# Date.new/3 returns {:ok, date}, so passing its result directly would store a
# tuple under :date instead of a Date. The ~D sigil yields the Date itself.
# The calls also go through TodoGenServer's own functions rather than raw
# GenServer.cast/call, so the request tuples stay an implementation detail.
TodoGenServer.add_entry(pid, %{date: ~D[2024-05-03], title: "Gayaaaaaa"})
TodoGenServer.entries(pid, ~D[2024-05-03])