
Oban Training—Delivering a Daily Digest


Mix.install([:faker, :kino, :oban, :postgrex])

Logger.configure(level: :info)

Application.put_env(:chow_mojo, ChowMojo.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,
  url: "postgres://localhost:5432/chow_mojo_dev"
)

defmodule ChowMojo.Repo do
  use Ecto.Repo, otp_app: :chow_mojo, adapter: Ecto.Adapters.Postgres
end

defmodule ChowMojo.User do
  use Ecto.Schema

  schema "users" do
    field(:email, :string)
    field(:name, :string)

    timestamps()
  end
end

defmodule ChowMojo.Restaurant do
  use Ecto.Schema

  schema "restaurants" do
    field(:name, :string)
    field(:address, :string)
    field(:rating, :integer)

    timestamps()

    belongs_to(:owner, ChowMojo.User)
  end
end

defmodule ChowMojo.ObanCase do
  use ExUnit.CaseTemplate

  load = fn path ->
    [__DIR__, "samples", path]
    |> Path.join()
    |> File.read!()
    |> String.split("\n", trim: true)
  end

  @animals load.("animals.txt")
  @adjectives load.("adjectives.txt")
  @types load.("restaurants.txt")

  using do
    quote do
      use Oban.Testing, repo: ChowMojo.Repo
      import ChowMojo.ObanCase
    end
  end

  def insert(:restaurant, :active) do
    insert(name: name(), address: address())
  end

  def insert(:restaurant, :inactive) do
    insert(name: name())
  end

  def insert(params) do
    ChowMojo.Restaurant
    |> struct!(params)
    |> ChowMojo.Repo.insert!()
  end

  defp name do
    [Enum.random(@adjectives), Enum.random(@animals), Enum.random(@types)]
    |> Enum.map(&String.capitalize/1)
    |> Enum.join(" ")
  end

  defp address do
    Enum.join([Faker.Address.city(), Faker.Address.country()], ", ")
  end
end

defmodule AddRestaurantOwners do
  use Ecto.Migration

  def change do
    alter table(:restaurants) do
      add(:owner_id, references(:users))
    end
  end
end

defmodule ChowMojo do
  import Ecto.Query

  def all_active_restaurants do
    ChowMojo.Restaurant
    |> where([r], not is_nil(r.address))
    |> ChowMojo.Repo.all()
  end

  def deliver_daily_digest(restaurant) do
    send(self(), {:delivered, restaurant.id})

    :ok
  end
end

ChowMojo.Repo.start_link()

Ecto.Migrator.run(ChowMojo.Repo, [{3, AddRestaurantOwners}], :up, all: true)

🏅 Goals

Thus far we’ve inserted jobs manually or as part of an externally triggered function. In this exercise we’ll look at inserting jobs automatically on a schedule, the role of plugins in Oban, and how to manipulate queues at runtime.

Daily Digest Emails

ChowMojo restaurant owners would like to know how many orders they fulfilled and how much they earned each day. That’s a perfect fit for background jobs like we’ve written before, so let’s whip up a new DailyDigest worker.

The twist is that the new worker receives empty args and fetches the restaurants itself. Define a perform/1 function that ignores the args (or the entire job), uses ChowMojo.all_active_restaurants/0 to fetch all active restaurants, and then calls ChowMojo.deliver_daily_digest/1 on each one to deliver its digest.

Use a Hint

use Oban.Worker, queue: :email

@impl Worker
def perform(_job) do
  Enum.each(ChowMojo.all_active_restaurants(), &ChowMojo.deliver_daily_digest/1)
end

defmodule ChowMojo.DailyDigest do
  # Your turn...
end

Before moving on to automating digest job creation, let’s test the worker using perform_job/3 as we’ve done before.

Use a Hint

Call perform_job/2 with an empty map:

assert :ok = perform_job(ChowMojo.DailyDigest, %{})

ExUnit.start(auto_run: false)

defmodule ChowMojo.DailyDigestTest do
  use ChowMojo.ObanCase

  test "delivering digest emails to all active restaurants" do
    %{id: id_1, name: name_1} = insert(:restaurant, :active)
    %{id: id_2} = insert(:restaurant, :inactive)
    %{id: id_3, name: name_3} = insert(:restaurant, :active)

    # Your turn...

    assert_received {:delivered, ^id_1}
    refute_received {:delivered, ^id_2}
    assert_received {:delivered, ^id_3}

    # For your amusement (only the active restaurants receive a digest)...
    IO.puts("Digest delivered to:\n\n* #{name_1}\n* #{name_3}")
  end
end

ExUnit.run()

With confidence that the worker delivers digests to the correct restaurants, we’re ready to automate delivery on a schedule.

The ideal way to schedule periodic jobs is with the Cron plugin. In Oban, plugins are processes (GenServers) that insert, update, or delete jobs autonomously based on some configuration. We’ll look at other plugins and their role in production systems later on.

For now, let’s define options for an Oban instance with a Cron plugin that schedules a DailyDigest job every minute and starts an email queue in the paused state. Note that we’re scheduling jobs every minute only for testing purposes; in reality we’d want to deliver digest emails once a day.
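For comparison, here is a minimal sketch of what a once-a-day schedule might look like. The variable name daily_conf_opts and the 08:00 delivery time are assumptions made for illustration, not part of the exercise. The cron expression is evaluated in UTC unless the Cron plugin is given a :timezone option.

```elixir
# Hypothetical production-style options: one digest job per day.
# The name `daily_conf_opts` and the 08:00 delivery hour are assumptions.
daily_conf_opts = [
  repo: ChowMojo.Repo,
  queues: [email: [limit: 1]],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # Fields: minute hour day-of-month month day-of-week
       # "0 8 * * *" fires at minute 0 of hour 8, every day.
       {"0 8 * * *", ChowMojo.DailyDigest}
     ]}
  ]
]
```

The exercise itself keeps the every-minute schedule so that jobs accumulate quickly enough to observe.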

At this point we’re only validating the config, not starting the instance.

Use a Hint

conf_opts = [
  repo: ChowMojo.Repo,
  queues: [email: [limit: 1, paused: true]],
  plugins: [{Oban.Plugins.Cron, crontab: [{"* * * * *", ChowMojo.DailyDigest}]}]
]

# Your turn...

conf_opts = [
  repo: ChowMojo.Repo
]

Oban.Config.validate(conf_opts)

Oban’s config, including the config of every plugin, is validated during startup using the same validate/1 function. As you may have noticed, if any configuration is invalid, including a cron schedule, validation returns an explanatory error and the instance won’t boot.
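To see a validation failure firsthand, the sketch below passes a deliberately malformed cron expression with four fields instead of five. It assumes it runs inside this notebook, after the setup cell has installed Oban. The name invalid_opts is hypothetical, and the exact wording of the error depends on the Oban version, so the code only inspects whatever reason comes back.

```elixir
# Assumes Oban is already installed via the notebook's setup cell.
# "* * * *" has only four fields, so it is not a valid cron expression.
invalid_opts = [
  repo: ChowMojo.Repo,
  plugins: [{Oban.Plugins.Cron, crontab: [{"* * * *", ChowMojo.DailyDigest}]}]
]

# validate/1 returns :ok for valid options and an error tuple otherwise.
case Oban.Config.validate(invalid_opts) do
  :ok -> IO.puts("config is valid")
  {:error, reason} -> IO.puts("invalid config: #{inspect(reason)}")
end
```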

Now start the instance with Oban.start_link/1.

Oban.start_link(conf_opts)

Cron is running now and it will insert one job every minute. However, because the queue is paused, none of those jobs can run and they’re left in the available state. We can check how many jobs have accumulated by querying the jobs table with Ecto.Query functions.

Compose a query to count Oban.Job rows grouped by state:

Use a Hint

Use Oban.Job as the base of a query that groups by state:

Oban.Job
|> where([j], j.queue == "email")
|> group_by(:state)
|> select([j], {j.state, count()})
|> ChowMojo.Repo.all()

import Ecto.Query

# Your turn...

Once a few jobs have accumulated we can resume the queue and allow it to start processing.

Use a Hint

Oban.resume_queue(queue: :email)

# Your turn...

Try reevaluating the count query above after you’ve resumed the queue. You should see that all the jobs are completed now.

☠️ Extra Challenges

Pause, Resume, Stop, Start

Play with the :email queue by pausing it, resuming it, stopping it, and then starting it again. Remember, at any point you can see which queues are running with Oban.config/0 and check the queue itself with Oban.check_queue/1.
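As a starting point, the calls involved might look like the sketch below. It assumes the Oban instance started earlier in this notebook is still running under the default name. Queue control signals are delivered asynchronously, so a check made immediately after a call may not reflect the new state yet.

```elixir
# Assumes the default Oban instance from earlier in the notebook is running.

# Pause the queue, then inspect it while it still exists.
Oban.pause_queue(queue: :email)
Oban.check_queue(queue: :email) |> IO.inspect(label: "after pause")

# Resume processing.
Oban.resume_queue(queue: :email)

# Stop the queue entirely, then start it again with a concurrency limit of 1.
Oban.stop_queue(queue: :email)
Oban.start_queue(queue: :email, limit: 1)

# The queues listed in the instance's configuration.
Oban.config().queues |> IO.inspect(label: "configured queues")
```

Re-running the check_queue/1 line a moment after each call is a simple way to watch the state change.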

Enqueue Sub-Jobs

Delivering all of the digest emails from a single job is risky because a single error will cause the entire job to retry, including deliveries that already succeeded. A simple way to compensate for errors is to enqueue a sub-job for each digest delivery from the cron-triggered job. Try adding a new perform/1 clause that delivers a digest to a single restaurant, and enqueue a sub-job for every active restaurant.
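One possible shape for this, offered as a sketch rather than the reference solution: the cron-triggered job (empty args) fans out one job per active restaurant, and a second clause handles a single delivery. The "restaurant_id" arg key is an assumption chosen for illustration. This would replace the DailyDigest module defined earlier.

```elixir
defmodule ChowMojo.DailyDigest do
  use Oban.Worker, queue: :email

  # Sub-job: deliver one digest. The "restaurant_id" key is a hypothetical name.
  @impl Worker
  def perform(%Job{args: %{"restaurant_id" => id}}) do
    ChowMojo.Restaurant
    |> ChowMojo.Repo.get!(id)
    |> ChowMojo.deliver_daily_digest()
  end

  # Cron trigger: empty args, so enqueue one sub-job per active restaurant.
  def perform(%Job{args: args}) when map_size(args) == 0 do
    ChowMojo.all_active_restaurants()
    |> Enum.map(&new(%{restaurant_id: &1.id}))
    |> Oban.insert_all()

    :ok
  end
end
```

With this split, a failed delivery retries only its own sub-job and leaves the other restaurants' digests untouched.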
