C2N2 – Relation entre les processus
Mix.install([
{:kino, "~> 0.16.1"},
{:benchee, "~> 1.4"},
{:kino_benchee, "~> 0.1.0"}
])
Détection de l’arrêt des processus
Jusqu’à maintenant, nous avons considérés des processus complètement indépendants les uns des autres. Si un de ces processus s’arrête - par exemple car sa fonction se termine, ou à cause d’une erreur à l’exécution – aucun autre processus n’est averti de cet arrêt et le bon fonctionnement du programme peut-être compromis: typiquement si un processus “parent” attend une réponse d’un processus “enfant” interrompu qui n’arrivera jamais.
Pour palier ce problème, il est possible de “monitorer” les processus afin d’être averti en cas d’arrêt intempestif:
Process.monitor/1
pour l’écoute du cycle de vie d’un processus
Le module Process
dispose de nombreuses fonctions destinées à agir directement sur les processus Elixir: arrêt forcé, création d’alias etc. Nous serons amené à explorer plusieurs de ces fonctionnalités durant ce cours au fur et à mesure que nous approfondirons ces notions.
La fonction Process.monitor/1
dont nous allons parler maintenant a une fonction très particulière: elle permet à un processus d’être informé du moment où le processus cicle s’arrête.
Commençons par un exemple pratique, que nous allons décortiquer:
enfant = spawn(fn ->
IO.puts "Message du processus enfant"
end)
# On lance le "monitoring" du processus enfant, par le processus courant (self())
Process.monitor(enfant)
receive do
msg ->
IO.inspect msg, label: "Message reçu par le parent"
end
On peut résumer le fonctionnement observé par la séquence suivante:
- nous avons lancé le processus enfant (c-à-d. demandé à la VM BEAM de créer un processus et d’y exécuter notre fonction)
- puis nous avons demandé à ce que le process courant (self()) monitore ce nouveau processus
-
le processus s’est exécuté et à affiché “
Message du processus enfant
“ dans le Terminal de (la cellule du) Livebook - le processus s’est terminé
-
la VM BEAM a informé le processus qui monitorait
enfant
(doncself
) de l’arrêt de ce processus via un message dédié
sequenceDiagram
autonumber
participant P as Parent (self)
participant C as Enfant (spawn)
participant VM as BEAM/VM
Note over P: Le parent crée un processus enfant
P->>C: spawn(fn -> IO.puts("Message du processus enfant") end)
activate C
Note over P: Le parent démarre un monitor sur C
P->>VM: Process.monitor(C)
VM-->>P: ref = #Reference<...>
Note right of C: L’enfant s’exécute puis se termine (raison: :normal)
C-->>C: IO.puts("Message du processus enfant")
deactivate C
Note over VM: À la terminaison de C, la VM notifie le parent
VM-->>P: {:DOWN, ref, :process, pid(C), :normal}
Note over P: Le parent reçoit et inspecte le message
P->>P: receive do msg -> IO.inspect(msg) end
Cette séquence se termine donc par l’envoi d’un message, qui s’est affiché lors de l’exécution de la cellule précédente et se décompose comme suit:
observons maintenant ce qu’il se passe si le process s’arrête à cause d’un dysfonctionnement. Pour cela nous allons utiliser la commande raise/1
d’Elixir qui va arrêter le processus courant avec une excpetion (de type RuntimeError
par défaut):
enfant = spawn(fn ->
IO.puts "Attention je vais lever une exception"
raise "This is a crash"
end)
Process.monitor(enfant)
receive do
msg -> msg
end
le message reçu par dans le cadre du monitoring est identique au cas précédent à l’exception du dernier paramètre, la raison de l’arrêt si n’est plus :normal
mais un tuple de la forme:
{%RuntimeError{message: "This is a crash"}, []}
Exercice – Calcul distributé tolérant aux pannes
Dans cet exercice, je vous donne un squelette de programme simple: il s’agit d’un programme qui calcule la somme des cubes de valeurs contenues dans des listes passées en argument.
prenez le temps de lire et comprendre ce code avant de passer à la suite du TP !
defmodule TP2Exercice1 do
# Calcule la somme des cubes des sous-listes
def sum_all(list_of_lists) do
worker_pids =
for list <- list_of_lists do
# lance les sous-âches
spawn(TP2Exercice1, :sum_cubes_in_list, [self(), list])
end
for pid <- worker_pids do
Process.monitor(pid)
end
# attend le résultat de toutes les tâches, et additionne le tout
receive_results(worker_pids, 0)
end
# Calcule la somme des cubes dans une des listes
def sum_cubes_in_list(master_pid, list) do
# Comme des cubes dans la liste
partial_sum =
list
|> Enum.map(fn x -> x ** 3 end)
|> Enum.sum()
# renvoie le résultat au parent
send(master_pid, {self(), partial_sum})
end
# Plus aucun worker en attente: on retourne le résultat final
defp receive_results([], sum), do: sum
# Mise en attente de réponse des workers
defp receive_results(worker_pids, sum) do
receive do
{worker_pid, partial_sum} ->
# on boucle:
receive_results(
# en enlevant le PID qui a répondu de la liste
List.delete(worker_pids, worker_pid),
# et en ajoutant la somme partielle
sum + partial_sum
)
end
end
end
petite_liste_de_listes = [
[ 1, 2, 3, 4, 5],
[ 11, 12, 13 ],
[ 155 ]
]
TP2Exercice1.sum_all(petite_liste_de_listes)
Par contre si on provoque un crash, par exemple en insérant dans notre liste d’entrée une valeur qui n’est pas un nombre, plus rien ne fonctionne:
⚠️ ATTENTION la fonction ne retournant “jamais” votre cellule va se retrouvée “bloquée”. Il faudra arrêter l’exécution avec le petit bouton “stop” en haut à gauche de la cellule en question:
Une fois que vous aurez fait votre essai commentez la ligne d’appel de la fonction pour ne pas bloquer le reste de la section.
Essayez sur la cellule suivante:
liste_invalide = [
[ 1, 2, 3, 4, 5],
[ 11, 12, 13 ],
[ 155, :pas_un_nombre ]
]
# 🛑 COMMENTEZ CETTE LIGNE TANT QUE VOUS N'AVEZ PAS RÉSOLU LE SOUCI
# TP2Exercice1.sum_all(liste_invalide)
Première “solution” simple: ajouter un “timeout”
Jusqu’à présent, nous avons utiliser receive/1
dans sa forme la plus simple, qui attend indéfiniment une réponse du serveur.
Il est cependant possible d’ajouter une clause after
à notre bloc do
…end
pour décider ce qui doit se passer lorsqu’aucune clause ne “match” les messages attendus pendant un temps déterminé.
Sur un exemple simple:
receive do
# 👇 ce message n'arrivera jamais, vu qu'on ne va rien envoyer...
{:message, msg} -> msg
after
5000 ->
IO.puts "Nous avons attendu 5 secondes sans recevoir de réponse"
end
Notez que cette fonctionnalité peut également être utilisée pour tester si l’on a des messages en attente sans bloquer le fonctionnement, par ex.:
# envoyons-nous un message 💌
send(self(), {:message, "Coucou moi-même"})
# Le message étant stocké dans la "mailbox" de notre processus courant, nous pouvons le lire
# avec receive/1:
receive do
{:message, msg} -> IO.puts("message dans la boite: #{msg}")
after
0 -> IO.puts("Aucun message dans la boite")
end
# Notre mailbox est désormais vide, mais grâce au "after 0" l'appel suivant ne va pas bloquer
# notre programme:
receive do
{:message, msg} -> IO.puts("message dans la boite: #{msg}")
after
0 -> IO.puts("Aucun message dans la boite")
end
Corrigez le code de TP2Exercice1
pour que le programme abandonne le calcul après quelques secondes, sans bloquer l’exécution indéfiniment.
(vous pouvez décommenter la ligne d’appel à TP2Exercice1.sum_all/1
dans la cellule de test)
Une meilleure solution: détecter les erreurs avec Process.monitor/1
Compter un “timeout” pour détecter le plantage des sous-processus n’est pas une bonne solution à long terme: par ex. si l’on souhaite effectuer un plus gros calcul, on pourrait arrêter avant la fin alors que le calcul est encore en cours !
Une meilleure solution sera donc d’utiliser un appel à Process.monitor/1
pour être informé en cas de crash d’un de nos “processus workers”.
Modifiez le code du TP pour utiliser le monitoring des processus et gérer correctement le défaut d’un des processus-fils
Perfomance du calcul distribué
Voici un code de départ, exposant deux fonctions:
-
la fonction
sum_all_parallel
qui reprend le code précédent, légèrement modifié -
la fonction
sum_all_sync
qui effectue le même calcul, sans avoir recours à des sous-processus
defmodule TP2Exercice2 do
def sum_all_sync(list_of_lists) do
list_of_lists
|> Enum.map(&sum_cubes_in_list/1)
|> Enum.sum()
end
def sum_all_parallel(list_of_lists) do
master_pid = self()
for list <- list_of_lists do
spawn(fn ->
partial_result = sum_cubes_in_list(list)
send(master_pid, {self(), partial_result})
end)
end
|> receive_results()
end
# Calcule la som
def sum_cubes_in_list(list) do
list
|> Enum.map(fn x -> x ** 3 end)
|> Enum.sum()
end
# 💡 Nouveauté : déclaration d'une valeur par défaut pour le paramètre "sum"
defp receive_results(worker_pids, sum \\ 0)
defp receive_results([], sum), do: sum
defp receive_results(worker_pids, sum) do
receive do
{worker_pid, partial_sum} ->
receive_results(
List.delete(worker_pids, worker_pid),
sum + partial_sum
)
end
end
end
Nous allons maintenant utiliser la bibliothèque Benchee
et plus particulièrement sa fonction Benchee.run/2
pour comparer les performances de ces deux implémentations sur des valeurs de test de petite taille.
- explorez ces résultats, arrivez-vous à les comprendre ?
- que constatez-vous ?
petite_liste = [
[1, 2, 3, 4, 5],
[11, 12, 13],
[155]
]
Benchee.run(
%{
"Version synchrone": &TP2Exercice2.sum_all_sync/1,
"Version concurrente": &TP2Exercice2.sum_all_parallel/1
},
inputs: %{
"petite liste" => petite_liste,
}
)
Ce n’est pas très concluant: malgré tous nos efforts, la solution parallèle semble plus lente que la simple version synchrone.
Mais de désespérons pas, il s’agit peut-être d’un souci de taille de l’échantillon considéré ! Pour en avoir le coeur net, nous allons re-tester notre code sur une liste de plus grande taille.
Générer un tableau de un million d’entrée, en créant une liste de 1000 listes de 1000 nombres aléatoires
Vous pouvez utiliser pour cela:
-
les compréhensions
for
d’élixir, associées -
les “range”
x..y
“ -
la fonction
:rand.uniform/1
d’Erlang
exemple:
for x <- 1..5 do
x
end
[ 1, 2, 3, 4, 5 ]
à vous de jouer:
# À COMPLÉTER
grande_liste = []
Benchee.run(
%{
"Version synchrone": &TP2Exercice2.sum_all_sync/1,
"Version concurrente": &TP2Exercice2.sum_all_parallel/1,
},
inputs: %{
"grande liste" => grande_liste
}
)
Processus liés
Il reste un cas de figure que nous n’avons pas encore envisagé, et qui s’illustre facilement avec l’exemple suivant:
spawn(fn ->
# attente de 5 secondes
Process.sleep(5000)
IO.puts "hello"
end)
raise "Le processus parent est crashé"
Vous pourrez observer le comportement suivant:
Malgré le crash du processus parent, le processus enfant a continué son exécution. Ce comportement est tout à fait normal, les processus étant indépendants.
L’exécution peut s’illustrer comme suit:
sequenceDiagram
autonumber
participant P as Processus parent
participant C as Processus enfant (non lié)
participant IO as Console
P->>C: spawn(...)
activate C
Note right of C: ⏳ L’enfant dort 5 s
P -x P: raise "Crash" %% auto-terminaison du parent
Note over P: Parent crashé
C->>IO: "hello"
deactivate C
Note over C: Pas de lien → l’enfant continue
Pourtant, dans de nombreux cas, on ne souhaitera pas continuer l’exécution des processus enfants dans le cas où le processus principal a crashé.
La notion de processus liés va nous donner une solution simple à cette problématique:
parent = spawn(fn ->
spawn_link(fn ->
# attente de 5 secondes
Process.sleep(5000)
IO.puts("hello")
end)
Process.sleep(1000)
raise "Déclenchement d'une RuntimeError"
end)
# on attend le message de "crash"
monitor_ref = Process.monitor(parent)
receive do
{:DOWN, ^monitor_ref, :process, ^parent, {reason, _stack}} ->
IO.inspect reason, label: "Le process parent a crashé avec"
end
Cette fois, l’enfant s’est arrêté automatiquement. Le “lien” entre l’enfant et le parent a permis de faire suivre le message de crash du parent au processus enfant:
sequenceDiagram
autonumber
participant M as Moniteur (process courant)
participant P as Parent
participant C as Enfant (lié)
participant IO as Console
M->>P: parent = spawn(...)
activate P
P->>C: spawn_link(...)
activate C
Note right of C: ⏳ L’enfant dort 5 s
P -x P: raise "RuntimeError" %% crash du parent
Note over P,C: Lien actif → le crash se propage
C -x C: Exit (propagation du lien) %% enfant termine aussi
M->>M: monitor_ref = Process.monitor(parent)
Note right of M: Reçoit {:DOWN, ^ref, :process, ^parent, {reason, _}}
Pour créer ce lien, nous avons simplement remplacé l’appel à spawn/1
par spawn_link/
qui lie les process parents et enfant.
Autres considérations
-
il existe des fonctions
Process.link/1
etProcess.unlink/1
qui permettent de liér / dé-lier des processus déjà lancés -
par défaut, le lien est bi-directionnel: si un des processus enfants s’arrête sur une erreur, le processus parent sera également impacté !
Cela peut être souhaité, mais si l’on souhaite avoir plus de contrôle – par ex. un processus parent qui ne s’arrête pas mais puisse traiter les crash de ses processus enfants – il faudra re-configurer le process parent avecProcess.flag(:trap_exit, true)
ce qui va convertir l’évènement de “demande d’arrêt de processus” en un message reçu dans la mailbox.
Cette notion HPC ne sera pas approfondie dans ce cours: nous découvrirons d’autres constructions pour organiser nos programmes avec une relation hiérarchique entre processus parents-enfants dans les notebooks à venir
Exercices supplémentaires
Les petits exercices suivants sont optionnels et uniquement là dans le but de pratiquer la programmation Elixir, si vous avez le temps.
Attente de la fin de “n” processus enfants
Dans l’exercice suivant, vous devrez lancer
- lancer des processus enfants
- monitorer ces processus
- creer une fonction d’attente qui va attendre que tous les processus soient arrêtés
Vous pouvez rpartir du code suivant
defmodule TP2ExerciceOpt1 do
def run do
parent = self()
# On lance 3 enfants avec des délais différents
delays = [{1, 200}, {2, 400}, {3, 150}]
# TODO: démarrer les process enfants et retourner leurs PIDs
# les processus enfants:
# - attendens le temps (ms) passé en argument
# - affichent "Enfant terminé" à la fin de l'exécutino
pids = []
# TODO: monitorer tous les processus
# TODO: lancer une boucle d'attente, qui s'arrête quand length(delays)
# process enfants ont répondu
end
end
Pour tester votre implémentation:
TP2ExerciceOpt1.run()
💡 Vous pouvez également modifier le code ci-dessous pour qu’il prenne en argument une liste de delays et crée un nombre variable de process en conséquence, par ex.:
TP2ExerciceOpt1.run([200, 400, 150]) # équivalent à la version précédente