Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

C2N1 – Processus et programmation concurrente

1-intro-to-concurrency.livemd

C2N1 – Processus et programmation concurrente

Mix.install([
  {:kino, "~> 0.16.1"},
])

defmodule TP do
  def trace(trace_target \\ :all, trace_function) do
    Kino.Process.render_seq_trace(
      trace_target,
      trace_function,
      message_label: fn msg ->
        case msg do
          msg when is_binary(msg) -> {:ok, "MSG: #{msg}"}
          {pid, msg} when is_pid(pid) and is_binary(msg) -> {:ok, "MSG: #{msg}"}
          {pid, msg} when is_pid(pid) and is_atom(msg) -> {:ok, "MSG: #{inspect(msg)}"}
          {:result, _, value} -> {:ok, "RES: #{value}"}  
          _ -> :continue
        end
      end
    )
  end
end

Objectifs de la séance

  • Comprendre ce qu’est un processus Elixir (vs thread OS) et pourquoi ils sont si légers.
  • Savoir créer un processus (spawn/1).
  • Communiquer entre processus (send/receive, boîtes aux lettres, timeouts).

BEAM, isolation et tolérance aux pannes

Vous avez peut-être déjà enetendu le terme “processus” en parlant de l’exécution de programmes sur votre ordinateur ?

De façon générales, les prgrammes informatiques s’exécutent en parallèle, dans des environnement isolés: ils se partagent la puissance de calcul et la mémoire, sans se “bloquer” les uns les autres ou “écraser” zones de mémoires qu’ils utilisent les uns les autres.

Au sein d’un programme Elixir, différents processus (“process”) s’exécutent également en parallèle. Ces processus sont différents des process habituels gérés par le système d’exploitation: ici c’est la VM BEAM – l’environnement d’exécution hérité d’Erlang – qui la gérer et coordonner ces processus au sein de notre programme. Cette couche d’abstraction supplémentaire a un certain nombre d’avantages:

  • contrairement aux processus systèmes qui sont relativement coûteux en ressources, les process BEAM sont extrêmement légers: il n’est pas rare qu’un système Erlang/Elixir lance des millions de process simultannément pour gérer un grand flux d’information
  • ils disposent d’un système de communication inter-processus – dit par messages très performant et automatisé
  • il profitent de la sécurité qu’offre la mémoire immutable de la VM

Ainsi, chaque processus Erlang créé obtient les ressources suivantes:

  • un identifiant unique, dit “Process ID” ou “PID“ qui permet de le retrouver et de lui envoyer des messages
  • une “boîte aux lettres” (“Mailbox“) qui stockera automatiquement tous les messages adressés à ce process

le schéma suivant illustre la communcation entre plusieurs process:

flowchart LR
  subgraph VM[BEAM VM]
    A[Processus A 
PID <0.101.0>
Mailbox]:::proc -->|message| B[Processus B
PID <0.102.0>
Mailbox]:::proc C[Processus C
PID <0.103.0>
Mailbox]:::proc -->|message| B end classDef proc fill:#e5f6ff,stroke:#0284c7,stroke-width:2px,rx:8,ry:8;

Affichage des PIDs

Tout celà peut paraître très abstrait, mais nous allons maintenant illustrer ces concepts.

Le Livebook que vous exécutez est un programme Elixir, et votre notebook s’exécute dans un process Elixir.

Nous pouvons utiliser self/0 pour afficher le PID du process responsable de la cellule suivante:

self()

Évidemment, nous pouvons démarrer de nouveaux process via une commande dédiée – spawn/1 – qui va créer un nouveau processus puis lancer la fonction passée en argument au sein de ce nouveau processus:

IO.inspect self(), label: "PID du process parent"

pid_du_nouveau_process = spawn(fn ->
  IO.puts "Coucou depuis le nouveau process !"

  #          👇 cet appel à self() est exécuté dans le nouveau process et sera donc différent
  IO.inspect self(), label: "PID du process parent"
end)

Vous devriez avoir un résultat qui ressemble au suivant:

