Powered by AppSignal & Oban Pro

C2N2 – Relation entre les processus

2-process-relations.livemd

C2N2 – Relation entre les processus

Mix.install([
  {:kino, "~> 0.16.1"},
  {:benchee, "~> 1.4"},
  {:kino_benchee, "~> 0.1.0"}
])

Détection de l’arrêt des processus

Jusqu’à maintenant, nous avons considérés des processus complètement indépendants les uns des autres. Si un de ces processus s’arrête - par exemple car sa fonction se termine, ou à cause d’une erreur à l’exécution – aucun autre processus n’est averti de cet arrêt et le bon fonctionnement du programme peut-être compromis: typiquement si un processus “parent” attend une réponse d’un processus “enfant” interrompu qui n’arrivera jamais.

Pour palier ce problème, il est possible de “monitorer” les processus afin d’être averti en cas d’arrêt intempestif:

Process.monitor/1 pour l’écoute du cycle de vie d’un processus

Le module Process dispose de nombreuses fonctions destinées à agir directement sur les processus Elixir: arrêt forcé, création d’alias etc. Nous serons amené à explorer plusieurs de ces fonctionnalités durant ce cours au fur et à mesure que nous approfondirons ces notions.

La fonction Process.monitor/1 dont nous allons parler maintenant a une fonction très particulière: elle permet à un processus d’être informé du moment où le processus cicle s’arrête.

Commençons par un exemple pratique, que nous allons décortiquer:

enfant = spawn(fn -> 
  IO.puts "Message du processus enfant"
end)

# On lance le "monitoring" du processus enfant, par le processus courant (self())
Process.monitor(enfant)

receive do
  msg -> 
    IO.inspect msg, label: "Message reçu par le parent"
end

On peut résumer le fonctionnement observé par la séquence suivante:

  • nous avons lancé le processus enfant (c-à-d. demandé à la VM BEAM de créer un processus et d’y exécuter notre fonction)
  • puis nous avons demandé à ce que le process courant (self()) monitore ce nouveau processus
  • le processus s’est exécuté et à affiché “Message du processus enfant“ dans le Terminal de (la cellule du) Livebook
  • le processus s’est terminé
  • la VM BEAM a informé le processus qui monitorait enfant (donc self) de l’arrêt de ce processus via un message dédié
sequenceDiagram
    autonumber
    participant P as Parent (self)
    participant C as Enfant (spawn)
    participant VM as BEAM/VM

    Note over P: Le parent crée un processus enfant
    P->>C: spawn(fn -> IO.puts("Message du processus enfant") end)
    activate C

    Note over P: Le parent démarre un monitor sur C
    P->>VM: Process.monitor(C)
    VM-->>P: ref = #Reference<...>

    Note right of C: L’enfant s’exécute puis se termine (raison: :normal)
    C-->>C: IO.puts("Message du processus enfant")
    deactivate C

    Note over VM: À la terminaison de C, la VM notifie le parent
    VM-->>P: {:DOWN, ref, :process, pid(C), :normal}

    Note over P: Le parent reçoit et inspecte le message
    P->>P: receive do msg -> IO.inspect(msg) end

Cette séquence se termine donc par l’envoi d’un message, qui s’est affiché lors de l’exécution de la cellule précédente et se décompose comme suit:

observons maintenant ce qu’il se passe si le process s’arrête à cause d’un dysfonctionnement. Pour cela nous allons utiliser la commande raise/1 d’Elixir qui va arrêter le processus courant avec une excpetion (de type RuntimeError par défaut):

enfant = spawn(fn -> 
  IO.puts "Attention je vais lever une exception"
  raise "This is a crash"
end)

Process.monitor(enfant)

receive do
  msg -> msg
end

le message reçu par dans le cadre du monitoring est identique au cas précédent à l’exception du dernier paramètre, la raison de l’arrêt si n’est plus :normal mais un tuple de la forme:

 {%RuntimeError{message: "This is a crash"}, []}

Exercice – Calcul distributé tolérant aux pannes

Dans cet exercice, je vous donne un squelette de programme simple: il s’agit d’un programme qui calcule la somme des cubes de valeurs contenues dans des listes passées en argument.

