GenServers
Section 1: Introduction to Processes and State in Elixir
1.1 How Processes Hold State
Example: Simple Counter Process
defmodule Counter do
def start(initial_value) do
spawn(fn -> loop(initial_value) end)
end
defp loop(current_value) do
receive do
{:increment, caller} ->
new_value = current_value + 1
send(caller, {:ok, new_value})
loop(new_value)
{:get, caller} ->
send(caller, {:ok, current_value})
loop(current_value)
end
end
end
counter = Counter.start(0)
Usage:
# Increment the counter
send(counter, {:increment, self()})
receive do
{:ok, new_value} -> IO.puts("Counter incremented to #{new_value}")
end
# Get the current value
send(counter, {:get, self()})
receive do
{:ok, value} -> IO.puts("Current counter value is #{value}")
end
Cleaner API
defmodule Counter2 do
def start(initial_value) do
spawn(fn -> loop(initial_value) end)
end
def increment(counter) do
send(counter, {:increment, self()})
receive do
{:ok, new_value} -> {:ok, new_value}
end
end
def get(counter) do
send(counter, {:get, self()})
receive do
{:ok, value} -> value
end
end
defp loop(current_value) do
receive do
{:increment, caller} ->
new_value = current_value + 1
send(caller, {:ok, new_value})
loop(new_value)
{:get, caller} ->
send(caller, {:ok, current_value})
loop(current_value)
end
end
end
counter = Counter2.start(0)
# Increment the counter
{:ok, new_value} = Counter2.increment(counter)
IO.puts("Counter incremented to #{new_value}")
# Get the current value
current_val = Counter2.get(counter)
IO.puts("Current counter value is #{current_val}")
In this example, we define a Counter module that starts a new process holding an initial value. The process waits for messages to increment the counter or retrieve its current value. The state (current_value) is maintained within the loop/1 function.
1.2 Recursive Functions for State Management
Example: Simple Accumulator Process
defmodule Accumulator do
def start() do
spawn(fn -> loop(0) end)
end
defp loop(total) do
receive do
{:add, value} ->
new_total = total + value
loop(new_total)
{:subtract, value} ->
new_total = total - value
loop(new_total)
{:total, caller} ->
send(caller, {:total, total})
loop(total)
end
end
end
Usage:
acc = Accumulator.start()
# Add values
send(acc, {:add, 10})
send(acc, {:add, 5})
# Subtract a value
send(acc, {:subtract, 3})
# Get the total
send(acc, {:total, self()})
receive do
{:total, total} -> IO.puts("Total is #{total}")
end
Here, the Accumulator process uses a recursive loop/1 function to keep its state (total) updated. Each time it receives a message to add or subtract, it calculates the new total and recursively calls itself with the updated state.
1.3 Limitations of Basic Processes
Example: Key-Value Store with Minimal Error Handling
defmodule KeyValueStore do
def start() do
spawn(fn -> loop(%{}) end)
end
defp loop(state) do
receive do
{:put, key, value} ->
new_state = Map.put(state, key, value)
loop(new_state)
{:get, key, caller} ->
value = Map.get(state, key, :not_found)
send(caller, {key, value})
loop(state)
{:delete, key} ->
new_state = Map.delete(state, key)
loop(new_state)
:stop ->
:ok
_unexpected ->
IO.puts("Received an unexpected message")
loop(state)
end
end
end
Usage:
store = KeyValueStore.start()
# Put values
send(store, {:put, :foo, 42})
send(store, {:put, :bar, "hello"})
# Get a value
send(store, {:get, :foo, self()})
receive do
{:foo, value} -> IO.puts("Value of :foo is #{value}")
end
# Delete a key
send(store, {:delete, :bar})
# Attempt to get a deleted key
send(store, {:get, :bar, self()})
receive do
{:bar, value} -> IO.puts("Value of :bar is #{value}")
end
# Stop the process
send(store, :stop)
Limitations Demonstrated:
-
Boilerplate Code: Each new process requires setting up
spawn,receive, and recursive loops manually. - Error Handling: There’s minimal error handling. Unexpected messages are only logged, and the process continues.
- No Supervision: If the process crashes, there’s no mechanism to restart it automatically.
Section 2: Building Generic Implementations
2.1 Abstracting Common Patterns
Example: Creating a Generic Server Module
In the previous examples, we noticed repetitive patterns such as setting up a loop, handling messages, and maintaining state. We can abstract these common patterns into a generic server module.
Generic Server Template:
defmodule GenericServer1 do
def start(callback_module, initial_state) do
spawn(fn -> loop(callback_module, initial_state) end)
end
defp loop(callback_module, state) do
receive do
message ->
case callback_module.handle_message(message, state) do
{:noreply, new_state} ->
loop(callback_module, new_state)
{:reply, caller, new_state} ->
send(caller, {:ok, new_state})
loop(callback_module, new_state)
end
end
end
end
Defining a Callback Module:
defmodule CounterServer do
def handle_message(:increment, state) do
new_state = state + 1
{:noreply, new_state}
end
def handle_message({:get, caller}, state) do
{:reply, caller, state}
end
end
counter = GenericServer1.start(CounterServer, 0)
Usage:
# Increment the counter
send(counter, :increment)
# Get the current value
send(counter, {:get, self()})
receive do
{:ok, value} -> IO.puts("Counter value is #{value}")
end
Explanation:
- GenericServer Module: Acts as a generic process that delegates message handling to a callback module.
-
Callback Module (
CounterServer): Implements thehandle_message/2function to define specific behaviors. - Abstraction Benefit: We eliminate repetitive code by having a generic loop and delegate specific logic to callback modules.
2.2 Implementing Generic Servers
Example: Building a More Robust Generic Server with API Functions
We can enhance our generic server to include API functions for starting the server and sending messages, providing a cleaner interface.
Enhanced Generic Server:
defmodule GenericServer2 do
def start(callback_module, initial_state) do
pid = spawn(fn -> loop(callback_module, initial_state) end)
{:ok, pid}
end
def call(pid, request) do
send(pid, {self(), request})
receive do
{:response, response} -> response
end
end
def cast(pid, request) do
send(pid, {:noreply, request})
:ok
end
defp loop(callback_module, state) do
receive do
{:noreply, request} ->
new_state = callback_module.handle_cast(request, state)
loop(callback_module, new_state)
{caller, request} ->
{response, new_state} = callback_module.handle_call(request, state)
send(caller, {:response, response})
loop(callback_module, new_state)
end
end
end
Defining the Callback Module:
defmodule KeyValueStoreServer do
def handle_call({:get, key}, state) do
value = Map.get(state, key, :not_found)
{value, state}
end
def handle_call({:put, key, value}, state) do
new_state = Map.put(state, key, value)
{:ok, new_state}
end
def handle_cast({:delete, key}, state) do
Map.delete(state, key)
end
end
Usage:
{:ok, store} = GenericServer2.start(KeyValueStoreServer, %{})
# Put a value
GenericServer2.call(store, {:put, :foo, 42})
# Get a value
value = GenericServer2.call(store, {:get, :foo})
IO.puts("Value of :foo is #{value}")
# Delete a key asynchronously
GenericServer2.cast(store, {:delete, :foo})
# Try to get the deleted key
value = GenericServer2.call(store, {:get, :foo})
IO.puts("Value of :foo after deletion is #{value}")
Explanation:
-
API Functions (
call/2andcast/2): Provide synchronous and asynchronous interfaces to interact with the server. -
Callback Module (
KeyValueStoreServer): Defines how to handle synchronous calls (handle_call/2) and asynchronous casts (handle_cast/2). - Improved Abstraction: This implementation closely mirrors how GenServer works in Elixir, offering a more standardized way to build servers.
2.3 Benefits of Code Reuse and Standardization
Example: Reusing the Generic Server for Different Modules
By standardizing our server implementation, we can easily create different servers for various purposes without rewriting the boilerplate code.
Defining a Stack Server:
defmodule StackServer do
def handle_call(:pop, [head | tail]) do
{head, tail}
end
def handle_call(:pop, []) do
{nil, []}
end
def handle_call({:push, value}, state) do
{:ok, [value | state]}
end
def handle_cast(:clear, _state) do
[]
end
end
Usage:
{:ok, stack} = GenericServer2.start(StackServer, [])
# Push values onto the stack
GenericServer2.call(stack, {:push, 1})
GenericServer2.call(stack, {:push, 2})
GenericServer2.call(stack, {:push, 3})
# Pop a value
value = GenericServer2.call(stack, :pop)
IO.puts("Popped value is #{value}")
# Clear the stack
GenericServer2.cast(stack, :clear)
# Try to pop from an empty stack
value = GenericServer2.call(stack, :pop)
IO.puts("Popped value from empty stack is #{inspect(value)}")
Explanation:
-
Code Reuse: We use the same
GenericServermodule to implement a stack server by just providing a new callback module (StackServer). -
Standardization Benefits:
- Consistency: All servers follow the same pattern, making the codebase easier to understand and maintain.
- Interchangeability: We can swap out modules or update implementations without affecting the overall structure.
- Reduced Errors: With a standard pattern, there’s less room for mistakes that commonly occur with boilerplate code.
Summary of Benefits:
- Efficiency: Developers can focus on the business logic rather than boilerplate code.
- Maintainability: Standardized code is easier to read, debug, and update.
- Scalability: Adding new servers or functionalities becomes straightforward, facilitating growth.
By abstracting common patterns and implementing generic servers, we harness the power of code reuse and standardization, leading to more robust and maintainable applications.
Section 3: Deep Dive into GenServer
3.1 GenServer Anatomy and Callbacks
Example: Implementing a GenServer for a Key-Value Store
The GenServer module provides a generic server implementation that abstracts the common patterns of stateful processes. We’ll implement a simple key-value store using GenServer to explore its anatomy and callbacks.
Defining the GenServer Module:
defmodule KVStore do
use GenServer
# Client API
def start_link(initial_state \\ %{}) do
GenServer.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def put(key, value) do
GenServer.cast(__MODULE__, {:put, key, value})
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
# Server (Callbacks)
@impl true
def init(initial_state) do
# The init callback initializes the server state
{:ok, initial_state}
end
@impl true
def handle_call({:get, key}, _from, state) do
# Synchronously handle the :get request
value = Map.get(state, key, :not_found)
{:reply, value, state}
end
@impl true
def handle_cast({:put, key, value}, state) do
# Asynchronously handle the :put request
new_state = Map.put(state, key, value)
{:noreply, new_state}
end
end
Explanation:
-
use GenServer: Imports default implementations and necessary functions. -
Client API Functions:
-
start_link/1: Starts the GenServer process. -
put/2: Sends an asynchronouscastmessage. -
get/1: Sends a synchronouscallmessage.
-
-
Server Callbacks:
-
init/1: Initializes the server state. -
handle_call/3: Handles synchronouscallmessages. -
handle_cast/2: Handles asynchronouscastmessages.
-
-
Annotations (
@impl true): Optional but recommended, helps tools like Dialyzer ensure you’re correctly implementing callbacks.
Usage:
# Start the GenServer
{:ok, _pid} = KVStore.start_link()
# Put values into the store
KVStore.put(:foo, 42)
KVStore.put(:bar, "hello")
# Get values from the store
value_foo = KVStore.get(:foo)
IO.puts("Value of :foo is #{value_foo}")
value_bar = KVStore.get(:bar)
IO.puts("Value of :bar is #{value_bar}")
3.2 Synchronous vs. Asynchronous Calls
Example: Demonstrating call (Synchronous) vs. cast (Asynchronous)
We’ll modify the KVStore GenServer to include a delay in the handle_cast/2 function to simulate a time-consuming operation and observe the differences between call and cast.
Updating the handle_cast/2 Callback:
defmodule KVStoreDummy1 do
@impl true
def handle_cast({:put, key, value}, state) do
# Simulate a time-consuming task
:timer.sleep(2000)
new_state = Map.put(state, key, value)
{:noreply, new_state}
end
end
Testing Synchronous call with Delay:
defmodule KVStoreDummy2 do
def put_sync(key, value) do
GenServer.call(__MODULE__, {:put_sync, key, value})
end
@impl true
def handle_call({:put_sync, key, value}, _from, state) do
# Simulate a time-consuming task
:timer.sleep(2000)
new_state = Map.put(state, key, value)
{:reply, :ok, new_state}
end
end
Putting it all together:
defmodule KVStore2 do
use GenServer
# Client API
def start_link(initial_state \\ %{}) do
GenServer.start_link(__MODULE__, initial_state, name: __MODULE__)
end
def put(key, value) do
GenServer.cast(__MODULE__, {:put, key, value})
end
def put_sync(key, value) do
GenServer.call(__MODULE__, {:put_sync, key, value})
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
# Server (Callbacks)
@impl true
def init(initial_state) do
# The init callback initializes the server state
{:ok, initial_state}
end
@impl true
def handle_call({:get, key}, _from, state) do
# Synchronously handle the :get request
value = Map.get(state, key, :not_found)
{:reply, value, state}
end
@impl true
def handle_call({:put_sync, key, value}, _from, state) do
# Simulate a time-consuming task
:timer.sleep(2000)
new_state = Map.put(state, key, value)
{:reply, :ok, new_state}
end
@impl true
def handle_cast({:put, key, value}, state) do
# Simulate a time-consuming task
:timer.sleep(2000)
new_state = Map.put(state, key, value)
{:noreply, new_state}
end
end
Usage:
# Start the GenServer
{:ok, _pid} = KVStore2.start_link()
# Time the asynchronous `cast` operation
start_time = :os.system_time(:millisecond)
KVStore2.put(:foo, 42)
end_time = :os.system_time(:millisecond)
IO.puts("Asynchronous put took #{end_time - start_time} milliseconds")
# Time the synchronous `call` operation
start_time_sync = :os.system_time(:millisecond)
KVStore2.put_sync(:bar, "hello")
end_time_sync = :os.system_time(:millisecond)
IO.puts("Synchronous put_sync took #{end_time_sync - start_time_sync} milliseconds")
Explanation:
-
Asynchronous
cast:-
The
put/2function usesGenServer.cast/2, which sends a message and returns immediately. -
The delay in
handle_cast/2doesn’t block the caller.
-
The
-
Synchronous
call:-
The
put_sync/2function usesGenServer.call/2, which sends a message and waits for a reply. -
The delay in
handle_call/3blocks the caller until the operation completes.
-
The
-
Timing Results:
-
The asynchronous
castoperation completes almost instantly from the caller’s perspective. -
The synchronous
calloperation takes at least as long as the delay.
-
The asynchronous
3.3 Practical Examples and Best Practices
Example: Implementing a Rate-Limited Logger GenServer
We’ll create a GenServer that logs messages but limits the rate at which it writes to a file to avoid overwhelming the I/O system.
Defining the GenServer Module:
defmodule RateLimitedLogger do
use GenServer
# Client API
def start_link(log_file) do
GenServer.start_link(__MODULE__, log_file, name: __MODULE__)
end
def log(message) do
GenServer.cast(__MODULE__, {:log, message})
end
# Server (Callbacks)
@impl true
def init(log_file) do
{:ok, %{log_file: log_file, message_queue: [], timer: nil}}
end
@impl true
def handle_cast({:log, message}, state) do
new_queue = [message | state.message_queue]
# If there's no timer, set one to flush the messages later
new_state =
if state.timer == nil do
{:ok, timer} = :timer.send_after(1000, :flush)
%{state | message_queue: new_queue, timer: timer}
else
%{state | message_queue: new_queue}
end
{:noreply, new_state}
end
@impl true
def handle_info(:flush, state) do
# Write all messages to the log file
Enum.each(Enum.reverse(state.message_queue), fn msg ->
File.write!(state.log_file, "#{msg}\n", [:append])
end)
# Reset the message queue and timer
{:noreply, %{state | message_queue: [], timer: nil}}
end
end
Best Practices Demonstrated:
- Avoiding Bottlenecks: By batching log messages and writing them periodically, we reduce I/O overhead.
-
Using
handle_info/2: To handle messages that are notcallorcast, such as timer messages. - State Management: Keeping track of the timer and message queue within the server state.
Usage:
# Start the RateLimitedLogger
{:ok, _pid} = RateLimitedLogger.start_link("log.txt")
# Log multiple messages
RateLimitedLogger.log("Message 1")
RateLimitedLogger.log("Message 2")
RateLimitedLogger.log("Message 3")
# Wait to ensure messages are flushed
:timer.sleep(1500)
# Read and print the log file contents
{:ok, contents} = File.read("log.txt")
IO.puts("Log file contents:\n#{contents}")
Example: Implementing a GenServer with Supervision
To ensure fault tolerance, we’ll set up our GenServer under a Supervisor.
Supervisor Setup:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
{KVStore, %{}},
{RateLimitedLogger, "log.txt"}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Best Practices Demonstrated:
- Using Supervisors: To automatically restart child processes if they crash.
- Application Structure: Organizing processes under an OTP application for better management.
Explanation:
-
childrenList: Contains the worker specifications for the GenServers. -
strategy: :one_for_one: If a child process terminates, only that process is restarted. - Starting the Application: When the application starts, it also starts the Supervisor and its child processes.
Best Practices Summary:
-
Use
start_link/3Overstart/3:-
start_link/3creates a linked process, which is important for supervision trees.
-
-
Define a Clear Client API:
-
Encapsulate
callandcastoperations within module functions. - Provides a clean interface for interacting with the GenServer.
-
Encapsulate
-
Handle Unexpected Messages:
-
Implement
handle_info/2to deal with non-standard messages. - Prevents the GenServer from crashing due to unhandled messages.
-
Implement
-
Use Supervisors for Fault Tolerance:
- Place GenServers under Supervisors to automatically handle crashes.
-
Choose appropriate supervision strategies (
:one_for_one,:rest_for_one, etc.).
-
State Management:
- Keep the state immutable and pass it through callbacks.
- Use pattern matching to destructure and update state efficiently.
-
Avoid Blocking Operations in Callbacks:
- Offload long-running tasks to separate processes or handle asynchronously.
- Prevents the GenServer from becoming unresponsive.
Putting It All Together: A GenServer-Based Chat Room
Defining the Chat Room GenServer:
defmodule ChatRoom do
use GenServer
# Client API
def start_link(room_name) do
GenServer.start_link(__MODULE__, %{name: room_name, users: []}, name: via_tuple(room_name))
end
def join(room_name, user_pid) do
GenServer.cast(via_tuple(room_name), {:join, user_pid})
end
def leave(room_name, user_pid) do
GenServer.cast(via_tuple(room_name), {:leave, user_pid})
end
def send_message(room_name, message) do
GenServer.cast(via_tuple(room_name), {:message, message})
end
# Server (Callbacks)
@impl true
def init(state) do
{:ok, state}
end
@impl true
def handle_cast({:join, user_pid}, state) do
new_state = %{state | users: [user_pid | state.users]}
{:noreply, new_state}
end
@impl true
def handle_cast({:leave, user_pid}, state) do
new_state = %{state | users: List.delete(state.users, user_pid)}
{:noreply, new_state}
end
@impl true
def handle_cast({:message, message}, state) do
Enum.each(state.users, fn user ->
send(user, {:new_message, state.name, message})
end)
{:noreply, state}
end
# Helper function for via tuple
defp via_tuple(name) do
{:via, Registry, {:chat_room_registry, name}}
end
end
Setting Up a Registry for Named Processes:
defmodule MyOtherApp.Application do
use Application
def start(_type, _args) do
children = [
{Registry, keys: :unique, name: :chat_room_registry}
# Other workers and supervisors...
]
opts = [strategy: :one_for_one, name: MyOtherApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
MyOtherApp.Application.start(:foo, nil)
Usage:
# Start the application (usually done automatically)
# Start a chat room
ChatRoom.start_link("lobby")
# Simulate user processes
self_pid = self()
spawn(fn ->
ChatRoom.join("lobby", self())
receive do
{:new_message, room, msg} ->
IO.puts("[#{room}] Received message: #{msg}")
end
end)
# Send a message to the chat room
ChatRoom.send_message("lobby", "Hello, world!")
Explanation:
-
Registry Usage:
- Allows processes to be registered and looked up by name.
- Enables multiple chat rooms identified by their room names.
-
Process Communication:
- Users receive messages asynchronously via their process mailbox.
-
Best Practices:
- Encapsulation of state and logic within the GenServer.
-
Use of
via_tuplefor dynamic process names. - Supervising the Registry and other processes.
Final Thoughts:
By leveraging GenServer and adhering to best practices, we can build robust, maintainable, and fault-tolerant applications in Elixir. The GenServer abstraction simplifies the complexities of process management, allowing developers to focus on implementing business logic.
Key Takeaways:
-
Understand Callbacks:
-
Properly implement
init/1,handle_call/3,handle_cast/2, andhandle_info/2.
-
Properly implement
-
Choose Between
callandcast:-
Use
callfor synchronous operations where a response is needed. -
Use
castfor asynchronous operations where no response is required.
-
Use
-
Implement Supervision Trees:
- Organize processes under supervisors for automatic fault recovery.
-
Optimize State Management:
- Keep state immutable and update it functionally.
-
Handle Errors Gracefully:
-
Use
try...catchor pattern matching to handle exceptions within callbacks.
-
Use
-
Document and Annotate:
-
Use
@impl trueand module documentation for better code clarity.
-
Use