Powered by AppSignal & Oban Pro

C2N4 – Pour aller plus loin

4-aller-plus-loin.livemd

C2N4 – Pour aller plus loin

Mix.install([
  {:req, "~> 0.5.15"},
  {:kino, "~> 0.16.1"}
])

defmodule TP do
  def trace(trace_target \\ :all, trace_function) do
    Kino.Process.render_seq_trace(
      trace_target,
      trace_function,
      message_label: fn msg ->
        case msg do
          msg when is_binary(msg) -> {:ok, "MSG: #{msg}"}
          _ -> :continue
        end
      end
    )
  end
end

Une nouvelle commande: Task.async_stream

Nous sommes presque au bout de notre exploration des primitives pour la concurrence, bravo !

Il est désormais temps d’aborder une série de concepts plus avancés, à commencer par la fonction Task.async_stream/3 qui permet de lancer des calculs en parallèle.

Commencez par examiner (et exécuter) le code suivant:

inputs = 1..12

slow_cube = fn x ->
  Process.sleep(100 + :rand.uniform(150)) # on simule une attente de durée aléatoire
  x * x * x
end

# TP.trace(fn ->
  inputs
  |> Task.async_stream(slow_cube, max_concurrency: 4, timeout: 2_000)
  |> Enum.to_list()
# end)

Ces quelques lignes ont:

  • lancé une série de calculs (12 en tout) en parallèle
  • récupérer les résultats retournés par tous les processus
  • retourner tous ces résultats dans une liste

Tout cela est équivalent au programme beaucoup plus complexe que nous avions écrit dans les premiers notebooks, avec en plus:

  • une gestion de la limite du nombre de processus enfants s’exécutant en parallèle (option max_concurrency:)
  • une gestion du temps limite d’exécution (option timeout:)

💡 Cette fonction retourne un nouveau type d’objet que nous n’avions pas encore vu: un Stream. Il s’agit d’objet ressemblant à des listes mais dont l’évaluation est “paresseuse”.

Nous ne creuseront pas beaucoup ces notions à ce stade, mais ce que vous devez retenir c’est qu’on peut les passer à une boucle for ou aux fonctions du module Enum de la même façon qu’une liste.

Ainsi, nous pourrions ré-écrire notre exemple ainsi:

    # lance les streams et génère une "stream" de tâches
for tasks <- Task.async_stream(inputs, slow_cube, max_concurrency: 4, timeout: 2_000),
    # les résultats sont retournés sous forme de tuples {:ok, value}, on pattern-match
    {:ok, result} = tasks do
  result
end

Exercice: chargement de requêtes depuis internet

Nous allons mettre à profit les constructions présentées ci-dessus pour charger des informations depuis internet.

urls = [
  "https://swapi.info/api/people/1",
  "https://swapi.info/api/people/2",
  "https://swapi.info/api/people/3",
  "https://swapi.info/api/people/100",
]

# Complétez le code ci-dessous
for url <- urls do
  # 1. lancez des tâches pour télécharger les éléments ci-dessus (avec Req.get!/1)
  # 2. ne gardez que les réponses dont le status est "200" (ce qui signifient qu'elles ont réussi)
  # 3. extrayez le nom du personnage depuis le body
  url
end

Le GenServer

Il est souvent nécessaire de lancer des processus qui attendent et répondent à des messages en Elixir. Cependant, la structure récursive qui boucle sur “receive“ n’est pas très pratique.

Afin d’abstraire ces concepts, Elixir dispose du concept de GenServer, qui sont des modules spécialisés et définis comme suit:

defmodule MonPremierGenserver do
  def init(_) do
    {:ok, []}
  end

  # 🧙‍♂️ Déclenché à une appel GenServer.cast(pid, {:push, val})
  #    le client continue sans attendre de réponse (d'où le :noreply)
  def handle_cast({:push, element}, state) do
    new_state = [element | state]
    {:noreply, new_state}
  end

  # 📞 Déclenché à une appel GenServer.call(pid, {:push, val})
  #    l'appelant arrête son exécution dans l'attente d'une réponse
  def handle_call(:pop, _from, state) do
    [to_caller | new_state] = state
    # to_caller contient la réponse qui sera renvoyée au client
    {:reply, to_caller, new_state}
  end
end

Dans ce module, nous avons défini 3 méthods:

  • une fonction init/1 qui prend les arguments passés au démarrage, et doit retourner un tuple {:ok, state}state est l’”état” du processus (c-à-d. les informations stockées dans le processus)
  • une fonction handle_cast/2 qui recevra les messages envoyés par l’appel GenServer.cast/2. Cette fonction doit renvoyer {:noreply, state}state est l’état du processus, mis à jour en réaction au message reçu
  • une fonction handle_call/3 qui recevra les messages envoyés par l’appel GenServer.call/2. Cette fonction doit renvoyer {:reply, réponse, state}réponse est le message renvoyé à l’appelant et state est l’état du processus, mis à jour en réaction au message reçu
{:ok, stack} = GenServer.start_link(MonPremierGenserver, :ignored_arg)