prenez le temps de lire et comprendre ce code avant de passer à la suite du TP !

defmodule TP2Exercice1 do
  # Calcule la somme des cubes des sous-listes
  def sum_all(list_of_lists) do
    worker_pids =
      for list <- list_of_lists do
        # lance les sous-âches
        spawn(TP2Exercice1, :sum_cubes_in_list, [self(), list])
      end

    for pid <- worker_pids do
      Process.monitor(pid)
    end

    # attend le résultat de toutes les tâches, et additionne le tout
    receive_results(worker_pids, 0)
  end

  # Calcule la somme des cubes dans une des listes
  def sum_cubes_in_list(master_pid, list) do
    # Comme des cubes dans la liste
    partial_sum =
      list
      |> Enum.map(fn x -> x ** 3 end)
      |> Enum.sum()

    # renvoie le résultat au parent
    send(master_pid, {self(), partial_sum})
  end

  # Plus aucun worker en attente: on retourne le résultat final
  defp receive_results([], sum), do: sum

  # Mise en attente de réponse des workers
  defp receive_results(worker_pids, sum) do
    receive do
      {worker_pid, partial_sum} ->
        # on boucle:
        receive_results(
          # en enlevant le PID qui a répondu de la liste
          List.delete(worker_pids, worker_pid),
          # et en ajoutant la somme partielle
          sum + partial_sum
        )          
    end
  end
end
petite_liste_de_listes = [
  [ 1, 2, 3, 4, 5],
  [ 11, 12, 13 ],
  [ 155 ]
]

TP2Exercice1.sum_all(petite_liste_de_listes)

Par contre si on provoque un crash, par exemple en insérant dans notre liste d’entrée une valeur qui n’est pas un nombre, plus rien ne fonctionne:

⚠️ ATTENTION la fonction ne retournant “jamais” votre cellule va se retrouvée “bloquée”. Il faudra arrêter l’exécution avec le petit bouton “stop” en haut à gauche de la cellule en question:

Une fois que vous aurez fait votre essai commentez la ligne d’appel de la fonction pour ne pas bloquer le reste de la section.

Essayez sur la cellule suivante:

liste_invalide = [
  [ 1, 2, 3, 4, 5],
  [ 11, 12, 13 ],
  [ 155, :pas_un_nombre ]
]

# 🛑 COMMENTEZ CETTE LIGNE TANT QUE VOUS N'AVEZ PAS RÉSOLU LE SOUCI
# TP2Exercice1.sum_all(liste_invalide)

Première “solution” simple: ajouter un “timeout”

Jusqu’à présent, nous avons utiliser receive/1 dans sa forme la plus simple, qui attend indéfiniment une réponse du serveur.

Il est cependant possible d’ajouter une clause after à notre bloc doend pour décider ce qui doit se passer lorsqu’aucune clause ne “match” les messages attendus pendant un temps déterminé.

Sur un exemple simple:

receive do
  # 👇 ce message n'arrivera jamais, vu qu'on ne va rien envoyer...
  {:message, msg} -> msg
after
  5000 ->
    IO.puts "Nous avons attendu 5 secondes sans recevoir de réponse"
end

Notez que cette fonctionnalité peut également être utilisée pour tester si l’on a des messages en attente sans bloquer le fonctionnement, par ex.:

# envoyons-nous un message 💌
send(self(), {:message, "Coucou moi-même"})

# Le message étant stocké dans la "mailbox" de notre processus courant, nous pouvons le lire
# avec receive/1:
receive do
  {:message, msg} -> IO.puts("message dans la boite: #{msg}")
after
  0 -> IO.puts("Aucun message dans la boite")
end

# Notre mailbox est désormais vide, mais grâce au "after 0" l'appel suivant ne va pas bloquer
# notre programme:
receive do
  {:message, msg} -> IO.puts("message dans la boite: #{msg}")
after
  0 -> IO.puts("Aucun message dans la boite")
end

Corrigez le code de TP2Exercice1 pour que le programme abandonne le calcul après quelques secondes, sans bloquer l’exécution indéfiniment.

(vous pouvez décommenter la ligne d’appel à TP2Exercice1.sum_all/1 dans la cellule de test)

