Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

¿Pero qué demonios es un GenServer?

demonios_de_genserver.livemd

¿Pero qué demonios es un GenServer?

Puesta en marcha de este Livebook

  • Quizás la forma más sencilla de hacer este cuaderno sea clonando este repositorio y poniendo en marcha Livebook dentro de un contenedor:
docker run -p 8080:8080 -p 8081:8081 --pull always -u $(id -u):$(id -g) -v $(pwd):/data ghcr.io/livebook-dev/livebook:0.14.5
  • Entonces abre en un navegador la URL indicada en el arranque del contenedor.
  • Una vez en Livebook, puedes abrir este cuaderno directamente, el fichero pf_en_elixir.livemd.
  • Si no quieres clonar el repo, puedes abrir el cuaderno directamente “desde una URL”, por ejemplo copiando esta misma URL.

Procesos en Elixir: curso de choque

  • En elixir los procesos no comparten memoria
  • Cada proceso tiene un ID
  • Un proceso puede enviar mensajes a otros procesos conociendo el ID: send(pid, data)
  • El envío es no bloqueante (paso de mensajes asíncrono).
  • Cada proceso tiene un buzón de mensajes asociado (cola de mensajes)
  • Un proceso puede sacar el primer mensaje de su buzón: receive do msg -> <> end
  • Por supuesto la recepción es bloqueante: si no hay mensajes en el buzón el proceso espera a que llegue un mensaje.

  • Se puede conocer el PID del proceso con self().
my_livebook = self()
  • Un proceso puede enviarse un mensaje a si mismo (¡gracias a la asíncronía!).
send(my_livebook, 42)
  • Podemos ver que la expresión send devuelve el mensaje enviado.
  • Ahora podemos recibir el mensaje (42) e imprimirlo.
receive do
  msg -> IO.puts("El mensaje es '#{msg}'")
end
  • El valor de la expresión receive es el valor de la última expresión evaluada.
  • Enviemonos un 27 ahora:
send(my_livebook, 27)
  • Y hagamos esto al recibir:
receive do
  msg ->
    IO.puts("El mensaje ahora es '#{msg}'")
    msg + 1
end
  • Para poner en marcha un proceso usamos la función spawn que recibe como argumento otra función sin argumentos que va a ser lo que el nuevo proceso va a ejecutar.
  • spawn devuelve el PID del proceso recien creado.
pid = spawn(fn ->
  IO.puts("Soy el proceso #{inspect(self())}")
  receive do
    x -> 
      IO.puts("Y he recibido un mensje #{inspect(x)}")
    after 30000 -> 
      IO.puts("Y no he recibido nada en 30 segudos")
  end
end)
Process.alive?(pid)
send(pid, :ok)
defmodule Flush do
  def flush() do
    receive do
      x ->
        IO.puts(inspect(x))
        flush()
    after
      0 ->
        :ok
    end
  end
end
import Flush
send(my_livebook, 42)
flush()

¿Qué es esto?

  • Lo que sigue es un módulo de Elixir (Adivina) con una función que no sabemos qué hace y que se llama que_hace.
  • Por si no lo sabes, en Elixir no hay objetos y sólo hay funciones que cogen datos y devuelven datos.
defmodule Adivina1 do
  # ¿Qué hace esta función?
  def que_hace(l) do
    n = que_hace_aux(l)
    que_hace(n)
  end

  # Función auxiliar
  defp que_hace_aux(l) do
    receive do
      {:a, x} ->
        l ++ [x]
      {:b, pid} when l != []->
        [ x | r] = l
        send pid, {:r, x}
        r
      _ ->
        l
    end    
  end
end
import Adivina1
s = spawn(fn -> que_hace([]) end)
defmodule Adivina2 do
  def a(p, x) do
    send p, {:a, x}
  end
  
  def b(p) do
    send p, {:b, self()}
    receive do
      {:r, x} -> {:ok, x}
      after 10000 -> :error
    end
  end
end
import Adivina2
for x <- [2,3,5,7,11] do
  a(s,x)
end
b(s)

