Chapter 2 - Dynamic Supervisor
Intro
DynamicSupervisor is another ready-to-use supervisor shipped with Elixir. Instead of starting a fixed list of children at boot, it can start any GenServer on demand at runtime. It only supports the :one_for_one strategy.
children = [
  {DynamicSupervisor, strategy: :one_for_one, name: Jobber.JobRunner}
]

opts = [strategy: :one_for_one, name: Jobber.Supervisor]
Supervisor.start_link(children, opts)
You can also define a DynamicSupervisor as a module using the behaviour.
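A minimal sketch of that module-based form, following the pattern in the DynamicSupervisor docs (it reuses the Jobber.JobRunner name from the child spec above):

defmodule Jobber.JobRunner do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    # DynamicSupervisor only supports the :one_for_one strategy.
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end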
good_job = fn ->
  Process.sleep(5000)
  {:ok, []}
end

bad_job = fn ->
  Process.sleep(5000)
  :error
end
defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

  use GenServer
  require Logger

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
    # Return from init/1 right away and run the work in handle_continue/2.
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      # Schedule a retry in 5 seconds and keep the process alive.
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      # Either "done" or "failed": stop the process normally.
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
defmodule Jobber do
  alias Jobber.{JobRunner, Job}

  def start_job(args) do
    DynamicSupervisor.start_child(JobRunner, {Job, args})
  end
end
Jobber.start_job(work: good_job)
This fails because we haven't implemented a start_link/1 function in Job yet: the child spec generated by use GenServer points at Job.start_link/1, which doesn't exist.
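You can inspect the child spec that use GenServer generates to see what DynamicSupervisor tries to call (the output shown is illustrative):

Jobber.Job.child_spec(work: good_job)
#=> %{id: Jobber.Job, start: {Jobber.Job, :start_link, [[work: #Function<...>]]}}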
defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

  use GenServer
  require Logger

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
###
# This continually restarts completed jobs: the default restart policy
# for a child is :permanent, so the supervisor restarts the process even
# after it exits normally. A finished job looks like a failure to a
# supervisor that expects its children to keep running.
###
# Jobber.start_job(work: good_job)
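The fix is the :restart option on the child spec (these are standard Supervisor restart values): :permanent (the default) always restarts the child, :transient restarts it only after an abnormal exit, and :temporary never restarts it. Passing restart: :transient to use GenServer merges it into the generated child spec; once the revised module below is compiled, you can confirm it:

Jobber.Job.child_spec([]).restart
#=> :transient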
defmodule Jobber.Job do
  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

  # :transient - only restart the job if it terminates abnormally.
  use GenServer, restart: :transient
  require Logger

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def init(args) do
    work = Keyword.fetch!(args, :work)
    id = Keyword.get(args, :id, random_job_id())
    max_retries = Keyword.get(args, :max_retries, 3)

    state = %Jobber.Job{id: id, work: work, max_retries: max_retries}
    {:ok, state, {:continue, :run}}
  end

  def handle_continue(:run, state) do
    new_state =
      state.work.()
      |> handle_job_result(state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, 5000)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %Jobber.Job{state | status: "done"}
  end

  defp handle_job_result(:error, %{status: "new"} = state) do
    Logger.warn("Job errored #{state.id}")
    %Jobber.Job{state | status: "errored"}
  end

  defp handle_job_result(:error, %{status: "errored"} = state) do
    Logger.warn("Job retry failed #{state.id}")
    new_state = %Jobber.Job{state | retries: state.retries + 1}

    if new_state.retries == state.max_retries do
      %Jobber.Job{new_state | status: "failed"}
    else
      new_state
    end
  end
end
Jobber.start_job(work: good_job)
Jobber.start_job(work: bad_job)
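You can check that both jobs are running under JobRunner with DynamicSupervisor.count_children/1 (the counts shown are illustrative):

DynamicSupervisor.count_children(Jobber.JobRunner)
#=> %{active: 2, specs: 2, supervisors: 0, workers: 2}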
doomed_job = fn ->
  Process.sleep(5000)
  raise "boom"
end
This will crash and restart forever: the job keeps failing, but not frequently enough for the supervisor to give up on it. By default a supervisor allows 3 restarts (:max_restarts) within 5 seconds (:max_seconds), and since doomed_job sleeps for 5 seconds before raising, it never crashes more than once per window.
Jobber.start_job(work: doomed_job)
Supervisor.stop(Jobber.Supervisor)
job_runner_config = [
  strategy: :one_for_one,
  max_seconds: 30,
  name: Jobber.JobRunner
]

children = [
  {DynamicSupervisor, job_runner_config}
]

opts = [strategy: :one_for_one, name: Jobber.Supervisor]
Supervisor.start_link(children, opts)
# Now the job should not error forever: with max_seconds stretched to 30,
# the crashes (one roughly every 5 seconds) exceed the default
# max_restarts of 3 within a single window, so JobRunner gives up.
Jobber.start_job(work: doomed_job)
Process.whereis(Jobber.JobRunner)
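To make the restart visible, capture the PID before starting the job and compare it once the restart limit has been exceeded (a sketch; the timing assumes the 5-second sleep in doomed_job):

pid_before = Process.whereis(Jobber.JobRunner)
Jobber.start_job(work: doomed_job)

# Wait long enough for the crashes to blow through the restart intensity.
Process.sleep(30_000)

pid_after = Process.whereis(Jobber.JobRunner)
pid_before == pid_after
#=> false, because Jobber.Supervisor restarted JobRunner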
But we can see that the supervisor itself restarted as a result (Process.whereis returns a new PID). That's not great, because when JobRunner goes down it takes any concurrently running Job processes with it. We'll fix this issue by adding a supervisor for every Job process, as sketched below.
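As a preview, the shape of that fix is a small wrapper supervisor per job. This is only a sketch, assuming a hypothetical Jobber.JobSupervisor module that supervises exactly one Job and is itself :temporary, so a crash-looping job takes down only its own wrapper:

defmodule Jobber.JobSupervisor do
  # :temporary means that when this supervisor gives up and exits,
  # JobRunner does not restart it, and sibling jobs are unaffected.
  use Supervisor, restart: :temporary

  def start_link(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    children = [
      {Jobber.Job, args}
    ]

    options = [strategy: :one_for_one, max_seconds: 30]
    Supervisor.init(children, options)
  end
end

Jobber.start_job/1 would then start {JobSupervisor, args} instead of {Job, args}.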