💡 Quelques remarques avant de continuer:

  • l’appel à self/0 retourne toujours le PID du process courant: c’est pourquoi elle renvoie un résultat différent lorsqu’elle est appelée dans le nouveau processus créé par spawn/1
  • le PID du nouveau processus est également retourné par l’appel à spawn/1 au parent: l’intérêt étant que grâce à ce PID le parent pourra envoyer des messages au process enfant !

Communication entre les processus

Une fois le PID du process avec lequel nous essayons de communiqué connu, il reste deux opérations à mener:

  1. envoyer un message au processus
  2. au sein de ce processus capturer le message reçu

Pour se faire, Elixir dispose de fonctionnalités dédiées:

  • la fonction send/2 qui prend un PID et un message (n’importe quelle valeur Elixir) à envoyer
  • la fonction receive/1 qui prend un bloc de fonctions à pattern-matcher avec ce message (pensez au fonctionnement de case)

L’exemple très simple ci-dessous continue là où nous avions commencé :

# on démarre un processus enfant
enfant = spawn(fn ->
  receive do
    message ->
      IO.inspect(message, label: "ENFANT")
  end
end)

# envoyons un message à l'enfant:
send(enfant, "Coucou")

Nous avons donc le process parent qui envoie un message au process enfant, et ce dernier, lorqu’il le reçoit dans son bloc receive va afficher un message dans la console.

Afin d’illustrer ce fonctionnement, nous pouvons ré-utiliser le code précédent dans une fonction spécifique TP.trace/1 (spécifique à ce TP et basée sur Kino.Process.render_sql_trace/1) de Livebook qui permet de tracer l’exécution en détail:

TP.trace(fn ->
  
  enfant = spawn(fn ->
    receive do
      message ->
        IO.inspect(message, label: "ENFANT")
    end
  end)
  
  send(enfant, "Coucou")
  
end)

On voit que tout fonctionne bien, dans un sens. Il reste cependant une question: comment faire pour que le process enfant puisse “répondre” au parent ?

La solution la plus simple consiste à passer le process parent par closure, c’est-à-dire une variable définie avant la fonction et qui fera partie de son contexte d’exécuption, illustration:

# on sauvetarde le PID du parent dans une variable
parent = self()

TP.trace(parent, fn ->
  
  enfant =
    # Cette fonction anonyme étant définie "après" la variable et peut la référencer
    # ici la référence vers "parent" est passé implicitement à la fonction via son context de
    # déclaration: on parle de "fermeture" ou "closure" en anglais.
    spawn(fn ->
      receive do
        message ->
          IO.inspect(message, label: "ENFANT")

          # renvoi du message au parent, dont on connaît désormais le PID
          send(parent, "Coucou toi aussi")
      end
    end)

  # Envoi du message à l'enfant
  send(enfant, "Coucou")

  # Attente d'un message de la part de l'enfant
  receive do
    message -> message  
  end
  
end)

Exercice

Reprenez le code précédent, mais supprimez la variable parent au début du programme. Vous devez modifier le code pour envoyer un message qui contienne à la voir la chaîne de caractères à afficher et également le PID auquel il doit répondre.

enfant =
  spawn(fn ->
    receive do
      # À COMPLÉTER
    end
  end)

# À COMPLÉTER
send(enfant, "???")

# À DÉCOMMENTER QUAND VOUS ÊTES PRÊT
# receive do
#   message -> message
# end
# CORRECTION POSSIBLE

enfant =
  spawn(fn ->
    receive do
      {pid, msg} ->
        IO.puts(msg)
        send(pid, :ok)
    end
  end)

# À COMPLÉTER
send(enfant, {self(), "Coucou avec réponse"})

# Attente d'un message de la part de l'enfant
receive do
  message -> message
end

Un “long-lived process”

Dans tous nos exemples ci-dessus, nos processus s’arrêtent dès la fin de l’exécution de la fonction. Cependant, il serait intéressant d’avoir des processus capables de traiter plusieurs messages d’affilée sans s’arrêter.

Pour cela, nous allons utiliser deux fonctionnalités d’Élixir que nous avons déjà rencontrés:

  • les modules
  • et la récursion 🔥

