Skip to content
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ The following events will be emitted automatically by workers as jobs are reserv
- **delayed.job.run** - an event measuring the duration of a job's execution
- **delayed.job.error** - an event indicating that a job has errored and may be retried (no duration attached)
- **delayed.job.failure** - an event indicating that a job has permanently failed (no duration attached)
- **delayed.job.enqueue** - an event measuring the time it takes to enqueue a job
- **delayed.job.enqueue** - an event measuring the time it takes to enqueue one or more jobs (fires once per `Delayed::Job.enqueue` call and once per `perform_all_later` / `enqueue_all` batch)
- **delayed.worker.reserve_jobs** - an event measuring the duration of the job "pickup query"

The "run", "error", "failure" and "enqueue" events will include a `:job` argument in the event's payload,
providing access to the job instance.
The "run", "error", and "failure" events will include a `:job` argument in the event's payload,
providing access to the job instance. The "enqueue" event will include a `:jobs` array (along with a
`:count`) — note that elements may be either `Delayed::Job` instances (when enqueued directly via
`Delayed::Job.enqueue`) or `ActiveJob` instances (when enqueued via `perform_later` / `perform_all_later`).

```ruby
ActiveSupport::Notifications.subscribe('delayed.job.run') do |*args|
Expand Down
55 changes: 46 additions & 9 deletions lib/delayed/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,63 @@ def enqueue_after_transaction_commit?
end

def enqueue(job)
_enqueue(job)
enqueue_all([job])
job
end

def enqueue_at(job, timestamp)
_enqueue(job, run_at: Time.at(timestamp)) # rubocop:disable Rails/TimeZone
job.scheduled_at = Time.at(timestamp) # rubocop:disable Rails/TimeZone
enqueue_all([job])
job
end

def enqueue_all(jobs)
return 0 if jobs.empty?

assert_safe_to_enqueue!(jobs)

Delayed.lifecycle.run_callbacks(:enqueue, jobs) do
now = Delayed::Job.db_time_now
rows = jobs.map { |job| build_insert_row(job, now) }
result = Delayed::Job.insert_all(rows) # rubocop:disable Rails/SkipsModelValidations
assign_provider_job_ids(jobs, result) if Delayed::Job.connection.supports_insert_returning?
end
Comment on lines +25 to +31
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should move this entire body into Delayed::Backend::Base to live alongside enqueue and enqueue_job. (There are potentially further simplifications we could in turn make over there as well.)


mark_successfully_enqueued(jobs)
jobs.size
end

private

def _enqueue(job, opts = {})
if enqueue_after_transaction_commit_enabled?(job)
def assert_safe_to_enqueue!(jobs)
if jobs.any? { |job| enqueue_after_transaction_commit_enabled?(job) }
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with enqueue_after_transaction_commit"
end
unless Delayed::Worker.delay_jobs == true
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with delay_jobs false"
end
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not confident on this restriction, but removing it makes the code quite a bit messier.

end

opts.merge!({ queue: job.queue_name, priority: job.priority }.compact)
.merge!(job.provider_attributes || {})
def assign_provider_job_ids(jobs, result)
ids = result.rows.map(&:first)
jobs.zip(ids) { |job, id| job.provider_job_id = id }
end

Delayed::Job.enqueue(JobWrapper.new(job), opts).tap do |dj|
job.provider_job_id = dj.id
end
def mark_successfully_enqueued(jobs)
jobs.each { |job| job.successfully_enqueued = true if job.respond_to?(:successfully_enqueued=) }
Copy link
Copy Markdown
Member

@smudge smudge May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you able to determine if this API is a necessary part of the contract?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is sorta optional: rails/rails#41191. delayed by definition makes successfully_enqueued sorta trivial due to our transactionality assurances. If the job wasn't successfully enqueued, there would be a ActiveRecord::Rollback to indicate

end

def build_insert_row(job, now)
opts = { queue: job.queue_name, priority: job.priority }.compact
opts.merge!(job.provider_attributes || {})
opts[:run_at] = coerce_scheduled_at(job.scheduled_at) if job.scheduled_at

prepared = Delayed::Backend::JobPreparer.new(JobWrapper.new(job), opts).prepare
Delayed::Job.new(prepared).attributes.compact.merge('created_at' => now, 'updated_at' => now)
end

def coerce_scheduled_at(value)
value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does job.scheduled_at really need to be coerced? Seems reasonable to expect the implementation to construct a valid Time object.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if we needed to support old versions of ActiveJob where it was a numeric: https://apidock.com/rails/ActiveJob/Core/scheduled_at%3D

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like that landed in 7.1, so we'd need to restrict our activejob dependency in turn. The activejob dependency is technically optional, but based on our appraisals we still support >= 6.0... 🤔

I think we can consider changing support in a separate PR and keep the coercion in place here, if that makes sense to you.

end

def enqueue_after_transaction_commit_enabled?(job)
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def enqueue(*args)

def enqueue_job(options)
new(options).tap do |job|
Delayed.lifecycle.run_callbacks(:enqueue, job) do
Delayed.lifecycle.run_callbacks(:enqueue, [job]) do
job.hook(:enqueue)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This job.hook(:enqueue) is potentially going to call enqueue on every job in the list -- another thing to be mindful of if we're wanting to make enqueuing of many jobs fast by default.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically I think we also deprecated hook methods like this, so maybe it's time to finally remove support.

Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/lifecycle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class InvalidCallback < RuntimeError; end
class Lifecycle
EVENTS = {
execute: [nil],
enqueue: [:job],
enqueue: [:jobs],
perform: %i(worker job),
error: %i(worker job),
failure: %i(worker job),
Expand Down
16 changes: 13 additions & 3 deletions lib/delayed/plugins/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ module Delayed
module Plugins
class Instrumentation < Plugin
callbacks do |lifecycle|
lifecycle.around(:enqueue) do |job, *args, &block|
Comment thread
CelticMajora marked this conversation as resolved.
ActiveSupport::Notifications.instrument('delayed.job.enqueue', active_support_notifications_tags(job)) do
block.call(job, *args)
lifecycle.around(:enqueue) do |jobs, &block|
ActiveSupport::Notifications.instrument('delayed.job.enqueue', bulk_enqueue_tags(jobs)) do
block.call(jobs)
end
end

Expand Down Expand Up @@ -34,6 +34,16 @@ def self.active_support_notifications_tags(job)
job: job,
}
end

def self.bulk_enqueue_tags(jobs)
Comment thread
CelticMajora marked this conversation as resolved.
{
count: jobs.size,
table: Delayed::Job.table_name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most accurate value here would be something like jobs.map { |j| j.class.table_name }.uniq.sole, but jobs.first.class.table_name might be a slightly cheaper proxy (assuming we enforce at the bulk enqueue API that all jobs go to the same table).

The reason I wouldn't assume it's always Delayed::Job.table_name is because it's possible (though not common) to create specialized job types for use with secondary DB connections or table names.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, it might make sense to emit counts by class and let the caller decide what to do with those class names, e.g. jobs.group_by { |j| j.class }.transform_values(&:count)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@effron wdyt?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also make sense to return IDs in this payload.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think having a metric per class is valuable, and we can use that as a proxy to know that all jobs with the same class have the same table etc?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, job_name, instead of class

database: Delayed::Job.database_name,
database_adapter: Delayed::Job.database_adapter_name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here w.r.t. relying on jobs to derive DB name and adatper.

jobs: jobs,
}
end
end
end
end
249 changes: 246 additions & 3 deletions spec/delayed/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def serialize(*)
expect { JobClass.perform_later }.to raise_error(RuntimeError, "uh oh, serialize failed!")
end

it 'bubbles out an error if Delayed::Job.enqueue fails' do
allow(Delayed::Job).to receive(:enqueue).and_raise("uh oh, enqueue failed!")
it 'bubbles out an error if the underlying insert fails' do
allow(Delayed::Job).to receive(:insert_all).and_raise('uh oh, insert failed!')

expect { JobClass.perform_later }.to raise_error(RuntimeError, "uh oh, enqueue failed!")
expect { JobClass.perform_later }.to raise_error(RuntimeError, 'uh oh, insert failed!')
end

it 'deserializes even if the underlying job class is not defined' do
Expand Down Expand Up @@ -397,4 +397,247 @@ def perform(arg, kwarg:)
end
end
end

describe '.enqueue_all' do # rubocop:disable Metrics/BlockLength
let(:adapter) { ActiveJob::Base.queue_adapter }

it 'returns 0 when given no jobs' do
expect(adapter.enqueue_all([])).to eq(0)
end

context 'when Delayed::Worker.delay_jobs is false' do
around do |example|
was = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = false
example.run
ensure
Delayed::Worker.delay_jobs = was
end

it 'raises UnsafeEnqueueError and inserts nothing' do
expect { adapter.enqueue_all([JobClass.new]) }
.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError)
expect(Delayed::Job.count).to eq(0)
end

it 'also raises for single-job perform_later' do
expect { JobClass.perform_later }
.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError)
expect(Delayed::Job.count).to eq(0)
end
end

context 'when Delayed::Worker.delay_jobs is :always' do
around do |example|
was = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = :always
example.run
ensure
Delayed::Worker.delay_jobs = was
end

it 'raises UnsafeEnqueueError' do
expect { adapter.enqueue_all([JobClass.new]) }
.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError)
end
end

context 'when the database adapter does not support INSERT RETURNING (e.g. MySQL)' do
before do
allow(Delayed::Job.connection).to receive(:supports_insert_returning?).and_return(false)
end

it 'enqueues successfully but leaves provider_job_id nil' do
jobs = Array.new(2) { JobClass.new }

expect(adapter.enqueue_all(jobs)).to eq(2)
expect(Delayed::Job.count).to eq(2)
expect(jobs.map(&:provider_job_id)).to all(be_nil)
end
end

it 'inserts multiple jobs in a single INSERT' do
jobs = Array.new(3) { JobClass.new }

expect { adapter.enqueue_all(jobs) }
.to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i)))
expect(Delayed::Job.count).to eq(3)
end

it 'returns the count of successfully enqueued jobs' do
jobs = Array.new(3) { JobClass.new }
expect(adapter.enqueue_all(jobs)).to eq(3)
end

it 'sets provider_job_id on each input job when the adapter supports INSERT RETURNING' do
skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning?

jobs = Array.new(3) { JobClass.new }
adapter.enqueue_all(jobs)
expect(jobs.map(&:provider_job_id)).to match_array(Delayed::Job.pluck(:id))
end

if ActiveJob.gem_version.release >= Gem::Version.new('7.1')
it 'sets successfully_enqueued on each input job' do
jobs = Array.new(2) { JobClass.new }
adapter.enqueue_all(jobs)
expect(jobs).to all(be_successfully_enqueued)
end
end

it 'honors per-job scheduled_at' do
skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning?

job = JobClass.new
job.scheduled_at = arbitrary_time
adapter.enqueue_all([JobClass.new, job])
expect(Delayed::Job.find(job.provider_job_id).run_at).to eq(arbitrary_time)
end

it 'applies db_time_now to run_at when no scheduled_at is set' do
Timecop.freeze(arbitrary_time) do
adapter.enqueue_all([JobClass.new])
expect(Delayed::Job.last.run_at).to eq(arbitrary_time)
end
end

it 'honors per-job queue and priority overrides' do
a = JobClass.new.tap do |j|
j.queue_name = 'q-a'
j.priority = 3
end
b = JobClass.new.tap do |j|
j.queue_name = 'q-b'
j.priority = 7
end

adapter.enqueue_all([a, b])

rows = Delayed::Job.order(:id).to_a
expect(rows[0]).to have_attributes(queue: 'q-a', priority: 3)
expect(rows[1]).to have_attributes(queue: 'q-b', priority: 7)
end

it 'supports a mix of job classes in one call' do
other_class = Class.new(ActiveJob::Base) do # rubocop:disable Rails/ApplicationJob
def perform; end
end
stub_const('OtherJobClass', other_class)

adapter.enqueue_all([JobClass.new, OtherJobClass.new])

names = Delayed::Job.order(:id).pluck(:name)
expect(names).to eq(%w(JobClass OtherJobClass))
end

it 'sets the name column from display_name' do
adapter.enqueue_all([JobClass.new])
expect(Delayed::Job.last.name).to eq('JobClass')
end

it "fires Delayed's :enqueue lifecycle callback once with the jobs array" do
observed = []
lifecycle_was = Delayed.lifecycle
Delayed.instance_variable_set(:@lifecycle, Delayed::Lifecycle.new)
Delayed.lifecycle.before(:enqueue) { |jobs| observed << jobs }

input = Array.new(3) { JobClass.new }
adapter.enqueue_all(input)

expect(observed.size).to eq(1)
expect(observed.first).to eq(input)
ensure
Delayed.instance_variable_set(:@lifecycle, lifecycle_was)
end

it 'populates provider_job_id before after(:enqueue) callbacks fire' do
skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning?

ids_seen = nil
lifecycle_was = Delayed.lifecycle
Delayed.instance_variable_set(:@lifecycle, Delayed::Lifecycle.new)
Delayed.lifecycle.after(:enqueue) { |jobs| ids_seen = jobs.map(&:provider_job_id) }

adapter.enqueue_all([JobClass.new, JobClass.new])

expect(ids_seen).to all(be_a(Integer))
ensure
Delayed.instance_variable_set(:@lifecycle, lifecycle_was)
end

it 'does not fire ActiveJob before/around/after_enqueue callbacks' do
fires = []
JobClass.before_enqueue { fires << :before }
JobClass.around_enqueue do |_j, block|
fires << :around_before
block.call
fires << :around_after
end
JobClass.after_enqueue { fires << :after }

adapter.enqueue_all([JobClass.new, JobClass.new])

expect(fires).to be_empty
end

if ActiveJob.gem_version.release >= Gem::Version.new('7.2')
context 'when a job sets enqueue_after_transaction_commit to :always' do
before do
JobClass.include ActiveJob::EnqueueAfterTransactionCommit
JobClass.enqueue_after_transaction_commit = :always
end

it 'raises UnsafeEnqueueError and inserts nothing' do
ActiveJob.deprecator.silence do
expect { adapter.enqueue_all([JobClass.new]) }.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError)
end
expect(Delayed::Job.count).to eq(0)
end
end
end

context 'when a job has a stale run_at and deny_stale_enqueues is enabled' do
around do |example|
was = Delayed::Worker.deny_stale_enqueues
Delayed::Worker.deny_stale_enqueues = true
example.run
ensure
Delayed::Worker.deny_stale_enqueues = was
end

it 'raises StaleEnqueueError and inserts nothing' do
job = JobClass.new
job.scheduled_at = Time.now.utc - 1.day
expect { adapter.enqueue_all([JobClass.new, job]) }.to raise_error(Delayed::StaleEnqueueError)
expect(Delayed::Job.count).to eq(0)
end
end
end

describe 'single-job perform_later routes through insert_all' do
it 'invokes insert_all (not Delayed::Job.enqueue)' do
expect(Delayed::Job).to receive(:insert_all).and_call_original # rubocop:disable RSpec/MessageSpies
expect(Delayed::Job).not_to receive(:enqueue) # rubocop:disable RSpec/MessageSpies

JobClass.perform_later
end

it 'persists the job exactly once' do
expect { JobClass.perform_later }.to change { Delayed::Job.count }.by(1)
end
end

if ActiveJob.gem_version.release >= Gem::Version.new('7.1')
describe 'ActiveJob.perform_all_later' do
it 'bulk-enqueues all jobs with a single INSERT' do
expect { ActiveJob.perform_all_later([JobClass.new, JobClass.new, JobClass.new]) }
.to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i)))
expect(Delayed::Job.count).to eq(3)
end

it 'returns nil' do
expect(ActiveJob.perform_all_later([JobClass.new])).to be_nil
end
end
end
end
Loading
Loading