Skip to content
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ The following events will be emitted automatically by workers as jobs are reserv
- **delayed.job.run** - an event measuring the duration of a job's execution
- **delayed.job.error** - an event indicating that a job has errored and may be retried (no duration attached)
- **delayed.job.failure** - an event indicating that a job has permanently failed (no duration attached)
- **delayed.job.enqueue** - an event measuring the time it takes to enqueue a job
- **delayed.job.enqueue** - an event measuring the time it takes to enqueue one or more jobs (fires once per `Delayed::Job.enqueue` call and once per `perform_all_later` / `enqueue_all` batch)
- **delayed.worker.reserve_jobs** - an event measuring the duration of the job "pickup query"

The "run", "error", "failure" and "enqueue" events will include a `:job` argument in the event's payload,
providing access to the job instance.
The "run", "error", and "failure" events will include a `:job` argument in the event's payload,
providing access to the job instance. The "enqueue" event will include a `:jobs` array (along with a
`:count`) — note that elements may be either `Delayed::Job` instances (when enqueued directly via
`Delayed::Job.enqueue`) or `ActiveJob` instances (when enqueued via `perform_later` / `perform_all_later`).

```ruby
ActiveSupport::Notifications.subscribe('delayed.job.run') do |*args|
Expand Down
55 changes: 46 additions & 9 deletions lib/delayed/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,63 @@ def enqueue_after_transaction_commit?
end

def enqueue(job)
_enqueue(job)
enqueue_all([job])
job
end

def enqueue_at(job, timestamp)
_enqueue(job, run_at: Time.at(timestamp)) # rubocop:disable Rails/TimeZone
job.scheduled_at = Time.at(timestamp) # rubocop:disable Rails/TimeZone
enqueue_all([job])
job
end

def enqueue_all(jobs)
return 0 if jobs.empty?

assert_safe_to_enqueue!(jobs)

Delayed.lifecycle.run_callbacks(:enqueue, jobs) do
now = Delayed::Job.db_time_now
rows = jobs.map { |job| build_insert_row(job, now) }
result = Delayed::Job.insert_all(rows) # rubocop:disable Rails/SkipsModelValidations
assign_provider_job_ids(jobs, result) if Delayed::Job.connection.supports_insert_returning?
end
Comment on lines +25 to +31
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should move this entire body into Delayed::Backend::Base to live alongside enqueue and enqueue_job. (There are potentially further simplifications we could in turn make over there as well.)


mark_successfully_enqueued(jobs)
jobs.size
end

private

def _enqueue(job, opts = {})
if enqueue_after_transaction_commit_enabled?(job)
def assert_safe_to_enqueue!(jobs)
if jobs.any? { |job| enqueue_after_transaction_commit_enabled?(job) }
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with enqueue_after_transaction_commit"
end
unless Delayed::Worker.delay_jobs == true
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with delay_jobs false"
end
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not confident on this restriction, but removing it makes the code quite a bit messier.

end

opts.merge!({ queue: job.queue_name, priority: job.priority }.compact)
.merge!(job.provider_attributes || {})
def assign_provider_job_ids(jobs, result)
ids = result.rows.map(&:first)
jobs.zip(ids) { |job, id| job.provider_job_id = id }
end

Delayed::Job.enqueue(JobWrapper.new(job), opts).tap do |dj|
job.provider_job_id = dj.id
end
def mark_successfully_enqueued(jobs)
jobs.each { |job| job.successfully_enqueued = true if job.respond_to?(:successfully_enqueued=) }
Copy link
Copy Markdown
Member

@smudge smudge May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were you able to determine if this API is a necessary part of the contract?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is sorta optional: rails/rails#41191. delayed by definition makes successfully_enqueued sorta trivial due to our transactionality assurances. If the job wasn't successfully enqueued, there would be a ActiveRecord::Rollback to indicate

end

def build_insert_row(job, now)
opts = { queue: job.queue_name, priority: job.priority }.compact
opts.merge!(job.provider_attributes || {})
opts[:run_at] = coerce_scheduled_at(job.scheduled_at) if job.scheduled_at

prepared = Delayed::Backend::JobPreparer.new(JobWrapper.new(job), opts).prepare
Delayed::Job.new(prepared).attributes.compact.merge('created_at' => now, 'updated_at' => now)
end

def coerce_scheduled_at(value)
value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does job.scheduled_at really need to be coerced? Seems reasonable to expect the implementation to construct a valid Time object.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if we needed to support old versions of ActiveJob where it was a numeric: https://apidock.com/rails/ActiveJob/Core/scheduled_at%3D

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like that landed in 7.1, so we'd need to restrict our activejob dependency in turn. The activejob dependency is technically optional, but based on our appraisals we still support >= 6.0... 🤔

I think we can consider changing support in a separate PR and keep the coercion in place here, if that makes sense to you.

end

def enqueue_after_transaction_commit_enabled?(job)
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def enqueue(*args)

def enqueue_job(options)
new(options).tap do |job|
Delayed.lifecycle.run_callbacks(:enqueue, job) do
Delayed.lifecycle.run_callbacks(:enqueue, [job]) do
job.hook(:enqueue)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This job.hook(:enqueue) is potentially going to call enqueue on every job in the list -- another thing to be mindful of if we're wanting to make enqueuing of many jobs fast by default.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically I think we also deprecated hook methods like this, so maybe it's time to finally remove support.

Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/lifecycle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class InvalidCallback < RuntimeError; end
class Lifecycle
EVENTS = {
execute: [nil],
enqueue: [:job],
enqueue: [:jobs],
perform: %i(worker job),
error: %i(worker job),
failure: %i(worker job),
Expand Down
16 changes: 13 additions & 3 deletions lib/delayed/plugins/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ module Delayed
module Plugins
class Instrumentation < Plugin
callbacks do |lifecycle|
lifecycle.around(:enqueue) do |job, *args, &block|
Comment thread
CelticMajora marked this conversation as resolved.
ActiveSupport::Notifications.instrument('delayed.job.enqueue', active_support_notifications_tags(job)) do
block.call(job, *args)
lifecycle.around(:enqueue) do |jobs, &block|
ActiveSupport::Notifications.instrument('delayed.job.enqueue', bulk_enqueue_tags(jobs)) do
block.call(jobs)
end
end

Expand Down Expand Up @@ -34,6 +34,16 @@ def self.active_support_notifications_tags(job)
job: job,
}
end

def self.bulk_enqueue_tags(jobs)
Comment thread
CelticMajora marked this conversation as resolved.
{
count: jobs.size,
table: Delayed::Job.table_name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most accurate value here would be something like jobs.map { |j| j.class.table_name }.uniq.sole, but jobs.first.class.table_name might be a slightly cheaper proxy (assuming we enforce at the bulk enqueue API that all jobs go to the same table).

The reason I wouldn't assume it's always Delayed::Job.table_name is because it's possible (though not common) to create specialized job types for use with secondary DB connections or table names.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, it might make sense to emit counts by class and let the caller decide what to do with those class names, e.g. jobs.group_by { |j| j.class }.transform_values(&:count)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@effron wdyt?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also make sense to return IDs in this payload.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think having a metric per class is valuable, and we can use that as a proxy to know that all jobs with the same class have the same table etc?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, job_name, instead of class

database: Delayed::Job.database_name,
database_adapter: Delayed::Job.database_adapter_name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here w.r.t. relying on jobs to derive DB name and adatper.

jobs: jobs,
}
end
end
end
end
Loading
Loading