¿Pero qué demonios es un GenServer?
Puesta en marcha de este Livebook
- Quizás la forma más sencilla de hacer este cuaderno sea clonando este repositorio y poniendo en marcha Livebook dentro de un contenedor:
docker run -p 8080:8080 -p 8081:8081 --pull always -u $(id -u):$(id -g) -v $(pwd):/data ghcr.io/livebook-dev/livebook:0.14.5
- Entonces abre en un navegador la URL indicada en el arranque del contenedor.
-
Una vez en Livebook, puedes abrir este cuaderno directamente, el fichero
pf_en_elixir.livemd
. - Si no quieres clonar el repo, puedes abrir el cuaderno directamente “desde una URL”, por ejemplo copiando esta misma URL.
Procesos en Elixir: curso de choque
- En elixir los procesos no comparten memoria
- Cada proceso tiene un ID
-
Un proceso puede enviar mensajes a otros procesos conociendo el ID:
send(pid, data)
- El envío es no bloqueante (paso de mensajes asíncrono).
- Cada proceso tiene un buzón de mensajes asociado (cola de mensajes)
-
Un proceso puede sacar el primer mensaje de su buzón:
receive do msg -> <> end
- Por supuesto la recepción es bloqueante: si no hay mensajes en el buzón el proceso espera a que llegue un mensaje.
-
Se puede conocer el PID del proceso con
self()
.
my_livebook = self()
- Un proceso puede enviarse un mensaje a si mismo (¡gracias a la asíncronía!).
send(my_livebook, 42)
-
Podemos ver que la expresión
send
devuelve el mensaje enviado. -
Ahora podemos recibir el mensaje (
42
) e imprimirlo.
receive do
msg -> IO.puts("El mensaje es '#{msg}'")
end
-
El valor de la expresión
receive
es el valor de la última expresión evaluada. -
Enviemonos un
27
ahora:
send(my_livebook, 27)
- Y hagamos esto al recibir:
receive do
msg ->
IO.puts("El mensaje ahora es '#{msg}'")
msg + 1
end
-
Para poner en marcha un proceso usamos la función
spawn
que recibe como argumento otra función sin argumentos que va a ser lo que el nuevo proceso va a ejecutar. -
spawn
devuelve el PID del proceso recien creado.
pid = spawn(fn ->
IO.puts("Soy el proceso #{inspect(self())}")
receive do
x ->
IO.puts("Y he recibido un mensje #{inspect(x)}")
after 30000 ->
IO.puts("Y no he recibido nada en 30 segudos")
end
end)
Process.alive?(pid)
send(pid, :ok)
defmodule Flush do
def flush() do
receive do
x ->
IO.puts(inspect(x))
flush()
after
0 ->
:ok
end
end
end
import Flush
send(my_livebook, 42)
flush()
¿Qué es esto?
-
Lo que sigue es un módulo de Elixir (
Adivina
) con una función que no sabemos qué hace y que se llamaque_hace
. - Por si no lo sabes, en Elixir no hay objetos y sólo hay funciones que cogen datos y devuelven datos.
defmodule Adivina1 do
# ¿Qué hace esta función?
def que_hace(l) do
n = que_hace_aux(l)
que_hace(n)
end
# Función auxiliar
defp que_hace_aux(l) do
receive do
{:a, x} ->
l ++ [x]
{:b, pid} when l != []->
[ x | r] = l
send pid, {:r, x}
r
_ ->
l
end
end
end
import Adivina1
s = spawn(fn -> que_hace([]) end)
defmodule Adivina2 do
def a(p, x) do
send p, {:a, x}
end
def b(p) do
send p, {:b, self()}
receive do
{:r, x} -> {:ok, x}
after 10000 -> :error
end
end
end
import Adivina2
for x <- [2,3,5,7,11] do
a(s,x)
end
b(s)
¿Has visto el patrón?
-
que_hace/1
es un server - Que mantiene un secuencia de datos como estado interno
- Tiene dos operaciones
-
Una (asíncrona) para añadir datos:
send(server, {:a, x})
-
Otra (síncrona) para borrar datos:
send(server, {:b, self()}
y luegoreceive
para esperar el dato borrado -
Todo esto se puede encapsular en funciones:
a(server, dato)
yb(server)
-
Este patrón se puede aplicar a cualquier servicio pero…
- Hay mucho bolierplate
- Es muy fácil meter la pata
CacheServer con GenServer
-
GenServer
es un módulo estándar de Elixir que ya implementa ese patrón -
Para aplicar el patrón hay que definir un módulo con la lógica del server (ej.
CacheServer
) -
Los clientes pueden entonces:
-
Crear un server con esa lógica:
server = GenServer.start_link(CacheServer, initial_state)
-
Enviar peticiones síncronas (call) al servidor:
GenServer.call(server, request, timeout)
-
Enviar peticiones asíncronas (cast) al servidor:
GenServer.cast(server, request)
-
Crear un server con esa lógica:
La lógica del server: callbacks
-
El desarrollador del server tiene que definir en el módulo
CacheServer
las siguientes funciones:-
init/1
: función que devuelve el estado inicial del server -
handle_call/3
: función que dice cómo se trata una petición síncrona -
handle_cast/3
: función que dice cómo se trata una petición asíncrona
-
-
Diseño:
- ¿Cuál es el estado del server?
- ¿Cuáles son las operaciones?
- ¿Qué debe hacer el server como respuesta a cada operación?
-
Operaciones
-
Añadir dato: clave-valor
- Asíncrona
-
Leer dato: a partir de una clave
- Síncrona
- Si la clave no está en la caché el proceso espera
-
Invalidar dato: dada una clave
- Asíncrona
-
Añadir dato: clave-valor
-
Estado
- Un diccionario para los datos
- Un diccionario con lista de procesos esperando cada clave
defmodule CacheServer do
use GenServer
def init(_initial_data) do
# El estado inicial es el diccioanrio de datos y el diccionario de procesos
{:ok, {%{}, %{}}}
end
def handle_cast({:set, k, v}, {data, waiting}) do
new_data = Map.put(data, k, v)
{waiting_for_k, new_waiting} = Map.pop(waiting, k, [])
Enum.each(waiting_for_k, fn pid ->
GenServer.reply(pid, v)
end)
{:noreply, {new_data, new_waiting}}
end
def handle_cast({:del, k}, {data, waiting}) do
new_data = Map.delete(data, k)
{:noreply, {new_data, waiting}}
end
def handle_call({:get, k}, client_pid, {data, waiting}) do
case Map.fetch(data, k) do
{:ok, value} ->
{:reply, value, {data, waiting}}
:error ->
waiting_for_k = Map.get(waiting, k, [])
new_waiting = Map.put(waiting, k, [client_pid | waiting_for_k])
{:noreply, {data, new_waiting}}
end
end
end
{:ok, cache_server} = GenServer.start(CacheServer, nil)
GenServer.cast(cache_server, {:set, :a, 1})
GenServer.call(cache_server, {:get, :a})
¡Llamada bloqueante!
k = :b
GenServer.cast(cache_server, {:del, k})
GenServer.call(cache_server, {:get, k}, :infinity)
¡Desbloqueo!
¡Atentos a la celda anterior!
GenServer.cast(cache_server, {:set, :b, 2})
Público y privado
-
Es muy probable que no queramos que los clientes usen un API tan feo como el de usar directamente las funciones
start
,call
ycast
deGenServer
-
Lo normal es ocultarlo en un módulo público como el siguiente dejando el módulo
CacheServer
como privado (lamentablemente Elixir no permite ocultar un módulo y no es posible hacer privadas las funciones deCacheServer
asi que la ocultación se sostiene por convención)-
defmodule Cache do
def crear() do
GenServer.start(CacheServer, nil)
end
def leer(cache, k) do
GenServer.call(cache, {:get, k}, :infinity)
end
def escribir(cache,k,v) do
GenServer.cast(cache, {:set, k, v})
end
def borrar(cache,k) do
GenServer.cast(cache, {:del, k})
end
end
“Famous last words”
-
Muchos tutoriales y ejemplos de
GenServer
hacen un uso de varios elementos que convierten en un singleton unGenServer
y que generan mucha confusión en el aprendeizaje:-
Elixir permite registar un proceso con un nombre y en concreto un
GenServer
con un nombre usandoGenServer.start(CacheServer, nil, name: nombre)
-
Se puede usar como nombre
CacheServer
-
Se pueden colocar las funciones públicas de
Cache
enCacheServer
-
Hay una macro
__MODULE__
que ese expande al nombre del módulo
-
Elixir permite registar un proceso con un nombre y en concreto un
- Combinando todo esto, por convención de los desarrolladores, es habitual ver una definición como la siguiente:
defmodule CacheSync do
# API Pública
def start() do
GenServer.start(CacheServer, nil, name: __MODULE__)
end
def leer(k) do
GenServer.call(__MODULE__, {:get, k}, :infinity)
end
def escribir(k,v) do
GenServer.cast(__MODULE__, {:set, k, v})
end
def borrar(k) do
GenServer.cast(__MODULE__, {:del, k})
end
# Callbacks de GenServer
use GenServer
def init(_initial_data) do
# El estado inicial es el diccioanrio de datos y el diccionario de procesos
{:ok, {%{}, %{}}}
end
def handle_cast({:set, k, v}, {data, waiting}) do
new_data = Map.put(data, k, v)
{waiting_for_k, new_waiting} = Map.pop(waiting, k, [])
Enum.each(waiting_for_k, fn pid ->
GenServer.reply(pid, v)
end)
{:noreply, {new_data, new_waiting}}
end
def handle_cast({:del, k}, {data, waiting}) do
new_data = Map.delete(data, k)
{:noreply, {new_data, waiting}}
end
def handle_call({:get, k}, client_pid, {data, waiting}) do
case Map.fetch(data, k) do
{:ok, value} ->
{:reply, value, {data, waiting}}
:error ->
waiting_for_k = Map.get(waiting, k, [])
new_waiting = Map.put(waiting, k, [client_pid | waiting_for_k])
{:noreply, {data, new_waiting}}
end
end
end
- Y ahora se puede poner en marcha una única caché (prueba a evaluarlo dos veces para ver cómo falla en el segundo intento)
CacheSync.start
- Supongamos que queremos cachear la url con la imagen de Lingokids:
url = "https://lingokids.com/wp-content/uploads/2023/02/logo-lingo_eng.svg"
:inets.start()
:ssl.start()
{:ok, {_, _, body}} = :httpc.request(url)
content = to_string(body)
CacheSync.escribir(url, content)
- Y ahora queremos rescatarla
CacheSync.leer(url)
¡Llamada bloqueante!
CacheSync.leer("https://babel.upm.es/~angel/")
¡Desbloqueo!
url = "https://babel.upm.es/~angel/"
{:ok, {_, _, body}} = :httpc.request(url)
content = to_string(body)
CacheSync.escribir(url, content)