# On rajoute des éléments à notre tas
GenServer.cast(stack, {:push, 1})
GenServer.cast(stack, {:push, 2})
GenServer.cast(stack, {:push, 3})

# On lit les éléments stockés
IO.puts GenServer.call(stack, :pop)
IO.puts GenServer.call(stack, :pop)
IO.puts GenServer.call(stack, :pop)

# On termine ce processus sans erreur (reason = :normal)
GenServer.stop(stack)

on peut illustrer le fonctionnement global comme suit:

sequenceDiagram
participant Client
participant GS as MonPremierGenserver (GenServer)

Client->>GS: GenServer.start_link(MonPremierGenserver, [])
activate GS
Note right of GS: init(_) -> {:ok, []}
state = [] Client-->>GS: GenServer.cast(stack, {:push, 1}) Note over Client,GS: handle_cast({:push, 1}, state) GS->>GS: state = [1] Client-->>GS: GenServer.cast(stack, {:push, 2}) Note over Client,GS: handle_cast({:push, 2}, state) GS->>GS: state = [2, 1] Client-->>GS: GenServer.cast(stack, {:push, 3}) Note over Client,GS: handle_cast({:push, 3}, state) GS->>GS: state = [3, 2, 1] Client->>+GS: GenServer.call(stack, :pop) Note over Client,GS: handle_call(:pop, _from, state) GS-->>-Client: {:reply, 3} GS->>GS: state = [2, 1] Client->>+GS: GenServer.call(stack, :pop) Note over Client,GS: handle_call(:pop, _from, state) GS-->>-Client: {:reply, 2} GS->>GS: state = [1] Client->>+GS: GenServer.call(stack, :pop) Note over Client,GS: handle_call(:pop, _from, state) GS-->>-Client: {:reply, 1} GS->>GS: state = [] deactivate GS

Définition d’une “interface client”

Vous verrez souvent des modules GenServer définissant définissant des fonctions pour le “client” (le client qui envoie les message au GenServer) et une pour le “serveur” (le process qui exécute le GenServer lui-même).

Voici par ex. une façon de modifier notre exemple précédent:

defmodule MonStack do  
  ###
  # API CLIENT
  ###

  def start_link(initial_state \\ []), do: 
    # Ici __MODULE__ est le nom du module courant, soit "MonStack"
    GenServer.start_link(__MODULE__, initial_state)

  def push(pid, value), do:
    GenServer.cast(pid, {:push, value})

  def pop(pid), do:
    GenServer.call(pid, :pop)

  ###
  # API SERVEUR
  ###
  
  def init(initial_state), do: 
    {:ok, initial_state}

  def handle_cast({:push, element}, state) do
    new_state = [element | state]
    {:noreply, new_state}
  end

  def handle_call(:pop, _from, state) do
    [to_caller | new_state] = state
    {:reply, to_caller, new_state}
  end
end

Ce qui permet d’avoir un code plus expressif:

{:ok, stack} = MonStack.start_link()

MonStack.push(stack, 1)
MonStack.push(stack, 2)
MonStack.push(stack, 3)

IO.puts MonStack.pop(stack)
IO.puts MonStack.pop(stack)
IO.puts MonStack.pop(stack)

Autres messages envoyés à un GenServer

Il est toujours possible d’envoyer des messages directement à un GenServer – c-à-d. en dehors de call/2 et cast/2 – via send.

Ces messages arrivent dans un autre callback: handle_info/2

defmodule CallMe do

  def init(_), do:
    {:ok, :state_not_used}

  def handle_info(msg, state) do
    IO.puts "Message reçu par #{inspect(self())}: #{msg}"
    {:noreply, state}
  end
end
{:ok, pid} = GenServer.start_link(CallMe, nil, 
  # ✨ Nouveauté: on donne un "nom" à notre process qui est démarré
  name: CallMe)
  
TP.trace([self(), pid], fn ->

  # on envoie un message via send(pid, message)
  IO.puts("Envoi du message à #{inspect(pid)}")
  send(pid, "Salut 👋")

  Process.sleep(500) # 👈 attente que tous les messages soient envoyés pour le trace()  
end)

GenServer.stop(CallMe) # 👈 On peut passer le nom aux appels au lieu du pid

Cette façon de faire peut également être utilisée à l’intérieur même du module GenServer, par exemple pour s’appeler lui-même, démo:

defmodule SimpleTicker do
  def init(delay_ms) do
    # S'envoie un message à lui-même après delay_ms millisecondes
    Process.send_after(self(), :tick, delay_ms)
    {:ok, delay_ms}
  end

  def handle_info(:tick, delay_ms) do
    IO.puts "Tick !"
    Process.send_after(self(), :tick, delay_ms)
    {:noreply, delay_ms}
  end
end
{:ok, pid} = GenServer.start_link(SimpleTicker, 500)

Process.sleep(3_000) # 👈 attente de 3 secondes, le temps que le ticker tourne plusieurs fois

