C2N1 – Processus et programmation concurrente
Mix.install([
{:kino, "~> 0.16.1"},
])
defmodule TP do
def trace(trace_target \\ :all, trace_function) do
Kino.Process.render_seq_trace(
trace_target,
trace_function,
message_label: fn msg ->
case msg do
msg when is_binary(msg) -> {:ok, "MSG: #{msg}"}
{pid, msg} when is_pid(pid) and is_binary(msg) -> {:ok, "MSG: #{msg}"}
{pid, msg} when is_pid(pid) and is_atom(msg) -> {:ok, "MSG: #{inspect(msg)}"}
{:result, _, value} -> {:ok, "RES: #{value}"}
_ -> :continue
end
end
)
end
end
Objectifs de la séance
- Comprendre ce qu’est un processus Elixir (vs thread OS) et pourquoi ils sont si légers.
-
Savoir créer un processus (
spawn/1
). -
Communiquer entre processus (
send
/receive
, boîtes aux lettres, timeouts).
BEAM, isolation et tolérance aux pannes
Vous avez peut-être déjà enetendu le terme “processus” en parlant de l’exécution de programmes sur votre ordinateur ?
De façon générales, les prgrammes informatiques s’exécutent en parallèle, dans des environnement isolés: ils se partagent la puissance de calcul et la mémoire, sans se “bloquer” les uns les autres ou “écraser” zones de mémoires qu’ils utilisent les uns les autres.
Au sein d’un programme Elixir, différents processus (“process”) s’exécutent également en parallèle. Ces processus sont différents des process habituels gérés par le système d’exploitation: ici c’est la VM BEAM – l’environnement d’exécution hérité d’Erlang – qui la gérer et coordonner ces processus au sein de notre programme. Cette couche d’abstraction supplémentaire a un certain nombre d’avantages:
- contrairement aux processus systèmes qui sont relativement coûteux en ressources, les process BEAM sont extrêmement légers: il n’est pas rare qu’un système Erlang/Elixir lance des millions de process simultannément pour gérer un grand flux d’information
- ils disposent d’un système de communication inter-processus – dit par messages très performant et automatisé
- il profitent de la sécurité qu’offre la mémoire immutable de la VM
Ainsi, chaque processus Erlang créé obtient les ressources suivantes:
- un identifiant unique, dit “Process ID” ou “PID“ qui permet de le retrouver et de lui envoyer des messages
- une “boîte aux lettres” (“Mailbox“) qui stockera automatiquement tous les messages adressés à ce process
le schéma suivant illustre la communcation entre plusieurs process:
flowchart LR
subgraph VM[BEAM VM]
A[Processus A
PID <0.101.0>
Mailbox]:::proc -->|message| B[Processus B
PID <0.102.0>
Mailbox]:::proc
C[Processus C
PID <0.103.0>
Mailbox]:::proc -->|message| B
end
classDef proc fill:#e5f6ff,stroke:#0284c7,stroke-width:2px,rx:8,ry:8;
Affichage des PIDs
Tout celà peut paraître très abstrait, mais nous allons maintenant illustrer ces concepts.
Le Livebook que vous exécutez est un programme Elixir, et votre notebook s’exécute dans un process Elixir.
Nous pouvons utiliser self/0
pour afficher le PID du process responsable de la cellule suivante:
self()
Évidemment, nous pouvons démarrer de nouveaux process via une commande dédiée – spawn/1
– qui va créer un nouveau processus puis lancer la fonction passée en argument au sein de ce nouveau processus:
IO.inspect self(), label: "PID du process parent"
pid_du_nouveau_process = spawn(fn ->
IO.puts "Coucou depuis le nouveau process !"
# 👇 cet appel à self() est exécuté dans le nouveau process et sera donc différent
IO.inspect self(), label: "PID du process parent"
end)
Vous devriez avoir un résultat qui ressemble au suivant:
💡 Quelques remarques avant de continuer:
-
l’appel à
self/0
retourne toujours le PID du process courant: c’est pourquoi elle renvoie un résultat différent lorsqu’elle est appelée dans le nouveau processus créé parspawn/1
-
le PID du nouveau processus est également retourné par l’appel à
spawn/1
au parent: l’intérêt étant que grâce à ce PID le parent pourra envoyer des messages au process enfant !
Communication entre les processus
Une fois le PID du process avec lequel nous essayons de communiqué connu, il reste deux opérations à mener:
- envoyer un message au processus
- au sein de ce processus capturer le message reçu
Pour se faire, Elixir dispose de fonctionnalités dédiées:
-
la fonction
send/2
qui prend un PID et un message (n’importe quelle valeur Elixir) à envoyer -
la fonction
receive/1
qui prend un bloc de fonctions à pattern-matcher avec ce message (pensez au fonctionnement decase
)
L’exemple très simple ci-dessous continue là où nous avions commencé :
# on démarre un processus enfant
enfant = spawn(fn ->
receive do
message ->
IO.inspect(message, label: "ENFANT")
end
end)
# envoyons un message à l'enfant:
send(enfant, "Coucou")
Nous avons donc le process parent qui envoie un message au process enfant, et ce dernier, lorqu’il le reçoit dans son bloc receive
va afficher un message dans la console.
Afin d’illustrer ce fonctionnement, nous pouvons ré-utiliser le code précédent dans une fonction spécifique TP.trace/1
(spécifique à ce TP et basée sur Kino.Process.render_sql_trace/1
) de Livebook qui permet de tracer l’exécution en détail:
TP.trace(fn ->
enfant = spawn(fn ->
receive do
message ->
IO.inspect(message, label: "ENFANT")
end
end)
send(enfant, "Coucou")
end)
On voit que tout fonctionne bien, dans un sens. Il reste cependant une question: comment faire pour que le process enfant puisse “répondre” au parent ?
La solution la plus simple consiste à passer le process parent par closure, c’est-à-dire une variable définie avant la fonction et qui fera partie de son contexte d’exécuption, illustration:
# on sauvetarde le PID du parent dans une variable
parent = self()
TP.trace(parent, fn ->
enfant =
# Cette fonction anonyme étant définie "après" la variable et peut la référencer
# ici la référence vers "parent" est passé implicitement à la fonction via son context de
# déclaration: on parle de "fermeture" ou "closure" en anglais.
spawn(fn ->
receive do
message ->
IO.inspect(message, label: "ENFANT")
# renvoi du message au parent, dont on connaît désormais le PID
send(parent, "Coucou toi aussi")
end
end)
# Envoi du message à l'enfant
send(enfant, "Coucou")
# Attente d'un message de la part de l'enfant
receive do
message -> message
end
end)
Exercice
Reprenez le code précédent, mais supprimez la variable parent
au début du programme. Vous devez modifier le code pour envoyer un message qui contienne à la voir la chaîne de caractères à afficher et également le PID auquel il doit répondre.
enfant =
spawn(fn ->
receive do
# À COMPLÉTER
end
end)
# À COMPLÉTER
send(enfant, "???")
# À DÉCOMMENTER QUAND VOUS ÊTES PRÊT
# receive do
# message -> message
# end
# CORRECTION POSSIBLE
enfant =
spawn(fn ->
receive do
{pid, msg} ->
IO.puts(msg)
send(pid, :ok)
end
end)
# À COMPLÉTER
send(enfant, {self(), "Coucou avec réponse"})
# Attente d'un message de la part de l'enfant
receive do
message -> message
end
Un “long-lived process”
Dans tous nos exemples ci-dessus, nos processus s’arrêtent dès la fin de l’exécution de la fonction. Cependant, il serait intéressant d’avoir des processus capables de traiter plusieurs messages d’affilée sans s’arrêter.
Pour cela, nous allons utiliser deux fonctionnalités d’Élixir que nous avons déjà rencontrés:
- les modules
- et la récursion 🔥
Il est alors possible d’utiliser spawn/3
avec trois arguments pour invoquer la fonction dans notre module. La syntaxe est la suivante:
Voyons un exemple:
defmodule LongLivedProcess do
# Notre fonction prend deux argument: un nom à afficher dans la console,
# et un nombre de boucles à effectuer
def loop(name, count) do
# Attente du message
receive do
# Pattern-matching: on s'attend à recevoir un tuple { PID, string }
{pid, message} ->
IO.puts "#{name}: Received #{message} from #{inspect(pid)}"
response =
case message do
:ping -> :pong # si on reçoit :ping, on renvoie :pong
:pong -> :ping # si on reçoit :pong, on renvoie :ping
end
send(pid, { self(), response })
end
# Pour éviter de créer des "boucles infinies" on décrémente le compteur à chaque itération
if count > 0 do
loop(name, count-1)
end
end
end
Pour exécuter ce code, nous allons créer deux processus enfants et les faire communiquer entre eux.
💡 Vous verrez apparaître un process supplémentaire sur ce shéma, qui recoit des message io_request
et io_reply
: il s’agit du process qui gère l’affichage des logs dans la cellule de Livebook (quand vous appelez IO.puts/1
). Vous pouvez l’ignorer.
En pratique:
# On crée deux process
process1 = spawn(LongLivedProcess, :loop, ["PROCESS1", 3])
process2 = spawn(LongLivedProcess, :loop, ["PROCESS2", 3])
# On affiche leur PIDs (pour aider à suivre l'exécution)
IO.inspect(process1, label: "PID du PROCESS1")
IO.inspect(process2, label: "PID du PROCESS2")
TP.trace([process1, process2], fn ->
# on "lance la machine" en simulant l'envoi d'un message du process 1 au process 2
send(process2, {process1, :ping})
# on bloque 300ms pour "attendre" que tous les messages soient envoyés
Process.sleep(300)
end)
Exercice d’application: MapReduce
Nous allons terminer ce TP en créant un premier module d’exécution distributée.
Objectif: nous allons créer un programme qui va “compter le nombre de mots” dans un texte”. Et nous alons de faire de façon concurrente, en séparant le travail entre plusieurs process.
- nous allons utiliser une approche “MapReduce”
-
nous appelons la fonction
MapReduce1.run/1
en lui passant une liste de textes à analyzer -
pour chaque texte, nous lançons un processus d’analyze (dans
start_all/1
) -
puis dans
gather
nous attendons toutes les réponses pour finaliser le calcul final
flowchart LR
D[textes]
M["texts |> start_all |> gather"]
D -->|run| M
M -->|spawn| W1[Worker #1]:::worker
M -->|spawn| W2[Worker #2]:::worker
M -->|spawn| Wn[Worker #n]:::worker
W1 -->|"{:result, ..., count}"| M
W2 -->|"{:result, ..., count}"| M
Wn -->|"{:result, ..., count}"| M
Voici le code à compléter:
defmodule MapReduce1 do
# Map → Reduce
def run(texts) do
texts |> start_all() |> gather()
end
# TODO
def start_all(texts) do
[] # TODO: lancer les process worker et retourner la liste des PIDs
end
# TODO: déterminer les arguments à passer
def worker(:todo) do
# TODO:
# - compter les mots du texte
# - répondre au parent avec le résultat: format suggéré {:result, pid_worker, count_des_mots}
end
# 4) Boucle d'agrégation: reçoit results et DOWN, relance si besoin
defp gather(pids) do
# TODO:
# - attendre les résultats des workers
# - additioner les résultats de chaque worker
# - retourner le résultat final
0
end
# Cadeau 🎁
def count_words(text) do
text
|> String.downcase()
|> String.replace(~r/[^[:alnum:]\p{L}\s-]/u, " ")
|> String.split(~r/\s+/, trim: true)
|> Enum.count()
end
end
haiku_dakotsu = [
"Du temps passé",
"Me revient le souvenir",
"Les biches au printemps."
]
TP.trace(self(), fn ->
MapReduce1.run(haiku_dakotsu)
end)
lac_lamartine = [
"Ainsi, toujours poussés vers de nouveaux rivages,",
"Dans la nuit éternelle emportés sans retour,",
"Ne pourrons-nous jamais sur l'océan des âges",
"Jeter l'ancre un seul jour ?",
"Ô lac ! l'année à peine a fini sa carrière,",
"Et près des flots chéris qu'elle devait revoir,",
"Regarde ! je viens seul m'asseoir sur cette pierre",
"Où tu la vis s'asseoir !",
"Tu mugissais ainsi sous ces roches profondes ;",
"Ainsi tu te brisais sur leurs flancs déchirés ;",
"Ainsi le vent jetait l'écume de tes ondes",
"Sur ses pieds adorés.",
"Un soir, t'en souvient-il ? nous voguions en silence ;",
"On n'entendait au loin, sur l'onde et sous les cieux,",
"Que le bruit des rameurs qui frappaient en cadence",
"Tes flots harmonieux.",
"Tout à coup des accents inconnus à la terre",
"Du rivage charmé frappèrent les échos ;",
"Le flot fut attentif, et la voix qui m'est chère",
"Laissa tomber ces mots :",
"Ô temps ! suspends ton vol, et vous, heures propices,",
"Suspendez votre cours !",
"Laissez-nous savourer les rapides délices",
"Des plus beaux de nos jours !",
"Assez de malheureux ici-bas vous implorent ;",
"Coulez, coulez pour eux ;",
"Prenez avec leurs jours les soins qui les dévorent ;",
"Oubliez les heureux.",
"Mais je demande en vain quelques moments encore,",
"Le temps m'échappe et fuit ;",
"Je dis à cette nuit : Sois plus lente ; et l'aurore",
"Va dissiper la nuit.",
"Aimons donc, aimons donc ! de l'heure fugitive,",
"Hâtons-nous, jouissons !",
"L'homme n'a point de port, le temps n'a point de rive ;",
"Il coule, et nous passons !",
"Temps jaloux, se peut-il que ces moments d'ivresse,",
"Où l'amour à longs flots nous verse le bonheur,",
"S'envolent loin de nous de la même vitesse",
"Que les jours de malheur ?",
"Hé quoi ! n'en pourrons-nous fixer au moins la trace ?",
"Quoi ! passés pour jamais ! quoi ! tout entiers perdus !",
"Ce temps qui les donna, ce temps qui les efface,",
"Ne nous les rendra plus !",
"Éternité, néant, passé, sombres abîmes,",
"Que faites-vous des jours que vous engloutissez ?",
"Parlez : nous rendrez-vous ces extases sublimes",
"Que vous nous ravissez ?",
"Ô lac ! rochers muets ! grottes ! forêt obscure !",
"Vous, que le temps épargne ou qu'il peut rajeunir,",
"Gardez de cette nuit, gardez, belle nature,",
"Au moins le souvenir !",
"Qu'il soit dans ton repos, qu'il soit dans tes orages,",
"Beau lac, et dans l'aspect de tes riants coteaux,",
"Et dans ces noirs sapins, et dans ces rocs sauvages",
"Qui pendent sur tes eaux.",
"Qu'il soit dans le zéphyr qui frémit et qui passe,",
"Dans les sons de tes bords par tes rivages portés,",
"Dans l'astre au front d'argent qui blanchit ta surface",
"De ses molles clartés !",
"Que le vent qui gémit, le roseau qui soupire,",
"Que les parfums légers de ton air embaumé,",
"Que tout ce qu'on entend, l'on voit et l'on respire,",
"Tout dise : Ils ont aimé !"
]
# Avec trace visuel (optionnel)
TP.trace(self(), fn ->
MapReduce1.run(lac_lamartine)
end)
Vous pouvez maintenant vous rendre au TP suivant pour continuer votre exploration de la programmation concurrente avec Elixir.