Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Building a simple job queue with Ecto in 46 lines

building_a_simple_job_queue_with_ecto_in_46_lines.livemd

Building a simple job queue with Ecto in 46 lines

Mix.install([{:ecto_qlc, "~> 0.1.0"}])

Setting up our Ecto schema, migration and repo

defmodule Job do
  @moduledoc """
  Ecto schema for one row of the `jobs` table.

  The work to perform is kept in `:mfa` as a list holding a single
  `{module, function, args}` tuple. `:started_at` and `:completed_at`
  are `nil` until the job has been picked up and finished, respectively.
  """

  use Ecto.Schema

  schema "jobs" do
    field :completed_at, :utc_datetime_usec
    field :started_at, :utc_datetime_usec
    # `:any` lets the tuple be stored as a plain Erlang term.
    field :mfa, {:array, :any}

    timestamps()
  end
end

defmodule CreateJobs do
  @moduledoc """
  Migration that creates the `jobs` table with the columns used by the
  `Job` schema.
  """

  use Ecto.Migration

  # Written as `change/0` so the migration is reversible.
  def change do
    create table(:jobs) do
      add :completed_at, :utc_datetime_usec
      add :started_at, :utc_datetime_usec
      add :mfa, {:array, :any}

      timestamps()
    end
  end
end

defmodule Jobs do
  @moduledoc """
  A minimal job queue on top of the `jobs` table.

  Jobs are `Job` rows whose `:mfa` field holds a single
  `{module, function, args}` tuple. A job is pending while both
  `:started_at` and `:completed_at` are `nil`.
  """

  import Ecto.Query

  @doc """
  Runs the oldest pending job and returns the value its function returned.

  The job is stamped with `:started_at` before the function is applied and
  with `:completed_at` afterwards. Returns `nil` when there is no pending job.

  If the job's function raises, the exception propagates and the job keeps
  its `:started_at` stamp without a `:completed_at`, so it is not picked
  up again.

  NOTE(review): selecting the job and stamping it are separate statements,
  so two concurrent callers could pick the same job — confirm whether that
  matters for your use.
  """
  def execute_job() do
    Job
    |> where([job], is_nil(job.completed_at) and is_nil(job.started_at))
    # `timestamps()` defaults to second precision, so jobs inserted within the
    # same second tie on `inserted_at`; break the tie by `id`.
    |> order_by(asc: :inserted_at, asc: :id)
    |> first()
    |> Repo.one()
    |> run()
  end

  # Nothing pending: report that with `nil` instead of a MatchError.
  defp run(nil), do: nil

  # Applies the stored MFA, stamping the row before and after.
  defp run(%Job{mfa: [{m, f, a}], id: id}) do
    stamp(id, :started_at)
    result = apply(m, f, a)
    stamp(id, :completed_at)
    result
  end

  # Sets `field` and `updated_at` to the current UTC time for job `id`.
  defp stamp(id, field) do
    now = DateTime.utc_now()
    Repo.update_all(where(Job, id: ^id), set: [{field, now}, updated_at: now])
  end
end

defmodule Repo do
  @moduledoc """
  Ecto repository for this notebook, using the EctoQLC ETS adapter.
  """

  use Ecto.Repo,
    otp_app: :my_app,
    adapter: EctoQLC.Adapters.ETS
end

Let’s try it out!

# Start the repo, run the CreateJobs migration as version 1, and clear out
# any jobs already in the table.
Repo.start_link([])
Ecto.Migrator.up(Repo, 1, CreateJobs)
Repo.delete_all(Job)

# Enqueue two jobs; each one calls IO.inspect/1 on a greeting.
Repo.insert!(%Job{mfa: [{IO, :inspect, ["Hello World!"]}]})
Repo.insert!(%Job{mfa: [{IO, :inspect, ["Bonjour Monde!"]}]})

# Each call runs one pending job. IO.inspect/1 returns its argument, so the
# matches below assert both the results and the order the jobs ran in.
"Hello World!" = Jobs.execute_job()
"Bonjour Monde!" = Jobs.execute_job()