GenServers

Section 1: Introduction to Processes and State in Elixir


1.1 How Processes Hold State

Example: Simple Counter Process

defmodule Counter do
  def start(initial_value) do
    spawn(fn -> loop(initial_value) end)
  end

  defp loop(current_value) do
    receive do
      {:increment, caller} ->
        new_value = current_value + 1
        send(caller, {:ok, new_value})
        loop(new_value)

      {:get, caller} ->
        send(caller, {:ok, current_value})
        loop(current_value)
    end
  end
end
counter = Counter.start(0)

Usage:

# Increment the counter
send(counter, {:increment, self()})

receive do
  {:ok, new_value} -> IO.puts("Counter incremented to #{new_value}")
end

# Get the current value
send(counter, {:get, self()})

receive do
  {:ok, value} -> IO.puts("Current counter value is #{value}")
end

Cleaner API

defmodule Counter2 do
  def start(initial_value) do
    spawn(fn -> loop(initial_value) end)
  end

  def increment(counter) do
    send(counter, {:increment, self()})

    receive do
      {:ok, new_value} -> {:ok, new_value}
    end
  end

  def get(counter) do
    send(counter, {:get, self()})

    receive do
      {:ok, value} -> value
    end
  end

  defp loop(current_value) do
    receive do
      {:increment, caller} ->
        new_value = current_value + 1
        send(caller, {:ok, new_value})
        loop(new_value)

      {:get, caller} ->
        send(caller, {:ok, current_value})
        loop(current_value)
    end
  end
end
counter = Counter2.start(0)
# Increment the counter
{:ok, new_value} = Counter2.increment(counter)
IO.puts("Counter incremented to #{new_value}")

# Get the current value
current_val = Counter2.get(counter)
IO.puts("Current counter value is #{current_val}")

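In these examples, the Counter module starts a new process that holds an initial value. The process waits for messages that increment the counter or retrieve its current value, and the state (current_value) lives only in the argument passed to the recursive loop/1 function. Counter2 speaks the same message protocol but wraps send and receive in increment/1 and get/1, so callers never handle raw messages.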
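Because the state exists only as the argument to loop/1, every process started from the same module keeps its own private copy. The following is a minimal sketch of that isolation, assuming a hypothetical IsolatedCounter module (not part of the original livebook) with the same shape as Counter2:

```elixir
defmodule IsolatedCounter do
  # Hypothetical module for illustration; same shape as Counter2 above.
  def start(initial_value), do: spawn(fn -> loop(initial_value) end)

  def increment(counter) do
    send(counter, {:increment, self()})

    receive do
      {:ok, new_value} -> new_value
    end
  end

  defp loop(current_value) do
    receive do
      {:increment, caller} ->
        new_value = current_value + 1
        send(caller, {:ok, new_value})
        loop(new_value)
    end
  end
end

a = IsolatedCounter.start(0)
b = IsolatedCounter.start(100)

# Each process only ever sees its own loop argument.
a_value = IsolatedCounter.increment(a)
b_value = IsolatedCounter.increment(b)
IO.inspect({a_value, b_value})
```

Incrementing one counter never affects the other, since no data is shared between the two processes.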


1.2 Recursive Functions for State Management

Example: Simple Accumulator Process

defmodule Accumulator do
  def start() do
    spawn(fn -> loop(0) end)
  end

  defp loop(total) do
    receive do
      {:add, value} ->
        new_total = total + value
        loop(new_total)

      {:subtract, value} ->
        new_total = total - value
        loop(new_total)

      {:total, caller} ->
        send(caller, {:total, total})
        loop(total)
    end
  end
end

Usage:

acc = Accumulator.start()

# Add values
send(acc, {:add, 10})
send(acc, {:add, 5})

# Subtract a value
send(acc, {:subtract, 3})

# Get the total
send(acc, {:total, self()})

receive do
  {:total, total} -> IO.puts("Total is #{total}")
end

Here, the Accumulator process uses a recursive loop/1 function to keep its state (total) updated. Each time it receives a message to add or subtract, it calculates the new total and recursively calls itself with the updated state.
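The :add and :subtract messages are fire-and-forget, so the caller gets no confirmation that they were handled. What it can rely on is ordering: messages from one sender to one receiver arrive in the order they were sent. A small sketch, assuming a hypothetical SumLoop module that mirrors the Accumulator's loop:

```elixir
defmodule SumLoop do
  # Hypothetical module for illustration; mirrors the Accumulator's loop/1.
  def start, do: spawn(fn -> loop(0) end)

  defp loop(total) do
    receive do
      {:add, value} ->
        # Tail call: the updated total becomes the next iteration's state.
        loop(total + value)

      {:total, caller} ->
        send(caller, {:total, total})
        loop(total)
    end
  end
end

acc = SumLoop.start()

# Fire-and-forget adds; none of them sends a reply.
Enum.each(1..1000, fn n -> send(acc, {:add, n}) end)

# Messages between one sender and one receiver arrive in order,
# so this request is handled after all of the adds above.
send(acc, {:total, self()})

total =
  receive do
    {:total, value} -> value
  after
    1_000 -> :timeout
  end

IO.inspect(total)
```

Because loop/1 calls itself in tail position, the process can handle an unbounded number of messages without growing its stack.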


1.3 Limitations of Basic Processes

Example: Key-Value Store with Minimal Error Handling

defmodule KeyValueStore do
  def start() do
    spawn(fn -> loop(%{}) end)
  end

  defp loop(state) do
    receive do
      {:put, key, value} ->
        new_state = Map.put(state, key, value)
        loop(new_state)

      {:get, key, caller} ->
        value = Map.get(state, key, :not_found)
        send(caller, {key, value})
        loop(state)

      {:delete, key} ->
        new_state = Map.delete(state, key)
        loop(new_state)

      :stop ->
        :ok

      _unexpected ->
        IO.puts("Received an unexpected message")
        loop(state)
    end
  end
end

Usage:

store = KeyValueStore.start()

# Put values
send(store, {:put, :foo, 42})
send(store, {:put, :bar, "hello"})

# Get a value
send(store, {:get, :foo, self()})

receive do
  {:foo, value} -> IO.puts("Value of :foo is #{value}")
end

# Delete a key
send(store, {:delete, :bar})

# Attempt to get a deleted key
send(store, {:get, :bar, self()})

receive do
  {:bar, value} -> IO.puts("Value of :bar is #{value}")
end

# Stop the process
send(store, :stop)

Limitations Demonstrated:

  • Boilerplate Code: Each new process requires setting up spawn, receive, and recursive loops manually.
  • Error Handling: There’s minimal error handling. Unexpected messages are only printed, and the client-side receive blocks have no timeout, so a caller waits forever if the process has died.
  • No Supervision: If the process crashes, there’s no mechanism to restart it automatically.
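The missing supervision can be observed directly. The sketch below assumes a hypothetical FragileCounter module that crashes on bad input, and uses Process.monitor/1 only to detect the crash:

```elixir
defmodule FragileCounter do
  # Hypothetical module for illustration; crashes on non-numeric input.
  def start(initial_value), do: spawn(fn -> loop(initial_value) end)

  defp loop(current_value) do
    receive do
      {:add, amount} ->
        # Raises ArithmeticError when amount is not a number.
        loop(current_value + amount)
    end
  end
end

pid = FragileCounter.start(0)

# Monitor the process so we are told when it dies.
ref = Process.monitor(pid)
send(pid, {:add, :oops})

status =
  receive do
    {:DOWN, ^ref, :process, ^pid, _reason} -> :crashed
  after
    1_000 -> :still_running
  end

# Nothing restarts the process, and its state is gone.
IO.inspect({status, Process.alive?(pid)})
```

The crash is logged, but the pid stays dead. Any later send to it is silently dropped.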

Section 2: Building Generic Implementations

2.1 Abstracting Common Patterns

Example: Creating a Generic Server Module

In the previous examples, we noticed repetitive patterns such as setting up a loop, handling messages, and maintaining state. We can abstract these common patterns into a generic server module.

Generic Server Template:

defmodule GenericServer1 do
  def start(callback_module, initial_state) do
    spawn(fn -> loop(callback_module, initial_state) end)
  end

  defp loop(callback_module, state) do
    receive do
      message ->
        case callback_module.handle_message(message, state) do
          {:noreply, new_state} ->
            loop(callback_module, new_state)

          {:reply, caller, new_state} ->
            send(caller, {:ok, new_state})
            loop(callback_module, new_state)
        end
    end
  end
end

Defining a Callback Module:

defmodule CounterServer do
  def handle_message(:increment, state) do
    new_state = state + 1
    {:noreply, new_state}
  end

  def handle_message({:get, caller}, state) do
    {:reply, caller, state}
  end
end
counter = GenericServer1.start(CounterServer, 0)

Usage:

# Increment the counter
send(counter, :increment)

# Get the current value
send(counter, {:get, self()})

receive do
  {:ok, value} -> IO.puts("Counter value is #{value}")
end

Explanation:

  • GenericServer1 Module: Acts as a generic process that delegates message handling to a callback module.
  • Callback Module (CounterServer): Implements the handle_message/2 function to define specific behaviors.
  • Abstraction Benefit: We eliminate repetitive code by having a generic loop and delegate specific logic to callback modules.
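The contract between the generic loop and its callback modules can be written down as a behaviour. This is a sketch only: the MessageHandler behaviour and the ToggleHandler module are hypothetical names, and the return shapes are the ones GenericServer1 matches on.

```elixir
defmodule MessageHandler do
  # Hypothetical behaviour naming the contract that GenericServer1 assumes.
  @callback handle_message(message :: term(), state :: term()) ::
              {:noreply, new_state :: term()}
              | {:reply, caller :: pid(), new_state :: term()}
end

defmodule ToggleHandler do
  @behaviour MessageHandler

  @impl MessageHandler
  def handle_message(:toggle, state), do: {:noreply, not state}
  def handle_message({:get, caller}, state), do: {:reply, caller, state}
end

# The callbacks are plain functions, so they can be exercised without a process.
IO.inspect(ToggleHandler.handle_message(:toggle, false))
IO.inspect(ToggleHandler.handle_message({:get, self()}, true))
```

Keeping the callbacks free of send and receive is what makes them easy to test in isolation.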

2.2 Implementing Generic Servers

Example: Building a More Robust Generic Server with API Functions

We can enhance our generic server to include API functions for starting the server and sending messages, providing a cleaner interface.

Enhanced Generic Server:

defmodule GenericServer2 do
  def start(callback_module, initial_state) do
    pid = spawn(fn -> loop(callback_module, initial_state) end)
    {:ok, pid}
  end

  def call(pid, request) do
    send(pid, {self(), request})

    receive do
      {:response, response} -> response
    end
  end

  def cast(pid, request) do
    send(pid, {:noreply, request})
    :ok
  end

  defp loop(callback_module, state) do
    receive do
      {:noreply, request} ->
        new_state = callback_module.handle_cast(request, state)
        loop(callback_module, new_state)

      {caller, request} ->
        {response, new_state} = callback_module.handle_call(request, state)
        send(caller, {:response, response})
        loop(callback_module, new_state)
    end
  end
end

Defining the Callback Module:

defmodule KeyValueStoreServer do
  def handle_call({:get, key}, state) do
    value = Map.get(state, key, :not_found)
    {value, state}
  end

  def handle_call({:put, key, value}, state) do
    new_state = Map.put(state, key, value)
    {:ok, new_state}
  end

  def handle_cast({:delete, key}, state) do
    Map.delete(state, key)
  end
end

Usage:

{:ok, store} = GenericServer2.start(KeyValueStoreServer, %{})

# Put a value
GenericServer2.call(store, {:put, :foo, 42})

# Get a value
value = GenericServer2.call(store, {:get, :foo})
IO.puts("Value of :foo is #{value}")

# Delete a key asynchronously
GenericServer2.cast(store, {:delete, :foo})

# Try to get the deleted key
value = GenericServer2.call(store, {:get, :foo})
IO.puts("Value of :foo after deletion is #{value}")

Explanation:

  • API Functions (call/2 and cast/2): Provide synchronous and asynchronous interfaces to interact with the server.
  • Callback Module (KeyValueStoreServer): Defines how to handle synchronous calls (handle_call/2) and asynchronous casts (handle_cast/2).
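  • Improved Abstraction: This implementation mirrors the overall shape of Elixir's GenServer. The real GenServer additionally passes a from argument to handle_call/3, expects tagged return tuples such as {:reply, response, new_state}, and applies a timeout to calls.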
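One gap in call/2 as written is that it blocks forever if the server is dead, and it accepts any {:response, _} message as the reply. The sketch below shows one way to close that gap with a monitor reference and a timeout. It is a simplified illustration of the idea rather than the actual GenServer.call/3 implementation, and the SafeCall module and its message format are hypothetical.

```elixir
defmodule SafeCall do
  # Hypothetical helper; a simplified take on what GenServer.call/3 does.
  def call(pid, request, timeout \\ 5_000) do
    # The monitor reference doubles as a unique tag for this request.
    ref = Process.monitor(pid)
    send(pid, {:call, {self(), ref}, request})

    receive do
      {^ref, response} ->
        Process.demonitor(ref, [:flush])
        {:ok, response}

      {:DOWN, ^ref, :process, ^pid, reason} ->
        {:error, {:server_down, reason}}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end

  # A tiny echo server that understands the tagged request format.
  def start_echo do
    spawn(fn -> echo_loop() end)
  end

  defp echo_loop do
    receive do
      {:call, {caller, ref}, request} ->
        send(caller, {ref, {:echo, request}})
        echo_loop()
    end
  end
end

server = SafeCall.start_echo()
IO.inspect(SafeCall.call(server, :ping))

# Calling a dead process returns an error instead of blocking forever.
Process.exit(server, :kill)
IO.inspect(SafeCall.call(server, :ping, 100))
```

Tagging each reply with the reference means a stray message in the caller's mailbox cannot be mistaken for the response.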

2.3 Benefits of Code Reuse and Standardization

Example: Reusing the Generic Server for Different Modules

By standardizing our server implementation, we can easily create different servers for various purposes without rewriting the boilerplate code.

Defining a Stack Server:

defmodule StackServer do
  def handle_call(:pop, [head | tail]) do
    {head, tail}
  end

  def handle_call(:pop, []) do
    {nil, []}
  end

  def handle_call({:push, value}, state) do
    {:ok, [value | state]}
  end

  def handle_cast(:clear, _state) do
    []
  end
end

Usage:

{:ok, stack} = GenericServer2.start(StackServer, [])

# Push values onto the stack
GenericServer2.call(stack, {:push, 1})
GenericServer2.call(stack, {:push, 2})
GenericServer2.call(stack, {:push, 3})

# Pop a value
value = GenericServer2.call(stack, :pop)
IO.puts("Popped value is #{value}")

# Clear the stack
GenericServer2.cast(stack, :clear)

# Try to pop from an empty stack
value = GenericServer2.call(stack, :pop)
IO.puts("Popped value from empty stack is #{inspect(value)}")

Explanation:

  • Code Reuse: We use the same GenericServer2 module to implement a stack server by just providing a new callback module (StackServer).
  • Standardization Benefits:
    • Consistency: All servers follow the same pattern, making the codebase easier to understand and maintain.
    • Interchangeability: We can swap out modules or update implementations without affecting the overall structure.
    • Reduced Errors: With a standard pattern, there’s less room for mistakes that commonly occur with boilerplate code.

Summary of Benefits:

  • Efficiency: Developers can focus on the business logic rather than boilerplate code.
  • Maintainability: Standardized code is easier to read, debug, and update.
  • Scalability: Adding new servers or functionalities becomes straightforward, facilitating growth.

By abstracting common patterns and implementing generic servers, we harness the power of code reuse and standardization, leading to more robust and maintainable applications.

Section 3: Deep Dive into GenServer

3.1 GenServer Anatomy and Callbacks

Example: Implementing a GenServer for a Key-Value Store

The GenServer module provides a generic server implementation that abstracts the common patterns of stateful processes. We’ll implement a simple key-value store using GenServer to explore its anatomy and callbacks.

Defining the GenServer Module:

defmodule KVStore do
  use GenServer

  # Client API

  def start_link(initial_state \\ %{}) do
    GenServer.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  # Server (Callbacks)

  @impl true
  def init(initial_state) do
    # The init callback initializes the server state
    {:ok, initial_state}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    # Synchronously handle the :get request
    value = Map.get(state, key, :not_found)
    {:reply, value, state}
  end

  @impl true
  def handle_cast({:put, key, value}, state) do
    # Asynchronously handle the :put request
    new_state = Map.put(state, key, value)
    {:noreply, new_state}
  end
end

Explanation:

  • use GenServer: Declares the GenServer behaviour and injects a default child_spec/1 plus overridable default callbacks (such as handle_info/2 and terminate/2).
  • Client API Functions:
    • start_link/1: Starts the GenServer process.
    • put/2: Sends an asynchronous cast message.
    • get/1: Sends a synchronous call message.
  • Server Callbacks:
    • init/1: Initializes the server state.
    • handle_call/3: Handles synchronous call messages.
    • handle_cast/2: Handles asynchronous cast messages.
  • Annotations (@impl true): Optional but recommended; they let the compiler warn when a function marked as a callback does not match any callback of the behaviour.
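To see what use GenServer generates, the sketch below defines a hypothetical TinyServer module with only init/1 and inspects the functions that were injected for it:

```elixir
defmodule TinyServer do
  # Hypothetical module for illustration.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# child_spec/1 is generated by `use GenServer`; supervisors call it
# to learn how to start the process.
spec = TinyServer.child_spec(:some_arg)
IO.inspect(spec)

# handle_call/3 was not defined above, so the injected default is used;
# it is exported like any other function.
IO.inspect(function_exported?(TinyServer, :handle_call, 3))
```

The generated child spec points at start_link/1, which is why a module that uses GenServer is expected to define that function before it is placed under a supervisor.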

Usage:

# Start the GenServer
{:ok, _pid} = KVStore.start_link()

# Put values into the store
KVStore.put(:foo, 42)
KVStore.put(:bar, "hello")

# Get values from the store
value_foo = KVStore.get(:foo)
IO.puts("Value of :foo is #{value_foo}")

value_bar = KVStore.get(:bar)
IO.puts("Value of :bar is #{value_bar}")

3.2 Synchronous vs. Asynchronous Calls

Example: Demonstrating call (Synchronous) vs. cast (Asynchronous)

We’ll modify the KVStore GenServer to include a delay in the handle_cast/2 function to simulate a time-consuming operation and observe the differences between call and cast.

Updating the handle_cast/2 Callback:

defmodule KVStoreDummy1 do
  @impl true
  def handle_cast({:put, key, value}, state) do
    # Simulate a time-consuming task
    :timer.sleep(2000)
    new_state = Map.put(state, key, value)
    {:noreply, new_state}
  end
end

Testing Synchronous call with Delay:

defmodule KVStoreDummy2 do
  def put_sync(key, value) do
    GenServer.call(__MODULE__, {:put_sync, key, value})
  end

  @impl true
  def handle_call({:put_sync, key, value}, _from, state) do
    # Simulate a time-consuming task
    :timer.sleep(2000)
    new_state = Map.put(state, key, value)
    {:reply, :ok, new_state}
  end
end

Putting it all together:

defmodule KVStore2 do
  use GenServer

  # Client API

  def start_link(initial_state \\ %{}) do
    GenServer.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  def put_sync(key, value) do
    GenServer.call(__MODULE__, {:put_sync, key, value})
  end

  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  # Server (Callbacks)

  @impl true
  def init(initial_state) do
    # The init callback initializes the server state
    {:ok, initial_state}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    # Synchronously handle the :get request
    value = Map.get(state, key, :not_found)
    {:reply, value, state}
  end

  @impl true
  def handle_call({:put_sync, key, value}, _from, state) do
    # Simulate a time-consuming task
    :timer.sleep(2000)
    new_state = Map.put(state, key, value)
    {:reply, :ok, new_state}
  end

  @impl true
  def handle_cast({:put, key, value}, state) do
    # Simulate a time-consuming task
    :timer.sleep(2000)
    new_state = Map.put(state, key, value)
    {:noreply, new_state}
  end
end

Usage:

# Start the GenServer
{:ok, _pid} = KVStore2.start_link()

# Time the asynchronous `cast` operation
# (monotonic time is not affected by system clock adjustments)
start_time = System.monotonic_time(:millisecond)
KVStore2.put(:foo, 42)
end_time = System.monotonic_time(:millisecond)
IO.puts("Asynchronous put took #{end_time - start_time} milliseconds")

# Time the synchronous `call` operation
start_time_sync = System.monotonic_time(:millisecond)
KVStore2.put_sync(:bar, "hello")
end_time_sync = System.monotonic_time(:millisecond)
IO.puts("Synchronous put_sync took #{end_time_sync - start_time_sync} milliseconds")

Explanation:

  • Asynchronous cast:
    • The put/2 function uses GenServer.cast/2, which sends a message and returns immediately.
    • The delay in handle_cast/2 doesn’t block the caller.
  • Synchronous call:
    • The put_sync/2 function uses GenServer.call/2, which sends a message and waits for a reply.
    • The delay in handle_call/3 blocks the caller until the operation completes.
  • Timing Results:
    • The asynchronous cast operation completes almost instantly from the caller’s perspective.
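    • The synchronous call operation takes at least as long as its own delay. In the usage above it takes roughly 4 seconds, because the server must first finish the queued cast (2 seconds) before it handles the call (2 seconds). That still fits within the default 5-second call timeout.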
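A blocked caller does not wait indefinitely: GenServer.call/3 takes a timeout in milliseconds (5000 by default) and exits the caller when it elapses. The sketch below assumes a hypothetical SlowServer module and deliberately uses a timeout shorter than the server's delay:

```elixir
defmodule SlowServer do
  # Hypothetical module for illustration.
  use GenServer

  def start_link(delay_ms), do: GenServer.start_link(__MODULE__, delay_ms)

  # The third argument to GenServer.call/3 is the timeout in milliseconds.
  def work(pid, timeout), do: GenServer.call(pid, :work, timeout)

  @impl true
  def init(delay_ms), do: {:ok, delay_ms}

  @impl true
  def handle_call(:work, _from, delay_ms) do
    Process.sleep(delay_ms)
    {:reply, :done, delay_ms}
  end
end

{:ok, pid} = SlowServer.start_link(200)

result =
  try do
    SlowServer.work(pid, 50)
  catch
    # A timed-out call exits the caller; the server keeps running.
    :exit, {:timeout, _details} -> :timed_out
  end

IO.inspect(result)
IO.inspect(SlowServer.work(pid, 1_000))
```

The timeout only protects the caller. The server still finishes the slow request, which is why the second call has to wait for it.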

3.3 Practical Examples and Best Practices

Example: Implementing a Rate-Limited Logger GenServer

We’ll create a GenServer that logs messages but limits the rate at which it writes to a file to avoid overwhelming the I/O system.

Defining the GenServer Module:

defmodule RateLimitedLogger do
  use GenServer

  # Client API

  def start_link(log_file) do
    GenServer.start_link(__MODULE__, log_file, name: __MODULE__)
  end

  def log(message) do
    GenServer.cast(__MODULE__, {:log, message})
  end

  # Server (Callbacks)

  @impl true
  def init(log_file) do
    {:ok, %{log_file: log_file, message_queue: [], timer: nil}}
  end

  @impl true
  def handle_cast({:log, message}, state) do
    new_queue = [message | state.message_queue]

    # If there's no timer, set one to flush the messages later
    new_state =
      if state.timer == nil do
        timer = Process.send_after(self(), :flush, 1000)
        %{state | message_queue: new_queue, timer: timer}
      else
        %{state | message_queue: new_queue}
      end

    {:noreply, new_state}
  end

  @impl true
  def handle_info(:flush, state) do
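    # Write all queued messages to the log file in a single call,
    # oldest first, so the file is opened only once per flush
    lines =
      state.message_queue
      |> Enum.reverse()
      |> Enum.map(fn msg -> "#{msg}\n" end)

    File.write!(state.log_file, lines, [:append])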

    # Reset the message queue and timer
    {:noreply, %{state | message_queue: [], timer: nil}}
  end
end

Best Practices Demonstrated:

  • Avoiding Bottlenecks: By batching log messages and writing them periodically, we reduce I/O overhead.
  • Using handle_info/2: To handle messages that are not call or cast, such as timer messages.
  • State Management: Keeping track of the timer and message queue within the server state.
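Once a module defines its own handle_info/2 clause, any message that matches no clause raises a FunctionClauseError and crashes the server. A catch-all clause avoids that. The sketch below assumes a hypothetical TickServer module:

```elixir
defmodule TickServer do
  # Hypothetical module for illustration.
  use GenServer
  require Logger

  def start_link(_arg \\ []), do: GenServer.start_link(__MODULE__, 0)
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_call(:ticks, _from, count), do: {:reply, count, count}

  @impl true
  def handle_info(:tick, count), do: {:noreply, count + 1}

  # Catch-all clause: without it, any other message would raise a
  # FunctionClauseError and crash the server.
  def handle_info(other, count) do
    Logger.warning("Ignoring unexpected message: #{inspect(other)}")
    {:noreply, count}
  end
end

{:ok, pid} = TickServer.start_link()
send(pid, :tick)
send(pid, {:surprise, 123})
send(pid, :tick)

# The call is queued behind the three messages above.
IO.inspect(TickServer.ticks(pid))
```

The same catch-all could be added to RateLimitedLogger, which currently handles only :flush.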

Usage:

# Start the RateLimitedLogger
{:ok, _pid} = RateLimitedLogger.start_link("log.txt")

# Log multiple messages
RateLimitedLogger.log("Message 1")
RateLimitedLogger.log("Message 2")
RateLimitedLogger.log("Message 3")

# Wait to ensure messages are flushed
:timer.sleep(1500)

# Read and print the log file contents
{:ok, contents} = File.read("log.txt")
IO.puts("Log file contents:\n#{contents}")

Example: Implementing a GenServer with Supervision

To ensure fault tolerance, we’ll set up our GenServer under a Supervisor.

Supervisor Setup:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {KVStore, %{}},
      {RateLimitedLogger, "log.txt"}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Best Practices Demonstrated:

  • Using Supervisors: To automatically restart child processes if they crash.
  • Application Structure: Organizing processes under an OTP application for better management.

Explanation:

  • children List: Contains the worker specifications for the GenServers.
  • strategy: :one_for_one: If a child process terminates, only that process is restarted.
  • Starting the Application: When the application starts, it also starts the Supervisor and its child processes.
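The restart behaviour can be checked by killing a supervised child and waiting for its replacement. This is a minimal sketch with hypothetical names (RestartDemo and RestartDemo.Worker). The polling helper exists only because the restart happens asynchronously.

```elixir
defmodule RestartDemo.Worker do
  # Hypothetical worker for illustration.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(arg), do: {:ok, arg}
end

defmodule RestartDemo do
  # Polls until the registered name points at a pid other than old_pid.
  def await_restart(name, old_pid, attempts \\ 50) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        {:ok, pid}

      _ when attempts > 0 ->
        Process.sleep(20)
        await_restart(name, old_pid, attempts - 1)

      _ ->
        :error
    end
  end
end

{:ok, _sup} = Supervisor.start_link([{RestartDemo.Worker, :initial}], strategy: :one_for_one)

old_pid = Process.whereis(RestartDemo.Worker)
Process.exit(old_pid, :kill)

{:ok, new_pid} = RestartDemo.await_restart(RestartDemo.Worker, old_pid)
IO.inspect(new_pid != old_pid)
# The restarted worker begins again from its initial state.
IO.inspect(:sys.get_state(new_pid))
```

The supervisor restores the process but not its state: whatever the old worker had accumulated is lost unless it is stored elsewhere.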

Best Practices Summary:

  1. Use start_link/3 Over start/3:

    • start_link/3 creates a linked process, which is important for supervision trees.
  2. Define a Clear Client API:

    • Encapsulate call and cast operations within module functions.
    • Provides a clean interface for interacting with the GenServer.
  3. Handle Unexpected Messages:

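    • Add a catch-all handle_info/2 clause for messages you do not expect (the default injected by use GenServer only logs them).
    • Without a catch-all, a module that defines its own handle_info/2 clauses crashes with a FunctionClauseError on any other message.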
  4. Use Supervisors for Fault Tolerance:

    • Place GenServers under Supervisors to automatically handle crashes.
    • Choose appropriate supervision strategies (:one_for_one, :rest_for_one, etc.).
  5. State Management:

    • Elixir data is immutable, so return the updated state from each callback rather than trying to modify it in place.
    • Use pattern matching to destructure and update state efficiently.
  6. Avoid Blocking Operations in Callbacks:

    • Offload long-running tasks to separate processes or handle asynchronously.
    • Prevents the GenServer from becoming unresponsive.
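One way to keep a callback from blocking (item 6) is to return {:noreply, state} from handle_call/3 and send the answer later with GenServer.reply/2. The sketch below assumes a hypothetical Offloader module. It uses an unsupervised Task.start/1 to stay short, whereas production code would more likely use a Task.Supervisor.

```elixir
defmodule Offloader do
  # Hypothetical module for illustration.
  use GenServer

  def start_link(_arg \\ []), do: GenServer.start_link(__MODULE__, :ok)
  def slow_square(pid, n), do: GenServer.call(pid, {:slow_square, n})
  def ping(pid), do: GenServer.call(pid, :ping)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:slow_square, n}, from, state) do
    # Do the slow work in a separate process and reply from there,
    # so the server can keep handling other messages meanwhile.
    Task.start(fn ->
      Process.sleep(300)
      GenServer.reply(from, n * n)
    end)

    {:noreply, state}
  end

  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

{:ok, pid} = Offloader.start_link()

# Start a slow request from another process...
task = Task.async(fn -> Offloader.slow_square(pid, 7) end)

# ...and note that the server still answers right away.
IO.inspect(Offloader.ping(pid))
IO.inspect(Task.await(task))
```

The caller of slow_square/2 still waits for its result, but other callers are no longer stuck behind it.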

Putting It All Together: A GenServer-Based Chat Room

Defining the Chat Room GenServer:

defmodule ChatRoom do
  use GenServer

  # Client API

  def start_link(room_name) do
    GenServer.start_link(__MODULE__, %{name: room_name, users: []}, name: via_tuple(room_name))
  end

  def join(room_name, user_pid) do
    GenServer.cast(via_tuple(room_name), {:join, user_pid})
  end

  def leave(room_name, user_pid) do
    GenServer.cast(via_tuple(room_name), {:leave, user_pid})
  end

  def send_message(room_name, message) do
    GenServer.cast(via_tuple(room_name), {:message, message})
  end

  # Server (Callbacks)

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_cast({:join, user_pid}, state) do
    new_state = %{state | users: [user_pid | state.users]}
    {:noreply, new_state}
  end

  @impl true
  def handle_cast({:leave, user_pid}, state) do
    new_state = %{state | users: List.delete(state.users, user_pid)}
    {:noreply, new_state}
  end

  @impl true
  def handle_cast({:message, message}, state) do
    Enum.each(state.users, fn user ->
      send(user, {:new_message, state.name, message})
    end)

    {:noreply, state}
  end

  # Helper function for via tuple
  defp via_tuple(name) do
    {:via, Registry, {:chat_room_registry, name}}
  end
end

Setting Up a Registry for Named Processes:

defmodule MyOtherApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Registry, keys: :unique, name: :chat_room_registry}
      # Other workers and supervisors...
    ]

    opts = [strategy: :one_for_one, name: MyOtherApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
MyOtherApp.Application.start(:foo, nil)

Usage:

# The registry was started above by MyOtherApp.Application.start/2
# (in a Mix project the application starts automatically)

# Start a chat room
{:ok, _room} = ChatRoom.start_link("lobby")

# Simulate a user process that waits for one message
user =
  spawn(fn ->
    receive do
      {:new_message, room, msg} ->
        IO.puts("[#{room}] Received message: #{msg}")
    end
  end)

# Join from this process so the join is queued before the message below
ChatRoom.join("lobby", user)

# Send a message to the chat room
ChatRoom.send_message("lobby", "Hello, world!")

Explanation:

  • Registry Usage:
    • Allows processes to be registered and looked up by name.
    • Enables multiple chat rooms identified by their room names.
  • Process Communication:
    • Users receive messages asynchronously via their process mailbox.
  • Best Practices:
    • Encapsulation of state and logic within the GenServer.
    • Use of via_tuple for dynamic process names.
    • Supervising the Registry and other processes.
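The name resolution behind the via tuple can be explored on its own. The sketch below assumes a hypothetical NamedRoom module and a registry named :demo_registry, both made up for illustration. It relies only on Registry.start_link/1, GenServer.whereis/1, and the :via name format.

```elixir
defmodule NamedRoom do
  # Hypothetical module for illustration; :demo_registry is an assumed name.
  use GenServer

  def via(room_name), do: {:via, Registry, {:demo_registry, room_name}}

  def start_link(room_name) do
    GenServer.start_link(__MODULE__, room_name, name: via(room_name))
  end

  def name(room_name), do: GenServer.call(via(room_name), :name)

  @impl true
  def init(room_name), do: {:ok, room_name}

  @impl true
  def handle_call(:name, _from, room_name), do: {:reply, room_name, room_name}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: :demo_registry)

{:ok, lobby} = NamedRoom.start_link("lobby")
{:ok, _games} = NamedRoom.start_link("games")

# The via tuple resolves to the pid registered under that key.
IO.inspect(GenServer.whereis(NamedRoom.via("lobby")) == lobby)
IO.inspect(NamedRoom.name("games"))

# With keys: :unique, a second process cannot take the same name.
IO.inspect(NamedRoom.start_link("lobby"))
```

Because names are arbitrary terms rather than atoms, rooms can be created at runtime without growing the atom table.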

Final Thoughts:

By leveraging GenServer and adhering to best practices, we can build robust, maintainable, and fault-tolerant applications in Elixir. The GenServer abstraction simplifies the complexities of process management, allowing developers to focus on implementing business logic.

Key Takeaways:

  • Understand Callbacks:
    • Properly implement init/1, handle_call/3, handle_cast/2, and handle_info/2.
  • Choose Between call and cast:
    • Use call for synchronous operations where a response is needed.
    • Use cast for asynchronous operations where no response is required.
  • Implement Supervision Trees:
    • Organize processes under supervisors for automatic fault recovery.
  • Optimize State Management:
    • Keep state immutable and update it functionally.
  • Handle Errors Gracefully:
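    • Match on tagged tuples such as {:ok, value} and {:error, reason} for expected failures; use try/rescue only where an exception must be handled inside a callback, and let the supervisor restart the process otherwise.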
  • Document and Annotate:
    • Use @impl true and module documentation for better code clarity.