Chapter 2 - Building a job processing system
Intro
GenServers are cheap! Instead of a single GenServer handling every email delivery, we’ll start a separate GenServer for each delivery. Boom!
defmodule Jobber.Job do
  @moduledoc """
  A single-use GenServer that runs one unit of work and then stops.

  The work is a zero-arity function that returns `{:ok, data}` on success or
  `:error` on failure. A failed job is re-run after a fixed delay until it
  either succeeds or has used up its retry budget.

  ## Options

    * `:work` - required; the zero-arity function to run.
    * `:id` - optional job identifier; a random URL-safe string by default.
    * `:max_retries` - how many times a failed job is re-run (default `3`).
      `0` means the job is marked as failed on its first failure.

  ## Status values

  The `status` field moves from `"new"` to `"done"` on success, to
  `"errored"` while retries remain, and to `"failed"` once the retry budget
  is exhausted. The process stops with reason `:normal` for every status
  except `"errored"`.
  """

  # A job stops with reason :normal once it is finished; :transient tells a
  # supervisor not to restart the process after such a normal exit.
  use GenServer, restart: :transient
  require Logger

  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

  # Delay, in milliseconds, between a failed attempt and the next retry.
  @retry_delay_ms 5000

  @doc """
  Starts a job process linked to the caller.

  Accepts the options described in the module documentation.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @doc """
  Builds the initial job state from `args` and schedules the first run.

  Raises `KeyError` when the `:work` option is missing.
  """
  @impl true
  def init(args) do
    work = Keyword.fetch!(args, :work)
    # Only generate a random id when the caller did not supply one.
    id = Keyword.get_lazy(args, :id, &random_job_id/0)
    max_retries = Keyword.get(args, :max_retries, 3)
    state = %__MODULE__{id: id, work: work, max_retries: max_retries}

    # The work runs in handle_continue/2, after init/1 has returned.
    {:ok, state, {:continue, :run}}
  end

  @doc """
  Runs the work function once and decides what happens next.

  Returns `{:noreply, state}` and schedules a `:retry` message when the job
  is still `"errored"`; otherwise stops the process with reason `:normal`.
  """
  @impl true
  def handle_continue(:run, state) do
    new_state = handle_job_result(state.work.(), state)

    if new_state.status == "errored" do
      Process.send_after(self(), :retry, @retry_delay_ms)
      {:noreply, new_state}
    else
      Logger.info("Job exiting #{state.id}")
      {:stop, :normal, new_state}
    end
  end

  @doc """
  Handles the delayed `:retry` message by running the work again.
  """
  @impl true
  def handle_info(:retry, state) do
    {:noreply, state, {:continue, :run}}
  end

  # Generates a random URL-safe id from 5 strong random bytes.
  defp random_job_id() do
    :crypto.strong_rand_bytes(5)
    |> Base.url_encode64(padding: false)
  end

  # Folds the result of one run into the job state.
  defp handle_job_result({:ok, _data}, state) do
    Logger.info("Job completed #{state.id}")
    %__MODULE__{state | status: "done"}
  end

  # First failure: no retry has been consumed yet.
  defp handle_job_result(:error, %__MODULE__{status: "new"} = state) do
    Logger.warning("Job errored #{state.id}")
    retry_or_fail(state)
  end

  # A retry failed: count it before checking the budget.
  defp handle_job_result(:error, %__MODULE__{status: "errored"} = state) do
    Logger.warning("Job retry failed #{state.id}")
    retry_or_fail(%__MODULE__{state | retries: state.retries + 1})
  end

  # Marks the job "failed" once the retry budget is used up, "errored"
  # otherwise. `>=` (rather than `==`) guarantees termination even when
  # max_retries is 0 or negative.
  defp retry_or_fail(%__MODULE__{retries: retries, max_retries: max_retries} = state)
       when retries >= max_retries do
    %__MODULE__{state | status: "failed"}
  end

  defp retry_or_fail(state) do
    %__MODULE__{state | status: "errored"}
  end
end
# Demo job: simulates five seconds of work and then reports success.
success_job = fn ->
  Process.sleep(:timer.seconds(5))
  {:ok, []}
end

# Start the job as an unlinked process; the work begins right after init/1.
GenServer.start(Jobber.Job, [work: success_job])
# Demo job: simulates five seconds of work and then reports failure.
failed_job = fn ->
  Process.sleep(:timer.seconds(5))
  :error
end

# Start the failing job as an unlinked process to watch the retry logging.
GenServer.start(Jobber.Job, [work: failed_job])
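Note: the examples above use GenServer.start/2, which starts the job without linking it to the caller.
GenServer.start_link/2 links the parent process to the GenServer process,
which means a crash in the GenServer process will propagate to the parent process (unless the parent traps exits).
The best approach is to run all the jobs under a supervisor.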