
Chapter 2 - Dynamic Supervisor

Intro

DynamicSupervisor is another ready-to-use supervisor shipped with Elixir. Unlike a regular Supervisor, it starts with no children and can start any GenServer on demand at runtime.

It must use the :one_for_one strategy.

children = [
  {DynamicSupervisor, strategy: :one_for_one, name: Jobber.JobRunner}
]

opts = [strategy: :one_for_one, name: Jobber.Supervisor]

Supervisor.start_link(children, opts)
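
Once the JobRunner is running, children can be started on demand with DynamicSupervisor.start_child/2. For example, starting a throwaway Agent under it (just to illustrate; this isn't part of the Jobber code):

DynamicSupervisor.start_child(Jobber.JobRunner, {Agent, fn -> %{} end})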

You can also define a DynamicSupervisor module using the behavior.
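
An equivalent module-based version of the JobRunner might look like this (a minimal sketch; the rest of this chapter keeps the tuple-based version above):

defmodule Jobber.JobRunner do
  use DynamicSupervisor

  def start_link(init_arg) do
    # register under the module name so callers can refer to it by name
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

It would then be added to the top-level supervisor as {Jobber.JobRunner, []} instead of the {DynamicSupervisor, ...} tuple.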

good_job = fn ->
  Process.sleep(5000)
  {:ok, []}
end

bad_job = fn ->
  Process.sleep(5000)
  :error
end
defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]
  use GenServer
  require Logger

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
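    # {:continue, :run} tells the GenServer to call handle_continue(:run, state) right after init/1 returns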
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
defmodule Jobber do
  alias Jobber.{JobRunner, Job}

  def start_job(args) do
    DynamicSupervisor.start_child(JobRunner, {Job, args})
  end
end
Jobber.start_job(work: good_job)

This fails because we haven't implemented a start_link/1 function in Job yet: the {Job, args} child spec tells the supervisor to start the child by calling Job.start_link(args).

defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]
  use GenServer
  require Logger

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
###
# This continually requeues the job: the default restart value is :permanent, so the
# supervisor restarts the process even when it exits with :normal. To a supervisor
# that expects its children to stay running, a completed job looks like a crash.
###

# Jobber.start_job(work: good_job)
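
The fix is to tell the supervisor that a normal exit is expected. Passing restart: :transient to use GenServer changes the child_spec/1 it generates so the process is only restarted after an abnormal exit. Roughly, the generated spec looks like this (a sketch):

# Roughly what Jobber.Job.child_spec(args) returns with restart: :transient (sketch)
args = [work: fn -> {:ok, []} end]

%{
  id: Jobber.Job,
  # the supervisor starts the child by calling Jobber.Job.start_link(args)
  start: {Jobber.Job, :start_link, [args]},
  # :transient - only restart after an abnormal exit; the default is :permanent
  restart: :transient
}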
defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]
  use GenServer, restart: :transient
  require Logger

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
Jobber.start_job(work: good_job)
Jobber.start_job(work: bad_job)
doomed_job = fn ->
  Process.sleep(5000)
  raise "boom"
end

This will crash and restart forever: each attempt sleeps for 5 seconds before raising, so the failures are never frequent enough to exceed the supervisor's default restart intensity (more than 3 restarts within 5 seconds), and the supervisor never gives up on the job.

Jobber.start_job(work: doomed_job)
Supervisor.stop(Jobber.Supervisor)
job_runner_config = [
  strategy: :one_for_one,
  max_seconds: 30,
  name: Jobber.JobRunner
]

children = [
  {DynamicSupervisor, job_runner_config}
]

opts = [strategy: :one_for_one, name: Jobber.Supervisor]

Supervisor.start_link(children, opts)
# With max_seconds: 30, three crashes within 30 seconds now exceed the restart limit, so the job should not error forever
Jobber.start_job(work: doomed_job)
Process.whereis(Jobber.JobRunner)

But we can see that the supervisor itself was restarted as a result (it has a new PID): when the JobRunner exceeded its restart limit, it was shut down and restarted by Jobber.Supervisor.

That's not great, because any other Job processes running under it at the time would also exit.

We'll fix this issue by adding a supervisor for every Job process.
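
One possible shape for that fix (a sketch only; the module name and options here are assumptions, not something built in this chapter yet): each Job gets its own small supervisor, and the JobRunner supervises those instead of the jobs directly.

defmodule Jobber.JobSupervisor do
  # :temporary - if this supervisor gives up on its job, the JobRunner leaves it alone
  use Supervisor, restart: :temporary

  def start_link(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    children = [
      {Jobber.Job, args}
    ]

    # isolate the restart limit to this one job
    Supervisor.init(children, strategy: :one_for_one, max_seconds: 30)
  end
end

Jobber.start_job/1 would then start {Jobber.JobSupervisor, args} instead of {Jobber.Job, args}, so a misbehaving job can only take down its own supervisor.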