Oban Training—Placing an Order
Mix.install([:faker, :kino, :oban, :postgrex])
Logger.configure(level: :info)
Application.put_env(:chow_mojo, ChowMojo.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,
  url: "postgres://localhost:5432/chow_mojo_dev"
)

defmodule ChowMojo.Repo do
  use Ecto.Repo, otp_app: :chow_mojo, adapter: Ecto.Adapters.Postgres
end

defmodule ChowMojo.Order do
  use Ecto.Schema

  schema "orders" do
    field(:notes, :string)
    field(:total, :integer, default: 0)
    field(:dispatched_at, :naive_datetime_usec)
    field(:delivered_at, :naive_datetime_usec)
    timestamps()
  end
end

defmodule CreateRestaurants do
  use Ecto.Migration

  def change do
    create table(:restaurants) do
      add(:name, :text, null: false)
      add(:address, :text)
      add(:rating, :float)
      timestamps()
    end

    create(index(:restaurants, [:name], unique: true))

    create table(:orders) do
      add(:notes, :text)
      add(:total, :integer, default: 0, null: false)
      add(:dispatched_at, :naive_datetime)
      add(:delivered_at, :naive_datetime)
      timestamps()
      add(:restaurant_id, references(:restaurants))
      add(:user_id, references(:users))
    end
  end
end

defmodule ChowMojo.ObanCase do
  use ExUnit.CaseTemplate

  using do
    quote do
      use Oban.Testing, repo: ChowMojo.Repo
      import ChowMojo.ObanCase
    end
  end

  setup do
    Ecto.Adapters.SQL.Sandbox.mode(ChowMojo.Repo, {:shared, self()})
    Ecto.Adapters.SQL.Sandbox.checkout(ChowMojo.Repo)
  end

  def insert(:order, :success) do
    insert(notes: "Get it here quickly", total: 30_000)
  end

  def insert(:order, :failure) do
    insert(notes: "This will never work", total: -1)
  end

  def insert(:order, :rate_limited) do
    insert(notes: "Hope you aren't too busy", total: -2)
  end

  def insert(:order, :dispatched) do
    insert(notes: "We keep ordering", total: 10_000, dispatched_at: NaiveDateTime.utc_now())
  end

  def insert(params) do
    ChowMojo.Order
    |> struct!(params)
    |> ChowMojo.Repo.insert!()
  end
end

defmodule ChowMojo do
  def get_order(id), do: ChowMojo.Repo.get(ChowMojo.Order, id)

  def notify_restaurant(%{total: -1}), do: {:error, 503, "Service Unavailable"}
  def notify_restaurant(%{total: -2}), do: {:error, 429, "Too Many Requests"}
  def notify_restaurant(_order), do: :ok
end
ChowMojo.Repo.start_link()
Ecto.Migrator.run(ChowMojo.Repo, [{2, CreateRestaurants}], :up, all: true)
🏅 Goals
In this exercise you’ll coordinate notifying a ChowMojo restaurant about a delivery order. You’ll learn how to make the job resilient to expected and unexpected errors through return values, custom backoff, and timeouts.
Restaurant Notification
The ChowMojo app allows customers to browse through a restaurant’s menu, select items, and place an order—standard food delivery stuff. Assuming we have a functional order flow, now it’s time to create a worker that handles notifying a restaurant about the order.
We’ll create a RestaurantNotifier worker that receives an order id in args, fetches the order with ChowMojo.get_order/1, and then issues a notification with ChowMojo.notify_restaurant/1.
defmodule ChowMojo.RestaurantNotifier do
  use Oban.Worker, queue: :notifications

  @impl Worker
  def perform(%Job{args: %{"id" => order_id}}) do
    order_id
    |> ChowMojo.get_order()
    |> ChowMojo.notify_restaurant()
  end
end

defmodule ChowMojo.RestaurantNotifier do
  # Your turn...
end
Here we’re writing a test that uses another Oban.Testing helper, perform_job/3. The perform_job/3 function is an essential tool for unit testing workers. It validates args, constructs a job, executes it in the test process, and helps make assertions about the return value.
In our first RestaurantNotifier unit test we’ll verify that a successful notification returns :ok. Note that the test uses ChowMojo.ObanCase to import testing helpers, automate connection checkout, and insert test data.
Call perform_job/2 with the name of the module and the args, then assert that the job’s perform/1 function returns :ok.
assert :ok = perform_job(ChowMojo.RestaurantNotifier, %{id: order.id})
ExUnit.start(auto_run: false)

defmodule ChowMojo.NotifierSuccessTest do
  use ChowMojo.ObanCase

  test "successfully notifying the restaurant about an order" do
    order = insert(:order, :success)
    # Your turn...
  end
end

ExUnit.run()
Notifying a restaurant involves a handful of external requests, any of which could disconnect, time out, or otherwise fail. Let’s write another test that demonstrates we can handle a standard 503 Service Unavailable HTTP error from an external server.
Modify the RestaurantNotifier worker to return an {:error, reason} tuple when notification fails. The error tuple indicates that the job failed, records the failure on the job, and reschedules the job to try again later (if there are attempts remaining).
Within the test you’ll assert on a two-element :error tuple:
assert {:error, {503, _}} = perform_job(ChowMojo.RestaurantNotifier, %{id: order.id})
Change the worker to convert the three-element error tuple into a two-element one:
def perform(%Job{args: %{"id" => order_id}}) do
  order_id
  |> ChowMojo.get_order()
  |> ChowMojo.notify_restaurant()
  |> case do
    :ok -> :ok
    {:error, status, body} -> {:error, {status, body}}
  end
end

ExUnit.start(auto_run: false)

defmodule ChowMojo.NotifierFailureTest do
  use ChowMojo.ObanCase

  test "encountering an unexpected error during notification" do
    order = insert(:order, :failure)
    # Your turn...
  end
end

ExUnit.run()
Above, we handled a 503 error, which is an exceptional event that we’d consider important enough to record and try again later. Sometimes less exceptional errors occur, such as an overloaded system or a rate limit. In that situation we can snooze for an arbitrary period of time and try again later.
Modify the RestaurantNotifier to translate {:error, 429, "Too Many Requests"} into a short 10-second snooze and assert the return value in the test below.
Assert the snooze tuple:
assert {:snooze, 10} = perform_job(ChowMojo.RestaurantNotifier, %{id: order.id})
Add an extra clause to translate the rate limit error to snooze:
def perform(%Job{args: %{"id" => order_id}}) do
  order_id
  |> ChowMojo.get_order()
  |> ChowMojo.notify_restaurant()
  |> case do
    :ok -> :ok
    {:error, 429, _} -> {:snooze, 10}
    {:error, status, body} -> {:error, {status, body}}
  end
end

ExUnit.start(auto_run: false)

defmodule ChowMojo.NotifierRateLimitedTest do
  use ChowMojo.ObanCase

  test "snoozing when order notification is rate limited" do
    order = insert(:order, :rate_limited)
    # Your turn...
  end
end

ExUnit.run()
The final situation we’ll handle is when the order has already been dispatched to the customer but we haven’t notified the restaurant yet. Maybe it’s a duplicate order, or maybe we have a bug. Either way, we don’t want to double-notify the restaurant.
For this situation we’ll return a {:cancel, reason} tuple to indicate that this job will never succeed and shouldn’t be retried. Modify the worker and test to ensure that jobs for orders with a dispatched_at timestamp are cancelled.
Assert on the :cancel tuple:
assert {:cancel, "already dispatched"} =
         perform_job(ChowMojo.RestaurantNotifier, %{id: order.id})
Modify the worker to return the :cancel tuple when dispatched_at isn’t nil:
def perform(%Job{args: %{"id" => order_id}}) do
  with {:ok, order} <- fetch_order(order_id) do
    case ChowMojo.notify_restaurant(order) do
      :ok -> :ok
      {:error, 429, _} -> {:snooze, 10}
      {:error, status, body} -> {:error, {status, body}}
    end
  end
end

defp fetch_order(order_id) do
  case ChowMojo.get_order(order_id) do
    %{dispatched_at: at} when is_struct(at) -> {:cancel, "already dispatched"}
    order -> {:ok, order}
  end
end

ExUnit.start(auto_run: false)

defmodule ChowMojo.NotifierDispatchedTest do
  use ChowMojo.ObanCase

  test "cancelling notification when the order is already dispatched" do
    order = insert(:order, :dispatched)
    # Your turn...
  end
end

ExUnit.run()
☠️ Extra Challenges
Use a custom backoff
Oban automatically retries failed jobs up to a maximum number of attempts (max_attempts). Each attempt is spaced out by a customizable backoff period. The default algorithm is exponential with jitter, which is excellent for making sure a job will complete eventually, but not ideal for an urgent operation like notifying a restaurant they have an order.
Implement a backoff/1 callback and write a test that exercises that backoff using various attempts.
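As one possible starting point, here is a minimal sketch of a linear, capped backoff. The module name ChowMojo.UrgentNotifier, the 5-second step, and the 30-second cap are all assumptions chosen for illustration, and the block relies on the notebook's Mix.install cell having loaded Oban. The backoff/1 callback receives the job and returns the number of seconds to wait before the next attempt.

```elixir
defmodule ChowMojo.UrgentNotifier do
  # Hypothetical worker name, used so this sketch doesn't overwrite RestaurantNotifier.
  use Oban.Worker, queue: :notifications, max_attempts: 5

  # Assumed values: wait 5 seconds per attempt, but never more than 30 seconds.
  @step_seconds 5
  @max_seconds 30

  # Linear backoff in seconds, capped at @max_seconds.
  @impl Worker
  def backoff(%Job{attempt: attempt}) do
    min(attempt * @step_seconds, @max_seconds)
  end

  # Placeholder perform/1; the real worker would notify the restaurant here.
  @impl Worker
  def perform(%Job{}), do: :ok
end

# Call the callback directly with hand-built jobs to inspect the schedule.
for attempt <- [1, 3, 10] do
  ChowMojo.UrgentNotifier.backoff(%Oban.Job{attempt: attempt})
end
# => [5, 15, 30]
```

Because backoff/1 is an ordinary function, a test can call it with jobs built for different attempt values and assert on the returned seconds, without running the job at all.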
Limit execution time with a timeout
Unless a timeout is configured, jobs may execute indefinitely. That’s a problem when our system has tight deadlines or external systems are unresponsive, e.g. sometimes restaurant notifications hang forever.
Implement a timeout/1 callback to limit execution time and try writing a perform_job/3 test that exercises the timeout.
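A minimal sketch of the callback itself, under a few assumptions: the module name ChowMojo.TimedNotifier and the 5-second limit are made up for illustration, and Oban is expected to be loaded by the notebook's Mix.install cell. The timeout/1 callback receives the job and returns either :infinity or a limit in milliseconds.

```elixir
defmodule ChowMojo.TimedNotifier do
  # Hypothetical worker name, used so this sketch doesn't overwrite RestaurantNotifier.
  use Oban.Worker, queue: :notifications

  # Assumed limit of 5 seconds, expressed in milliseconds.
  @timeout_ms :timer.seconds(5)

  # Limit how long a single execution of this job may run.
  @impl Worker
  def timeout(%Job{}), do: @timeout_ms

  # Placeholder perform/1; the real worker would notify the restaurant here.
  @impl Worker
  def perform(%Job{args: %{"id" => _order_id}}), do: :ok
end

# The callback can be inspected directly with a hand-built job.
ChowMojo.TimedNotifier.timeout(%Oban.Job{})
# => 5000
```

Since the callback receives the whole job, the limit could also vary with the job's args or attempt number. This sketch only checks the returned value; exercising an actual timed-out execution is left to the exercise.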