Oban Training — Backfilling Reviews
Mix.install([:faker, :kino, :oban, :postgrex])

Logger.configure(level: :info)

Application.put_env(:chow_mojo, ChowMojo.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,
  url: "postgres://localhost:5432/chow_mojo_dev"
)
defmodule ChowMojo.Repo do
  use Ecto.Repo, otp_app: :chow_mojo, adapter: Ecto.Adapters.Postgres
end

defmodule ChowMojo.Review do
  use Ecto.Schema

  schema "reviews" do
    field(:comment, :string)
    field(:sentiment, Ecto.Enum, values: [:negative, :neutral, :positive])
    field(:rating, :integer)

    timestamps()
  end
end
defmodule ChowMojo.ObanCase do
  use ExUnit.CaseTemplate

  load = fn path ->
    [__DIR__, "samples", path]
    |> Path.join()
    |> File.read!()
    |> String.split("\n", trim: true)
  end

  @reviews load.("reviews.txt")

  using do
    quote do
      use Oban.Testing, repo: ChowMojo.Repo

      import ChowMojo.ObanCase
    end
  end

  setup do
    Ecto.Adapters.SQL.Sandbox.checkout(ChowMojo.Repo)
    Ecto.Adapters.SQL.Sandbox.mode(ChowMojo.Repo, {:shared, self()})

    start_supervised!({Oban, repo: ChowMojo.Repo, testing: :manual})

    :ok
  end

  def insert(:review, :unprocessed) do
    insert(comment: comment(), rating: 0, sentiment: nil)
  end

  def insert(:review, :analyzed) do
    insert(comment: comment(), rating: 4, sentiment: :positive)
  end

  def insert(params) do
    ChowMojo.Review
    |> struct!(params)
    |> ChowMojo.Repo.insert!()
  end

  defp comment, do: Enum.random(@reviews)
end
defmodule AddRatings do
  use Ecto.Migration

  def change do
    create table(:reviews) do
      add(:comment, :text)
      add(:sentiment, :text)
      add(:rating, :integer, null: false)

      timestamps()
    end
  end
end
defmodule ChowMojo do
  def get_review(id), do: ChowMojo.Repo.get(ChowMojo.Review, id)

  def analyze_sentiment(_review), do: :ok
end

ChowMojo.Repo.start_link()

# Start with a clean slate
ChowMojo.Repo.delete_all(Oban.Job)

Ecto.Migrator.run(ChowMojo.Repo, [{4, AddRatings}], :up, all: true)
🏅 Goals
Until now we’ve worked with one job at a time. In this exercise we’ll learn about the nuances of inserting multiple jobs at once, how to deprioritize less essential jobs, and how to compose jobs together using introspection at runtime.
Sentiment Analysis for Reviews
ChowMojo customers frequently leave reviews after ordering from a restaurant. We’d like to use sentiment analysis to gauge how people feel about a restaurant. New reviews will be analyzed as they’re submitted, but there is an extensive backlog of reviews for us to process.
Create a standard worker that receives a review id, fetches the review with ChowMojo.get_review/1, and then uses ChowMojo.analyze_sentiment/1 to handle the rest.
use Oban.Worker, queue: :analysis

@impl Worker
def perform(%{args: %{"id" => review_id}}) do
  review_id
  |> ChowMojo.get_review()
  |> ChowMojo.analyze_sentiment()
end

defmodule ChowMojo.SentimentAnalyzer do
  # Your turn...
end
As new reviews are submitted we’ll analyze them, but we need to retroactively process all of the old reviews. Create an enqueue_backfill/0 function that queries ChowMojo.Review for records with null sentiment columns. Then build a new ChowMojo.SentimentAnalyzer changeset from each of those records and use Oban.insert_all/1 to enqueue them all at once.
def enqueue_backfill do
  ChowMojo.Review
  |> select([r], r.id)
  |> where([r], is_nil(r.sentiment))
  |> ChowMojo.Repo.all()
  |> Enum.map(&ChowMojo.SentimentAnalyzer.new(%{id: &1}))
  |> Oban.insert_all()
end

defmodule ChowMojo.Analysis do
  import Ecto.Query

  # Your turn...
end
Now we’ll write a test to verify that enqueue_backfill/0 only inserts jobs for “unprocessed” reviews. Use all_enqueued/1 to extract the enqueued jobs and assert on each job’s args.
ChowMojo.Analysis.enqueue_backfill()

review_ids =
  [worker: ChowMojo.SentimentAnalyzer]
  |> all_enqueued()
  |> Enum.map(& &1.args["id"])
  |> Enum.sort()

assert [id_1, id_2, id_3] == review_ids
ExUnit.start(auto_run: false)

defmodule ChowMojo.AnalyzerTest do
  use ChowMojo.ObanCase

  test "only unprocessed reviews are enqueued for backfill" do
    %{id: id_1} = insert(:review, :unprocessed)
    %{id: id_2} = insert(:review, :unprocessed)
    %{id: id_3} = insert(:review, :unprocessed)
    %{id: _id_} = insert(:review, :analyzed)

    # Your turn...
  end
end

ExUnit.run()
Jobs usually execute in the order they’re scheduled, with the id as a secondary sort. For example, these four jobs would execute in the order they’re shown:
id | scheduled_at |
---|---|
2 | 2023-09-03 00:00:00 |
1 | 2023-09-03 00:00:01 |
3 | 2023-09-03 00:00:02 |
4 | 2023-09-03 00:00:02 |
We say “usually” because it’s possible to change a job’s priority so that it runs before or after other jobs, regardless of when they were scheduled. The default priority is 0, and jobs with a higher value have a lower priority (like nice for OS processes). Rewriting the table above so that only job 4 is priority 0:
priority | id | scheduled_at |
---|---|---|
0 | 4 | 2023-09-03 00:00:02 |
1 | 2 | 2023-09-03 00:00:00 |
1 | 1 | 2023-09-03 00:00:01 |
1 | 3 | 2023-09-03 00:00:02 |
This is relevant to our backfill because we want to ensure that new reviews are analyzed first, not stuck behind all the backfill jobs.
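For a concrete contrast, here’s a sketch using the worker from above (the ids are placeholders): a fresh review enqueued at the default priority runs ahead of deprioritized backfill jobs, even when the backfill jobs were inserted first.

# A fresh review at the default priority 0 is picked up first:
%{id: 123}
|> ChowMojo.SentimentAnalyzer.new()
|> Oban.insert()

# A backfill job deprioritized to 3 waits behind higher-priority work:
%{id: 456}
|> ChowMojo.SentimentAnalyzer.new(priority: 3)
|> Oban.insert()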
Update enqueue_backfill/0 to set a lower priority, and modify the test below to confirm jobs are enqueued with the chosen priority.
Change the test:
assert_enqueued worker: ChowMojo.SentimentAnalyzer, priority: 3
Then modify enqueue_backfill:
|> Enum.map(&ChowMojo.SentimentAnalyzer.new(%{id: &1}, priority: 3))
ExUnit.start(auto_run: false)

defmodule ChowMojo.BackfillPriorityTest do
  use ChowMojo.ObanCase

  test "backfill jobs have a lower priority" do
    insert(:review, :unprocessed)

    ChowMojo.Analysis.enqueue_backfill()

    assert_enqueued(worker: ChowMojo.SentimentAnalyzer, priority: 3)
  end
end

ExUnit.run()
☠️ Extra Challenges
Scaling up backfills
Inserting all the jobs at once is fine for backfilling a few thousand reviews, even tens of thousands, but it will be a performance problem when we backfill millions of reviews. With that many reviews it’s much better to batch the work and use recursive jobs, i.e. jobs that enqueue themselves with new arguments.
Redefine the original worker with two perform/1 clauses: one to kick off the backfill and another to continue the backfill. The kickoff clause should find the earliest record, process it, and then enqueue the next record.
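Here’s a minimal sketch of that shape. The module name, the last_id cursor in args, and processing a single review per job are assumptions for illustration; a real backfill would process a batch of records per job.

defmodule ChowMojo.SentimentBackfill do
  use Oban.Worker, queue: :analysis, priority: 3

  import Ecto.Query

  @impl Worker
  # Continuation clause: resume after the cursor left by the previous job.
  def perform(%Oban.Job{args: %{"last_id" => last_id}}), do: process_next(last_id)

  # Kickoff clause: no cursor yet, so start before the earliest id.
  def perform(%Oban.Job{}), do: process_next(0)

  defp process_next(last_id) do
    next_id =
      ChowMojo.Review
      |> select([r], r.id)
      |> where([r], is_nil(r.sentiment) and r.id > ^last_id)
      |> order_by(asc: :id)
      |> limit(1)
      |> ChowMojo.Repo.one()

    case next_id do
      nil ->
        # Nothing left to process; the backfill is complete.
        :ok

      id ->
        id
        |> ChowMojo.get_review()
        |> ChowMojo.analyze_sentiment()

        # Recurse by enqueueing this worker again with the new cursor.
        %{last_id: id}
        |> new()
        |> Oban.insert()

        :ok
    end
  end
end

Kicking off the backfill is then a single insert: %{} |> ChowMojo.SentimentBackfill.new() |> Oban.insert().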
Backfill monitoring job
Create a completion job that monitors the backfill and snoozes until it determines that the batch is complete. Optionally, you may use a field in meta to group the backfill jobs together and differentiate them from regular analysis jobs.
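As a sketch, suppose the backfill jobs were enqueued with meta: %{batch: "sentiment-backfill"} (the field name and the one-minute snooze interval are arbitrary choices). The monitor can snooze until no backfill job remains in a non-final state:

defmodule ChowMojo.BackfillMonitor do
  use Oban.Worker, queue: :analysis

  import Ecto.Query

  @impl Worker
  def perform(%Oban.Job{}) do
    # Count backfill jobs that haven't reached a final state yet.
    remaining =
      Oban.Job
      |> where([j], fragment("? ->> 'batch' = ?", j.meta, "sentiment-backfill"))
      |> where([j], j.state not in ["completed", "cancelled", "discarded"])
      |> ChowMojo.Repo.aggregate(:count)

    if remaining > 0 do
      # Not finished yet; check again in a minute.
      {:snooze, 60}
    else
      # The batch is complete; trigger any follow-up work here.
      :ok
    end
  end
end

Tagging the backfill jobs would then look like ChowMojo.SentimentAnalyzer.new(%{id: id}, meta: %{batch: "sentiment-backfill"}).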