Une meilleure solution: détecter les erreurs avec Process.monitor/1

Compter un “timeout” pour détecter le plantage des sous-processus n’est pas une bonne solution à long terme: par ex. si l’on souhaite effectuer un plus gros calcul, on pourrait arrêter avant la fin alors que le calcul est encore en cours !

Une meilleure solution sera donc d’utiliser un appel à Process.monitor/1 pour être informé en cas de crash d’un de nos “processus workers”.

Modifiez le code du TP pour utiliser le monitoring des processus et gérer correctement le défaut d’un des processus-fils

Perfomance du calcul distribué

Voici un code de départ, exposant deux fonctions:

  • la fonction sum_all_parallel qui reprend le code précédent, légèrement modifié
  • la fonction sum_all_sync qui effectue le même calcul, sans avoir recours à des sous-processus
defmodule TP2Exercice2 do
  def sum_all_sync(list_of_lists) do
    list_of_lists
    |> Enum.map(&amp;sum_cubes_in_list/1)
    |> Enum.sum()
  end

  def sum_all_parallel(list_of_lists) do
    master_pid = self()

    for list <- list_of_lists do
      spawn(fn ->
        partial_result = sum_cubes_in_list(list)
        send(master_pid, {self(), partial_result})
      end)
    end
    |> receive_results()
  end

  # Calcule la som
  def sum_cubes_in_list(list) do
    list
    |> Enum.map(fn x -> x ** 3 end)
    |> Enum.sum()
  end

  # 💡 Nouveauté : déclaration d'une valeur par défaut pour le paramètre "sum"
  defp receive_results(worker_pids, sum \\ 0)

  defp receive_results([], sum), do: sum

  defp receive_results(worker_pids, sum) do
    receive do
      {worker_pid, partial_sum} ->
        receive_results(
          List.delete(worker_pids, worker_pid),
          sum + partial_sum
        )
    end
  end
end

Nous allons maintenant utiliser la bibliothèque Benchee et plus particulièrement sa fonction Benchee.run/2 pour comparer les performances de ces deux implémentations sur des valeurs de test de petite taille.

  • explorez ces résultats, arrivez-vous à les comprendre ?
  • que constatez-vous ?
petite_liste = [
  [1, 2, 3, 4, 5],
  [11, 12, 13],
  [155]
]

Benchee.run(
  %{
    "Version synchrone": &amp;TP2Exercice2.sum_all_sync/1,
    "Version concurrente": &amp;TP2Exercice2.sum_all_parallel/1
  },
  inputs: %{
    "petite liste" => petite_liste,
  }
)

Ce n’est pas très concluant: malgré tous nos efforts, la solution parallèle semble plus lente que la simple version synchrone.

Mais de désespérons pas, il s’agit peut-être d’un souci de taille de l’échantillon considéré ! Pour en avoir le coeur net, nous allons re-tester notre code sur une liste de plus grande taille.

Générer un tableau de un million d’entrée, en créant une liste de 1000 listes de 1000 nombres aléatoires

Vous pouvez utiliser pour cela:

exemple:

for x <- 1..5 do
  x
end
[ 1, 2, 3, 4, 5 ]

à vous de jouer:

# À COMPLÉTER
grande_liste = []

Benchee.run(
  %{
    "Version synchrone": &amp;TP2Exercice2.sum_all_sync/1,
    "Version concurrente": &amp;TP2Exercice2.sum_all_parallel/1,
  },
  inputs: %{
    "grande liste" => grande_liste
  }
)

Processus liés

Il reste un cas de figure que nous n’avons pas encore envisagé, et qui s’illustre facilement avec l’exemple suivant:

spawn(fn ->
  # attente de 5 secondes
  Process.sleep(5000)
  
  IO.puts "hello"
end)

raise "Le processus parent est crashé"

Vous pourrez observer le comportement suivant:

Malgré le crash du processus parent, le processus enfant a continué son exécution. Ce comportement est tout à fait normal, les processus étant indépendants.

L’exécution peut s’illustrer comme suit:

