Oban Training — Backfilling Reviews

06_backfilling_reviews.livemd

Mix.install([:faker, :kino, :oban, :postgrex])

Logger.configure(level: :info)

Application.put_env(:chow_mojo, ChowMojo.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,
  url: "postgres://localhost:5432/chow_mojo_dev"
)

defmodule ChowMojo.Repo do
  use Ecto.Repo, otp_app: :chow_mojo, adapter: Ecto.Adapters.Postgres
end

defmodule ChowMojo.Review do
  use Ecto.Schema

  schema "reviews" do
    field(:comment, :string)
    field(:sentiment, Ecto.Enum, values: [:negative, :neutral, :positive])
    field(:rating, :integer)

    timestamps()
  end
end

defmodule ChowMojo.ObanCase do
  use ExUnit.CaseTemplate

  load = fn path ->
    [__DIR__, "samples", path]
    |> Path.join()
    |> File.read!()
    |> String.split("\n", trim: true)
  end

  @reviews load.("reviews.txt")

  using do
    quote do
      use Oban.Testing, repo: ChowMojo.Repo
      import ChowMojo.ObanCase
    end
  end

  setup do
    Ecto.Adapters.SQL.Sandbox.mode(ChowMojo.Repo, {:shared, self()})
    Ecto.Adapters.SQL.Sandbox.checkout(ChowMojo.Repo)
    start_supervised!({Oban, repo: ChowMojo.Repo, testing: :manual})

    :ok
  end

  def insert(:review, :unprocessed) do
    insert(comment: comment(), rating: 0, sentiment: nil)
  end

  def insert(:review, :analyzed) do
    insert(comment: comment(), rating: 4, sentiment: :positive)
  end

  def insert(params) do
    ChowMojo.Review
    |> struct!(params)
    |> ChowMojo.Repo.insert!()
  end

  defp comment, do: Enum.random(@reviews)
end

defmodule AddRatings do
  use Ecto.Migration

  def change do
    create table(:reviews) do
      add(:comment, :text)
      add(:sentiment, :text)
      add(:rating, :integer, null: false)

      timestamps()
    end
  end
end

defmodule ChowMojo do
  def get_review(id), do: ChowMojo.Repo.get(ChowMojo.Review, id)

  def analyze_sentiment(_review), do: :ok
end

ChowMojo.Repo.start_link()

# Start with a clean slate
ChowMojo.Repo.delete_all(Oban.Job)

Ecto.Migrator.run(ChowMojo.Repo, [{4, AddRatings}], :up, all: true)

🏅 Goals

Until now we’ve worked with one job at a time. In this exercise we’ll learn about the nuances of inserting multiple jobs at once, how to deprioritize less essential jobs, and how to compose jobs together using introspection at runtime.

Sentiment Analysis for Reviews

ChowMojo customers frequently leave reviews after ordering from a restaurant. We’d like to use sentiment analysis to gauge how people feel about a restaurant. New reviews will be analyzed as they’re submitted, but there is an extensive backlog of reviews for us to process.

Create a standard worker that receives a review id, fetches the review with ChowMojo.get_review, and then uses ChowMojo.analyze_sentiment to handle the rest.

Use a Hint
use Oban.Worker, queue: :analysis

@impl Worker
def perform(%{args: %{"id" => review_id}}) do
  review_id
  |> ChowMojo.get_review()
  |> ChowMojo.analyze_sentiment()
end

defmodule ChowMojo.SentimentAnalyzer do
  # Your turn...
end

As new reviews are submitted we’ll analyze them, but we also need to retroactively process all of the old reviews. Create an enqueue_backfill/0 function that queries ChowMojo.Review for records whose sentiment column is null. Then build a new ChowMojo.SentimentAnalyzer changeset from each of those records and use Oban.insert_all/1 to enqueue them all at once.

Use a Hint
def enqueue_backfill do
  ChowMojo.Review
  |> select([r], r.id)
  |> where([r], is_nil(r.sentiment))
  |> ChowMojo.Repo.all()
  |> Enum.map(&ChowMojo.SentimentAnalyzer.new(%{id: &1}))
  |> Oban.insert_all()
end

defmodule ChowMojo.Analysis do
  import Ecto.Query

  # Your turn...
end

Now we’ll write a test to verify that enqueue_backfill/0 only inserts jobs for “unprocessed” reviews. Use all_enqueued/1 to fetch the enqueued jobs and assert on their args.

Use a Hint
ChowMojo.Analysis.enqueue_backfill()

review_ids =
  [worker: ChowMojo.SentimentAnalyzer]
  |> all_enqueued()
  |> Enum.map(& &1.args["id"])
  |> Enum.sort()

assert [id_1, id_2, id_3] == review_ids

ExUnit.start(auto_run: false)

defmodule ChowMojo.AnalyzerTest do
  use ChowMojo.ObanCase

  test "enqueuing backfill jobs only for unprocessed reviews" do
    %{id: id_1} = insert(:review, :unprocessed)
    %{id: id_2} = insert(:review, :unprocessed)
    %{id: id_3} = insert(:review, :unprocessed)
    %{id: _id_} = insert(:review, :analyzed)

    # Your turn...
  end
end

ExUnit.run()

Jobs usually execute in the order they’re scheduled, with the id as a secondary sort. For example, these four jobs would execute in the order they’re shown:

id  scheduled_at
2   2023-09-03 00:00:00
1   2023-09-03 00:00:01
3   2023-09-03 00:00:02
4   2023-09-03 00:00:02

We say “usually” because it’s possible to change a job’s priority so that it runs before or after other jobs, regardless of when they were scheduled. The default priority is 0, and jobs with a higher value have a lower priority (like nice for OS processes). Here is the table above, rewritten so that only job 4 has priority 0 and the other jobs have priority 1:

priority  id  scheduled_at
0         4   2023-09-03 00:00:02
1         2   2023-09-03 00:00:00
1         1   2023-09-03 00:00:01
1         3   2023-09-03 00:00:02

This is relevant to our backfill because we want to ensure that new reviews are analyzed first, not stuck behind all the backfill jobs.
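The ordering described above can be modeled in plain Elixir, without a database or Oban. This is only a sketch of the sort order as the text describes it (priority first, then scheduled_at, then id); the OrderingSketch module and its execution_order/1 function are hypothetical names used for illustration, not part of Oban.

```elixir
defmodule OrderingSketch do
  # Hypothetical helper: returns job ids in the order the text says they would run.
  # Lower priority values sort first, then earlier scheduled_at, then lower id.
  def execution_order(jobs) do
    jobs
    |> Enum.sort_by(fn job ->
      # Convert the timestamp to a tuple of integers so it compares correctly.
      {job.priority, NaiveDateTime.to_gregorian_seconds(job.scheduled_at), job.id}
    end)
    |> Enum.map(& &1.id)
  end
end

# The four jobs from the second table: only job 4 has priority 0.
jobs = [
  %{id: 1, priority: 1, scheduled_at: ~N[2023-09-03 00:00:01]},
  %{id: 2, priority: 1, scheduled_at: ~N[2023-09-03 00:00:00]},
  %{id: 3, priority: 1, scheduled_at: ~N[2023-09-03 00:00:02]},
  %{id: 4, priority: 0, scheduled_at: ~N[2023-09-03 00:00:02]}
]

OrderingSketch.execution_order(jobs)
# => [4, 2, 1, 3]
```

Setting every job back to priority 0 yields the order from the first table, [2, 1, 3, 4], because scheduled_at and id are then the only keys that differ.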

Update enqueue_backfill/0 to set a lower priority, then modify the test below to confirm that jobs are enqueued with the chosen priority.

Use a Hint

Change the test:

assert_enqueued worker: ChowMojo.SentimentAnalyzer, priority: 3

Then modify enqueue_backfill:

|> Enum.map(&ChowMojo.SentimentAnalyzer.new(%{id: &1}, priority: 3))

ExUnit.start(auto_run: false)

defmodule ChowMojo.BackfillPriorityTest do
  use ChowMojo.ObanCase

  test "backfill jobs have a lower priority" do
    insert(:review, :unprocessed)

    ChowMojo.Analysis.enqueue_backfill()

    assert_enqueued(worker: ChowMojo.SentimentAnalyzer, priority: 3)
  end
end

ExUnit.run()

☠️ Extra Challenges

Scaling up backfills

Inserting all the jobs at once is fine for backfilling a few thousand reviews, even tens of thousands, but it will be a performance problem when we backfill millions of reviews. With that many reviews it’s much better to process them in batches using recursive jobs, i.e. jobs that enqueue themselves with new arguments.

Redefine the original worker with two perform/1 clauses: one to kick off the backfill and another to continue the backfill. The kickoff clause should find the earliest record, process it, and then enqueue the next record.
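One way to reason about the two clauses before wiring them into a worker is to model the control flow in plain Elixir. The sketch below is not an Oban worker: BackfillSketch, the "backfill" arg key, and the list of pending ids are all assumptions made for illustration. In a real worker, the place where this model returns the next args is where you would build a job with new/1 and pass it to Oban.insert/1, and the lookups would be queries against the reviews table.

```elixir
defmodule BackfillSketch do
  # Continue clause: an "id" is present, so analyze that review and line up the next one.
  # It must come first, because the kickoff clause below also matches these args.
  def perform(%{"id" => id, "backfill" => true}, pending_ids) do
    {:analyzed, id, next_args(pending_ids, id)}
  end

  # Kickoff clause: no "id" yet, so start from the earliest pending review.
  def perform(%{"backfill" => true}, pending_ids) do
    case Enum.min(pending_ids, fn -> nil end) do
      nil -> :done
      first -> {:analyzed, first, next_args(pending_ids, first)}
    end
  end

  # Args for the follow-up job, or nil when no pending review comes after `id`.
  defp next_args(pending_ids, id) do
    pending_ids
    |> Enum.filter(&(&1 > id))
    |> Enum.min(fn -> nil end)
    |> case do
      nil -> nil
      next_id -> %{"id" => next_id, "backfill" => true}
    end
  end

  # Simulates the queue: run a job, then run whatever that job "enqueued".
  # Returns the ids in the order they were analyzed.
  def run(pending_ids, args \\ %{"backfill" => true}, analyzed \\ []) do
    case perform(args, pending_ids) do
      :done -> Enum.reverse(analyzed)
      {:analyzed, id, nil} -> Enum.reverse([id | analyzed])
      {:analyzed, id, next} -> run(pending_ids, next, [id | analyzed])
    end
  end
end

BackfillSketch.run([3, 1, 2])
# => [1, 2, 3]
```

Because each job only looks up the single record after its own id, at most one backfill job exists at a time, which is what keeps the table of jobs small even when millions of reviews are pending.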

Backfill monitoring job

Create a completion job that monitors the backfill and snoozes until it determines that the batch is complete. Optionally, you may use a field in meta to group the backfill together and differentiate it from regular analysis jobs.
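The heart of the monitoring job is a decision between finishing and snoozing. A worker's perform/1 can return {:snooze, seconds} to be rescheduled, so the sketch below models only that decision as a pure function. The BackfillMonitorSketch module, the "backfill_id" meta key, and the 30-second interval are assumptions for illustration; in a real worker the list of jobs would come from a query against Oban.Job rather than an in-memory list.

```elixir
defmodule BackfillMonitorSketch do
  # Job states that mean work is still outstanding.
  @incomplete ~w(available scheduled executing retryable)
  # Assumed polling interval, in seconds.
  @snooze_seconds 30

  # `jobs` stands in for rows from the jobs table: maps with :meta and :state.
  # Returns :ok when the backfill is finished, otherwise a snooze tuple.
  def check(jobs, backfill_id) do
    remaining =
      Enum.count(jobs, fn job ->
        job.meta["backfill_id"] == backfill_id and job.state in @incomplete
      end)

    if remaining == 0, do: :ok, else: {:snooze, @snooze_seconds}
  end
end

jobs = [
  %{meta: %{"backfill_id" => "b1"}, state: "completed"},
  %{meta: %{"backfill_id" => "b1"}, state: "available"},
  # A regular analysis job without the meta key is ignored by the monitor.
  %{meta: %{}, state: "executing"}
]

BackfillMonitorSketch.check(jobs, "b1")
# => {:snooze, 30}
```

Grouping by a meta field is what lets the monitor ignore newly submitted reviews, which share the same worker and queue as the backfill jobs.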
