Oban Training—Delivering a Daily Digest
Mix.install([:faker, :kino, :oban, :postgrex])
Logger.configure(level: :info)
Application.put_env(:chow_mojo, ChowMojo.Repo,
pool: Ecto.Adapters.SQL.Sandbox,
url: "postgres://localhost:5432/chow_mojo_dev"
)
defmodule ChowMojo.Repo do
use Ecto.Repo, otp_app: :chow_mojo, adapter: Ecto.Adapters.Postgres
end
defmodule ChowMojo.User do
use Ecto.Schema
schema "users" do
field(:email, :string)
field(:name, :string)
timestamps()
end
end
defmodule ChowMojo.Restaurant do
use Ecto.Schema
schema "restaurants" do
field(:name, :string)
field(:address, :string)
field(:rating, :integer)
timestamps()
belongs_to(:owner, ChowMojo.User)
end
end
defmodule ChowMojo.ObanCase do
use ExUnit.CaseTemplate
load = fn path ->
[__DIR__, "samples", path]
|> Path.join()
|> File.read!()
|> String.split("\n", trim: true)
end
@animals load.("animals.txt")
@adjectives load.("adjectives.txt")
@types load.("restaurants.txt")
using do
quote do
use Oban.Testing, repo: ChowMojo.Repo
import ChowMojo.ObanCase
end
end
def insert(:restaurant, :active) do
insert(name: name(), address: address())
end
def insert(:restaurant, :inactive) do
insert(name: name())
end
def insert(params) do
ChowMojo.Restaurant
|> struct!(params)
|> ChowMojo.Repo.insert!()
end
defp name do
[Enum.random(@adjectives), Enum.random(@animals), Enum.random(@types)]
|> Enum.map(&String.capitalize/1)
|> Enum.join(" ")
end
defp address do
Enum.join([Faker.Address.city(), Faker.Address.country()], ", ")
end
end
defmodule AddRestaurantOwners do
use Ecto.Migration
def change do
alter table(:restaurants) do
add(:owner_id, references(:users))
end
end
end
defmodule ChowMojo do
import Ecto.Query
def all_active_restaurants do
ChowMojo.Restaurant
|> where([r], not is_nil(r.address))
|> ChowMojo.Repo.all()
end
def deliver_daily_digest(restaurant) do
send(self(), {:delivered, restaurant.id})
:ok
end
end
ChowMojo.Repo.start_link()
Ecto.Migrator.run(ChowMojo.Repo, [{3, AddRestaurantOwners}], :up, all: true)
🏅 Goals
Thus far we’ve inserted jobs manually or as part of an externally triggered function. In this exercise we’ll look at inserting jobs automatically on a schedule, the role of plugins in Oban, and how to manipulate queues at runtime.
Daily Digest Emails
ChowMojo restaurant owners would like to know how many orders they fulfilled and how much they earned each day. That’s a perfect fit for background jobs like we’ve written before, so let’s whip up a new DailyDigest
worker.
The twist is that the new worker only receives empty args
and fetches all restaurants. Define a perform/1
function that ignores args (or the entire job), then use ChowMojo.all_active_restaurants/0
to fetch all restaurants, then call ChowMojo.deliver_daily_digest/1
on each restaurant to accomplish delivery.
use Oban.Worker, queue: :email
@impl Worker
def perform(_job) do
Enum.each(ChowMojo.all_active_restaurants(), &ChowMojo.deliver_daily_digest/1)
end
defmodule ChowMojo.DailyDigest do
# Your turn...
end
Before moving on to automating digest job creation, let’s test the worker using perform_job/3
as we’ve done before.
Call perform_job/2
with an empty map:
assert :ok = perform_job(ChowMojo.DailyDigest, %{})
ExUnit.start(auto_run: false)
defmodule ChowMojo.DailyDigestTest do
use ChowMojo.ObanCase
test "delivering digest emails to all active restaurants" do
%{id: id_1, name: name_1} = insert(:restaurant, :active)
%{id: id_2, name: name_2} = insert(:restaurant, :inactive)
%{id: id_3, name: name_3} = insert(:restaurant, :active)
# Your turn...
assert_received {:delivered, ^id_1}
refute_received {:delivered, ^id_2}
assert_received {:delivered, ^id_3}
# For your amusement...
IO.puts("Digest delivered to:\n\n* #{name_1}\n* #{name_2}\n* #{name_3}")
end
end
ExUnit.run()
With confidence that the worker delivers digests to the correct restaurants, we’re ready to automate delivery on a schedule.
The ideal way to schedule periodic jobs is with the Cron plugin. In Oban, plugins are processes (GenServers) that insert, update, or delete jobs autonomously based on some configuration. We’ll look at other plugins and their role in production systems later on.
For now, let’s define options for an Oban instance with a Cron plugin that schedules a DailyDigest
job every minute and starts an email
queue in the paused state. Note that we’re scheduling jobs frequently for testing purposes, in reality we’d want to deliver digest emails once a day.
At this point we’re only validating the config, not starting the instance.
Use a Hintconf_opts = [
repo: ChowMojo.Repo,
queues: [email: [limit: 1, paused: true]],
plugins: [{Oban.Plugins.Cron, crontab: [{"* * * * *", ChowMojo.DailyDigest}]}]
]
# Your turn...
conf_opts = [
repo: ChowMojo.Repo
]
Oban.Config.validate(conf_opts)
Oban’s config, including the config of every plugin is validated during startup using the same validate/1
function. As you may have noticed, if any configuration is invalid, including a cron schedule, it returns an explanatory error and prevents the instance from booting.
Now start the instance with Oban.start_link/1
.
Oban.start_link(conf_opts)
Cron is running now and it will insert one job every minute. However, because the queue is paused none of those jobs can run and they’re left in the available
state. We can check how many jobs have accumulated by querying the jobs table with Ecto.Query
functions.
Compose a query to count Oban.Job
rows grouped by state:
Use Oban.Job
as the base of a query that groups by state:
Oban.Job
|> where([j], j.queue == "email")
|> group_by(:state)
|> select([j], {j.state, count()})
|> ChowMojo.Repo.all()
import Ecto.Query
# Your turn...
Once a few jobs have accumulated we can resume the queue and allow it to start processing.
Use a HintOban.resume_queue(queue: :email)
# Your turn...
Try reevaluating the count query above after you’ve resumed the queue. You should see that all the jobs are completed
now.
☠️ Extra Challenges
Pause, Resume, Stop, Start
Play with the :email
queue by pausing it, resuming it, stopping it, and then starting it again. Remember, at any point you can see which queues are running with Oban.config/0
and check the queue itself with Oban.check_queue/1
.
Enqueue Sub-Jobs
Delivering all of the digest emails from a single queue is risky because a single error will cause the entire job to retry. A simple approach to compensating for errors is to enqueue sub-jobs for each digest delivery from the cron trigger. Try adding a new perform/1
clause that delivers a digest to a single restaurant and enqueue sub-jobs for every active restaurant.