sequenceDiagram
    autonumber
    participant P as Processus parent
    participant C as Processus enfant (non lié)
    participant IO as Console

    P->>C: spawn(...)
    activate C
    Note right of C: ⏳ L’enfant dort 5 s

    P -x P: raise "Crash"  %% auto-terminaison du parent
    Note over P: Parent crashé

    C->>IO: "hello"
    deactivate C
    Note over C: Pas de lien → l’enfant continue
    

Pourtant, dans de nombreux cas, on ne souhaitera pas continuer l’exécution des processus enfants dans le cas où le processus principal a crashé.

La notion de processus liés va nous donner une solution simple à cette problématique:

parent = spawn(fn ->
  spawn_link(fn ->
    # attente de 5 secondes
    Process.sleep(5000)

    IO.puts("hello")
  end)

  Process.sleep(1000)
  raise "Déclenchement d'une RuntimeError"
end)

# on attend le message de "crash"
monitor_ref = Process.monitor(parent)
receive do
  {:DOWN, ^monitor_ref, :process, ^parent, {reason, _stack}} ->
    IO.inspect reason, label: "Le process parent a crashé avec"
end

Cette fois, l’enfant s’est arrêté automatiquement. Le “lien” entre l’enfant et le parent a permis de faire suivre le message de crash du parent au processus enfant:

sequenceDiagram
    autonumber
    participant M as Moniteur (process courant)
    participant P as Parent
    participant C as Enfant (lié)
    participant IO as Console

    M->>P: parent = spawn(...)
    activate P

    P->>C: spawn_link(...)
    activate C
    Note right of C: ⏳ L’enfant dort 5 s

    P -x P: raise "RuntimeError"  %% crash du parent
    Note over P,C: Lien actif → le crash se propage

    C -x C: Exit (propagation du lien)  %% enfant termine aussi

    M->>M: monitor_ref = Process.monitor(parent)
    Note right of M: Reçoit {:DOWN, ^ref, :process, ^parent, {reason, _}}

Pour créer ce lien, nous avons simplement remplacé l’appel à spawn/1 par spawn_link/ qui lie les process parents et enfant.

Autres considérations

  • il existe des fonctions Process.link/1 et Process.unlink/1 qui permettent de liér / dé-lier des processus déjà lancés
  • par défaut, le lien est bi-directionnel: si un des processus enfants s’arrête sur une erreur, le processus parent sera également impacté !
    Cela peut être souhaité, mais si l’on souhaite avoir plus de contrôle – par ex. un processus parent qui ne s’arrête pas mais puisse traiter les crash de ses processus enfants – il faudra re-configurer le process parent avec Process.flag(:trap_exit, true) ce qui va convertir l’évènement de “demande d’arrêt de processus” en un message reçu dans la mailbox.
    Cette notion HPC ne sera pas approfondie dans ce cours: nous découvrirons d’autres constructions pour organiser nos programmes avec une relation hiérarchique entre processus parents-enfants dans les notebooks à venir

Exercices supplémentaires

Les petits exercices suivants sont optionnels et uniquement là dans le but de pratiquer la programmation Elixir, si vous avez le temps.

Attente de la fin de “n” processus enfants

Dans l’exercice suivant, vous devrez lancer

  • lancer des processus enfants
  • monitorer ces processus
  • creer une fonction d’attente qui va attendre que tous les processus soient arrêtés

Vous pouvez rpartir du code suivant

defmodule TP2ExerciceOpt1 do
  def run do
    parent = self()

    # On lance 3 enfants avec des délais différents
    delays = [{1, 200}, {2, 400}, {3, 150}]

    # TODO: démarrer les process enfants et retourner leurs PIDs
    #    les processus enfants:
    #    - attendens le temps (ms) passé en argument
    #    - affichent "Enfant  terminé" à la fin de l'exécutino
    pids = [] 

    # TODO: monitorer tous les processus

    # TODO: lancer une boucle d'attente, qui s'arrête quand length(delays) 
    #       process enfants ont répondu
  end
end

Pour tester votre implémentation:

TP2ExerciceOpt1.run()

💡 Vous pouvez également modifier le code ci-dessous pour qu’il prenne en argument une liste de delays et crée un nombre variable de process en conséquence, par ex.:

TP2ExerciceOpt1.run([200, 400, 150]) # équivalent à la version précédente