Il est alors possible d’utiliser spawn/3 avec trois arguments pour invoquer la fonction dans notre module. La syntaxe est la suivante:

Voyons un exemple:

defmodule LongLivedProcess do

  # Notre fonction prend deux argument: un nom à afficher dans la console, 
  # et un nombre de boucles à effectuer
  def loop(name, count) do
    
    # Attente du message
    receive do

      # Pattern-matching: on s'attend à recevoir un tuple { PID, string }
      {pid, message} ->
        IO.puts "#{name}: Received #{message} from #{inspect(pid)}"

        response =
          case message do
            :ping -> :pong # si on reçoit :ping, on renvoie :pong
            :pong -> :ping # si on reçoit :pong, on renvoie :ping
          end
        send(pid, { self(), response })
    end

    # Pour éviter de créer des "boucles infinies" on décrémente le compteur à chaque itération
    if count > 0 do
      loop(name, count-1)
    end
    
  end
  
end

Pour exécuter ce code, nous allons créer deux processus enfants et les faire communiquer entre eux.

💡 Vous verrez apparaître un process supplémentaire sur ce shéma, qui recoit des message io_request et io_reply: il s’agit du process qui gère l’affichage des logs dans la cellule de Livebook (quand vous appelez IO.puts/1). Vous pouvez l’ignorer.

En pratique:

# On crée deux process
process1 = spawn(LongLivedProcess, :loop, ["PROCESS1", 3])
process2 = spawn(LongLivedProcess, :loop, ["PROCESS2", 3])

# On affiche leur PIDs (pour aider à suivre l'exécution)
IO.inspect(process1, label: "PID du PROCESS1")
IO.inspect(process2, label: "PID du PROCESS2")

TP.trace([process1, process2], fn ->
  
  # on "lance la machine" en simulant l'envoi d'un message du process 1 au process 2
  send(process2, {process1, :ping})

  # on bloque 300ms pour "attendre" que tous les messages soient envoyés
  Process.sleep(300)
  
end)

Exercice d’application: MapReduce

Nous allons terminer ce TP en créant un premier module d’exécution distributée.

Objectif: nous allons créer un programme qui va “compter le nombre de mots” dans un texte”. Et nous alons de faire de façon concurrente, en séparant le travail entre plusieurs process.

  • nous allons utiliser une approche “MapReduce”
  • nous appelons la fonction MapReduce1.run/1 en lui passant une liste de textes à analyzer
  • pour chaque texte, nous lançons un processus d’analyze (dans start_all/1)
  • puis dans gather nous attendons toutes les réponses pour finaliser le calcul final