GenServer.stop(pid)

La supervision – principes généraux

Nous avons découvert comment les processus Elixir peuvent se monitorer les uns les autres. Ces fonctionnalités permettent définir des stratégies de “supervision”, avec un processus principal qui monitore et relance les processus enfants au fur et à mesure.

Le module Supervisor d’elixir propose une série de fonctions spécifiques pour gérer les cas les plus courants.

💡 Pour que ces processus puissent être lancés, relancés et arrêtés correctment par les fonctions du module Supervisor, ils doivent être définis en tant que GenServers.

Les stratégies pour relancer les processus

En cas d’arrêt des processus fils, ils sont relancés en suivant une des logiques suivantes (caractérisées par leur nom):

  • :one_for_one
  • :one_for_all
  • :rest_for_one

Stratégie :one_for_one

Le superviseur relance le processus enfant qui s’est arrêté:

flowchart LR
  S[Supervisor]:::sup --> A[Child A]
  S --> B[Child B]
  S --> C[Child C]

  classDef sup fill:#eef,stroke:#36c,stroke-width:2px;
  classDef dead fill:#fee,stroke:#c33,stroke-width:2px;

  %% Crash de B
  B -. crash .-> Bx((B redémarré))
  class Bx dead

Stratégie :one_for_all

flowchart LR
  S[Supervisor]:::sup --> A[Child A]
  S --> B[Child B]
  S --> C[Child C]

  classDef sup fill:#eef,stroke:#36c,stroke-width:2px;
  classDef reset fill:#ffd,stroke:#c90,stroke-width:2px;

  B -. crash .-> All((Tous redémarrés))
  class A reset
  class C reset
  class B reset

Stratégie :rest_for_one

flowchart TB
  S[Supervisor]:::sup --> A[Child A]
  S --> B[Child B]
  S --> C[Child C]

  classDef sup fill:#eef,stroke:#36c,stroke-width:2px;
  classDef reset fill:#ffd,stroke:#c90,stroke-width:2px;

  B -. crash .-> R((B et C redémarrés, A conservé))
  class R reset

La supervision – en pratique

Passons à un aspect plus pratique de la supervision. Nous allons écrire un module TStackicker qui est une extension du MonStack défini plus tôt:

les modifications par rapport au code initial sont notés par un symbole 🔍

defmodule Stack do
  
  # API CLIENT

  # 🔍 Au démarrage on passe en argument un "nom d'enregistrement" pour appeler ce GenServer
  def start_link(name),
    do: GenServer.start_link(__MODULE__, [], name: name)

  def push(stack, value), do: GenServer.cast(stack, {:push, value})
  
  def pop(stack), do: GenServer.call(stack, :pop)

  # API SERVEUR

  def init(initial_state), do: {:ok, initial_state}

  def handle_cast({:push, element}, state) do
    new_state = [element | state]
    {:noreply, new_state}
  end

  def handle_call(:pop, _from, state) do
    [to_caller | new_state] = state
    {:reply, to_caller, new_state}
  end

  # 🔍 On ajoute une commande pour "planter" le Stack à la demande
  def handle_info(:boom, state) do
    raise "BOOM"
    {:noreply, state}
  end
end
{:ok, pid} = Supervisor.start_link(
  [
    %{
      id: Stack1, # on démarre 
      start: {Stack, :start_link, [Stack1]}
    },
    %{
      id: Stack2,
      start: {Stack, :start_link, [Stack2]}
    }
  ],
  strategy: :one_for_all
)

# Affichons un graphique représentant l'arborescence de supervision:
Kino.Process.render_sup_tree(pid)

# 👇 début des tests

# Utilisation normale du Stack
IO.inspect GenServer.whereis(Stack1), label: "Stack1 tourne dans le pid"
IO.inspect GenServer.whereis(Stack2), label: "Stack2 tourne dans le pid"

Stack.push(Stack1, 1)
Stack.push(Stack1, 2)
Stack.push(Stack1, 3)

IO.puts Stack.pop(Stack1)
IO.puts Stack.pop(Stack1)
IO.puts Stack.pop(Stack1)

# Maintenant va crasher notre Stack
IO.puts "CRASH du processus Stack1"
send(Stack1, :boom)
Process.sleep(100) # et on laisse du temps au Superviseur pour re-lancer notre Stack

# On peut ré-utiliser le Stack désormais
IO.inspect GenServer.whereis(Stack1), label: "Stack1 tourne dans le pid"
Stack.push(Stack1, 1)
IO.puts Stack.pop(Stack1)

# On peut également appeler Stack2
IO.inspect GenServer.whereis(Stack2), label: "Stack2 tourne dans le pid"
Stack.push(Stack2, 11)
Stack.push(Stack2, 22)

IO.puts Stack.pop(Stack2)
IO.puts Stack.pop(Stack2)



Supervisor.stop(pid)

💡 Si vous changez la stratégie de supervision de :one_for_one vers :one_for_all vous constaterez que le Stack2 est également re-démarré: son PID change.