¿Has visto el patrón?

  • que_hace/1 es un server
  • Que mantiene un secuencia de datos como estado interno
  • Tiene dos operaciones
  • Una (asíncrona) para añadir datos: send(server, {:a, x})
  • Otra (síncrona) para borrar datos: send(server, {:b, self()} y luego receive para esperar el dato borrado
  • Todo esto se puede encapsular en funciones: a(server, dato) y b(server)
  • Este patrón se puede aplicar a cualquier servicio pero…
    • Hay mucho bolierplate
    • Es muy fácil meter la pata

CacheServer con GenServer

  • GenServer es un módulo estándar de Elixir que ya implementa ese patrón
  • Para aplicar el patrón hay que definir un módulo con la lógica del server (ej. CacheServer)
  • Los clientes pueden entonces:
    1. Crear un server con esa lógica: server = GenServer.start_link(CacheServer, initial_state)
    2. Enviar peticiones síncronas (call) al servidor: GenServer.call(server, request, timeout)
    3. Enviar peticiones asíncronas (cast) al servidor: GenServer.cast(server, request)

La lógica del server: callbacks

  • El desarrollador del server tiene que definir en el módulo CacheServer las siguientes funciones:
    1. init/1: función que devuelve el estado inicial del server
    2. handle_call/3: función que dice cómo se trata una petición síncrona
    3. handle_cast/3: función que dice cómo se trata una petición asíncrona
  • Diseño:
    • ¿Cuál es el estado del server?
    • ¿Cuáles son las operaciones?
    • ¿Qué debe hacer el server como respuesta a cada operación?
  • Operaciones
    • Añadir dato: clave-valor
      • Asíncrona
    • Leer dato: a partir de una clave
      • Síncrona
      • Si la clave no está en la caché el proceso espera
    • Invalidar dato: dada una clave
      • Asíncrona
  • Estado
    • Un diccionario para los datos
    • Un diccionario con lista de procesos esperando cada clave
defmodule CacheServer do
  use GenServer

  def init(_initial_data) do
    # El estado inicial es el diccioanrio de datos y el diccionario de procesos
    {:ok, {%{}, %{}}}
  end

  def handle_cast({:set, k, v}, {data, waiting}) do
    new_data = Map.put(data, k, v)
    {waiting_for_k, new_waiting} = Map.pop(waiting, k, [])
    Enum.each(waiting_for_k, fn pid ->
      GenServer.reply(pid, v)
    end)
    {:noreply, {new_data, new_waiting}}
  end

  def handle_cast({:del, k}, {data, waiting}) do
    new_data = Map.delete(data, k)
    {:noreply, {new_data, waiting}}
  end

  def handle_call({:get, k}, client_pid, {data, waiting}) do
    case Map.fetch(data, k) do
      {:ok, value} ->
        {:reply, value, {data, waiting}}
      :error ->
        waiting_for_k = Map.get(waiting, k, [])
        new_waiting = Map.put(waiting, k, [client_pid | waiting_for_k])
        {:noreply, {data, new_waiting}}
    end
  end
end
{:ok, cache_server} = GenServer.start(CacheServer, nil)
GenServer.cast(cache_server, {:set, :a, 1})
GenServer.call(cache_server, {:get, :a})

¡Llamada bloqueante!

k = :b
GenServer.cast(cache_server, {:del, k})
GenServer.call(cache_server, {:get, k}, :infinity)

¡Desbloqueo!

¡Atentos a la celda anterior!

GenServer.cast(cache_server, {:set, :b, 2})

Público y privado

  • Es muy probable que no queramos que los clientes usen un API tan feo como el de usar directamente las funciones start, call y cast de GenServer
  • Lo normal es ocultarlo en un módulo público como el siguiente dejando el módulo CacheServer como privado (lamentablemente Elixir no permite ocultar un módulo y no es posible hacer privadas las funciones de CacheServer asi que la ocultación se sostiene por convención)-
defmodule Cache do
  def crear() do
    GenServer.start(CacheServer, nil)
  end

  def leer(cache, k) do
    GenServer.call(cache, {:get, k}, :infinity)
  end

  def escribir(cache,k,v) do
    GenServer.cast(cache, {:set, k, v})
  end

  def borrar(cache,k) do
    GenServer.cast(cache, {:del, k})
  end
end

“Famous last words”

  • Muchos tutoriales y ejemplos de GenServer hacen un uso de varios elementos que convierten en un singleton un GenServer y que generan mucha confusión en el aprendeizaje:
    1. Elixir permite registar un proceso con un nombre y en concreto un GenServercon un nombre usando GenServer.start(CacheServer, nil, name: nombre)
    2. Se puede usar como nombre CacheServer
    3. Se pueden colocar las funciones públicas de Cache en CacheServer
    4. Hay una macro __MODULE__ que ese expande al nombre del módulo
  • Combinando todo esto, por convención de los desarrolladores, es habitual ver una definición como la siguiente:
defmodule CacheSync do
  # API Pública
  def start() do
    GenServer.start(CacheServer, nil, name: __MODULE__)
  end

  def leer(k) do
    GenServer.call(__MODULE__, {:get, k}, :infinity)
  end

  def escribir(k,v) do
    GenServer.cast(__MODULE__, {:set, k, v})
  end

  def borrar(k) do
    GenServer.cast(__MODULE__, {:del, k})
  end

  # Callbacks de GenServer
  use GenServer

  def init(_initial_data) do
    # El estado inicial es el diccioanrio de datos y el diccionario de procesos
    {:ok, {%{}, %{}}}
  end

  def handle_cast({:set, k, v}, {data, waiting}) do
    new_data = Map.put(data, k, v)
    {waiting_for_k, new_waiting} = Map.pop(waiting, k, [])
    Enum.each(waiting_for_k, fn pid ->
      GenServer.reply(pid, v)
    end)
    {:noreply, {new_data, new_waiting}}
  end

  def handle_cast({:del, k}, {data, waiting}) do
    new_data = Map.delete(data, k)
    {:noreply, {new_data, waiting}}
  end

  def handle_call({:get, k}, client_pid, {data, waiting}) do
    case Map.fetch(data, k) do
      {:ok, value} ->
        {:reply, value, {data, waiting}}
      :error ->
        waiting_for_k = Map.get(waiting, k, [])
        new_waiting = Map.put(waiting, k, [client_pid | waiting_for_k])
        {:noreply, {data, new_waiting}}
    end
  end
end
  • Y ahora se puede poner en marcha una única caché (prueba a evaluarlo dos veces para ver cómo falla en el segundo intento)
CacheSync.start
  • Supongamos que queremos cachear la url con la imagen de Lingokids:
url = "https://lingokids.com/wp-content/uploads/2023/02/logo-lingo_eng.svg"
:inets.start()
:ssl.start()
{:ok, {_, _, body}} = :httpc.request(url)
content = to_string(body) 
CacheSync.escribir(url, content)
  • Y ahora queremos rescatarla
CacheSync.leer(url)

¡Llamada bloqueante!

CacheSync.leer("https://babel.upm.es/~angel/")

¡Desbloqueo!

url = "https://babel.upm.es/~angel/"
{:ok, {_, _, body}} = :httpc.request(url)
content = to_string(body) 
CacheSync.escribir(url, content)