flowchart LR
  D[textes]
  M["texts |> start_all |> gather"]

  D -->|run| M

  M -->|spawn| W1[Worker #1]:::worker
  M -->|spawn| W2[Worker #2]:::worker
  M -->|spawn| Wn[Worker #n]:::worker

  W1 -->|"{:result, ..., count}"| M
  W2 -->|"{:result, ..., count}"| M
  Wn -->|"{:result, ..., count}"| M

Voici le code à compléter:

defmodule MapReduce1 do
  # Map → Reduce
  def run(texts) do
    texts |> start_all() |> gather()
  end

  # TODO
  def start_all(texts) do
    [] # TODO: lancer les process worker et retourner la liste des PIDs
  end

  # TODO: déterminer les arguments à passer
  def worker(:todo) do
    # TODO: 
    # - compter les mots du texte
    # - répondre au parent avec le résultat: format suggéré {:result, pid_worker, count_des_mots} 
  end

  # 4) Boucle d'agrégation: reçoit results et DOWN, relance si besoin
  defp gather(pids) do
    # TODO:
    # - attendre les résultats des workers
    # - additioner les résultats de chaque worker
    # - retourner le résultat final
    0
  end

  # Cadeau 🎁
  def count_words(text) do
    text
    |> String.downcase()
    |> String.replace(~r/[^[:alnum:]\p{L}\s-]/u, " ")
    |> String.split(~r/\s+/, trim: true)
    |> Enum.count()
  end
end
haiku_dakotsu = [
  "Du temps passé", 
  "Me revient le souvenir", 
  "Les biches au printemps."
]

TP.trace(self(), fn ->
  MapReduce1.run(haiku_dakotsu)
end)
lac_lamartine = [
  "Ainsi, toujours poussés vers de nouveaux rivages,",
  "Dans la nuit éternelle emportés sans retour,",
  "Ne pourrons-nous jamais sur l'océan des âges",
  "Jeter l'ancre un seul jour ?",

  "Ô lac ! l'année à peine a fini sa carrière,",
  "Et près des flots chéris qu'elle devait revoir,",
  "Regarde ! je viens seul m'asseoir sur cette pierre",
  "Où tu la vis s'asseoir !",

  "Tu mugissais ainsi sous ces roches profondes ;",
  "Ainsi tu te brisais sur leurs flancs déchirés ;",
  "Ainsi le vent jetait l'écume de tes ondes",
  "Sur ses pieds adorés.",

  "Un soir, t'en souvient-il ? nous voguions en silence ;",
  "On n'entendait au loin, sur l'onde et sous les cieux,",
  "Que le bruit des rameurs qui frappaient en cadence",
  "Tes flots harmonieux.",

  "Tout à coup des accents inconnus à la terre",
  "Du rivage charmé frappèrent les échos ;",
  "Le flot fut attentif, et la voix qui m'est chère",
  "Laissa tomber ces mots :",

  "Ô temps ! suspends ton vol, et vous, heures propices,",
  "Suspendez votre cours !",
  "Laissez-nous savourer les rapides délices",
  "Des plus beaux de nos jours !",

  "Assez de malheureux ici-bas vous implorent ;",
  "Coulez, coulez pour eux ;",
  "Prenez avec leurs jours les soins qui les dévorent ;",
  "Oubliez les heureux.",

  "Mais je demande en vain quelques moments encore,",
  "Le temps m'échappe et fuit ;",
  "Je dis à cette nuit : Sois plus lente ; et l'aurore",
  "Va dissiper la nuit.",

  "Aimons donc, aimons donc ! de l'heure fugitive,",
  "Hâtons-nous, jouissons !",
  "L'homme n'a point de port, le temps n'a point de rive ;",
  "Il coule, et nous passons !",

  "Temps jaloux, se peut-il que ces moments d'ivresse,",
  "Où l'amour à longs flots nous verse le bonheur,",
  "S'envolent loin de nous de la même vitesse",
  "Que les jours de malheur ?",

  "Hé quoi ! n'en pourrons-nous fixer au moins la trace ?",
  "Quoi ! passés pour jamais ! quoi ! tout entiers perdus !",
  "Ce temps qui les donna, ce temps qui les efface,",
  "Ne nous les rendra plus !",

  "Éternité, néant, passé, sombres abîmes,",
  "Que faites-vous des jours que vous engloutissez ?",
  "Parlez : nous rendrez-vous ces extases sublimes",
  "Que vous nous ravissez ?",

  "Ô lac ! rochers muets ! grottes ! forêt obscure !",
  "Vous, que le temps épargne ou qu'il peut rajeunir,",
  "Gardez de cette nuit, gardez, belle nature,",
  "Au moins le souvenir !",

  "Qu'il soit dans ton repos, qu'il soit dans tes orages,",
  "Beau lac, et dans l'aspect de tes riants coteaux,",
  "Et dans ces noirs sapins, et dans ces rocs sauvages",
  "Qui pendent sur tes eaux.",

  "Qu'il soit dans le zéphyr qui frémit et qui passe,",
  "Dans les sons de tes bords par tes rivages portés,",
  "Dans l'astre au front d'argent qui blanchit ta surface",
  "De ses molles clartés !",

  "Que le vent qui gémit, le roseau qui soupire,",
  "Que les parfums légers de ton air embaumé,",
  "Que tout ce qu'on entend, l'on voit et l'on respire,",
  "Tout dise : Ils ont aimé !"
]

# Avec trace visuel (optionnel)
TP.trace(self(), fn ->
  MapReduce1.run(lac_lamartine)
end)

Vous pouvez maintenant vous rendre au TP suivant pour continuer votre exploration de la programmation concurrente avec Elixir.