Building a simple job queue with Ecto in 46 lines
Mix.install([{:ecto_qlc, "~> 0.1.0"}])
Setting up our Ecto schema, migration and repo
defmodule Job do
use Ecto.Schema
schema "jobs" do
field(:completed_at, :utc_datetime_usec)
field(:started_at, :utc_datetime_usec)
field(:mfa, {:array, :any})
timestamps()
end
end
defmodule CreateJobs do
use Ecto.Migration
def change do
create table(:jobs) do
add(:completed_at, :utc_datetime_usec)
add(:started_at, :utc_datetime_usec)
add(:mfa, {:array, :any})
timestamps()
end
end
end
defmodule Jobs do
import Ecto.Query
def execute_job() do
%Job{mfa: [{m, f, a}], id: id} =
Job
|> order_by(asc: :inserted_at)
|> first()
|> where([job], is_nil(job.completed_at) and is_nil(job.started_at))
|> Repo.one()
now = DateTime.utc_now()
Repo.update_all(where(Job, id: ^id), set: [started_at: now, updated_at: now])
result = apply(m, f, a)
now = DateTime.utc_now()
Repo.update_all(where(Job, id: ^id), set: [completed_at: now, updated_at: now])
result
end
end
defmodule Repo do
use Ecto.Repo, otp_app: :my_app, adapter: EctoQLC.Adapters.ETS
end
Let’s try it out!
Repo.start_link([])
Ecto.Migrator.up(Repo, 1, CreateJobs)
Repo.delete_all(Job)
Repo.insert!(%Job{mfa: [{IO, :inspect, ["Hello World!"]}]})
Repo.insert!(%Job{mfa: [{IO, :inspect, ["Bonjour Monde!"]}]})
"Hello World!" = Jobs.execute_job()
"Bonjour Monde!" = Jobs.execute_job()