diff --git a/README.md b/README.md index 08952473..145b29c3 100644 --- a/README.md +++ b/README.md @@ -284,11 +284,13 @@ The following events will be emitted automatically by workers as jobs are reserv - **delayed.job.run** - an event measuring the duration of a job's execution - **delayed.job.error** - an event indicating that a job has errored and may be retried (no duration attached) - **delayed.job.failure** - an event indicating that a job has permanently failed (no duration attached) -- **delayed.job.enqueue** - an event measuring the time it takes to enqueue a job +- **delayed.job.enqueue** - an event measuring the time it takes to enqueue one or more jobs (fires once per `Delayed::Job.enqueue` call and once per `perform_all_later` / `enqueue_all` batch) - **delayed.worker.reserve_jobs** - an event measuring the duration of the job "pickup query" -The "run", "error", "failure" and "enqueue" events will include a `:job` argument in the event's payload, -providing access to the job instance. +The "run", "error", and "failure" events will include a `:job` argument in the event's payload, +providing access to the job instance. The "enqueue" event will include a `:jobs` array (along with a +`:count`) — note that elements may be either `Delayed::Job` instances (when enqueued directly via +`Delayed::Job.enqueue`) or `ActiveJob` instances (when enqueued via `perform_later` / `perform_all_later`). ```ruby ActiveSupport::Notifications.subscribe('delayed.job.run') do |*args| diff --git a/lib/delayed/active_job_adapter.rb b/lib/delayed/active_job_adapter.rb index a2e5f2fd..5085a6ba 100644 --- a/lib/delayed/active_job_adapter.rb +++ b/lib/delayed/active_job_adapter.rb @@ -7,26 +7,63 @@ def enqueue_after_transaction_commit? end def enqueue(job) - _enqueue(job) + enqueue_all([job]) + job end def enqueue_at(job, timestamp) - _enqueue(job, run_at: Time.at(timestamp)) # rubocop:disable Rails/TimeZone + job.scheduled_at = Time.at(timestamp) # rubocop:disable Rails/TimeZone + enqueue_all([job]) + job + end + + def enqueue_all(jobs) + return 0 if jobs.empty? + + assert_safe_to_enqueue!(jobs) + + Delayed.lifecycle.run_callbacks(:enqueue, jobs) do + now = Delayed::Job.db_time_now + rows = jobs.map { |job| build_insert_row(job, now) } + result = Delayed::Job.insert_all(rows) # rubocop:disable Rails/SkipsModelValidations + assign_provider_job_ids(jobs, result) if Delayed::Job.connection.supports_insert_returning? + mark_successfully_enqueued(jobs) + end + + jobs.size end private - def _enqueue(job, opts = {}) - if enqueue_after_transaction_commit_enabled?(job) + def assert_safe_to_enqueue!(jobs) + if jobs.any? { |job| enqueue_after_transaction_commit_enabled?(job) } raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with enqueue_after_transaction_commit" end + if Delayed::Worker.delay_jobs == false + raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with delay_jobs false" + end + end - opts.merge!({ queue: job.queue_name, priority: job.priority }.compact) - .merge!(job.provider_attributes || {}) + def assign_provider_job_ids(jobs, result) + ids = result.rows.map(&:first) + jobs.zip(ids) { |job, id| job.provider_job_id = id } + end - Delayed::Job.enqueue(JobWrapper.new(job), opts).tap do |dj| - job.provider_job_id = dj.id - end + def mark_successfully_enqueued(jobs) + jobs.each { |job| job.successfully_enqueued = true if job.respond_to?(:successfully_enqueued=) } + end + + def build_insert_row(job, now) + opts = { queue: job.queue_name, priority: job.priority }.compact + opts.merge!(job.provider_attributes || {}) + opts[:run_at] = coerce_scheduled_at(job.scheduled_at) if job.scheduled_at + + prepared = Delayed::Backend::JobPreparer.new(JobWrapper.new(job), opts).prepare + Delayed::Job.new(prepared).attributes.compact.merge('created_at' => now, 'updated_at' => now) + end + + def coerce_scheduled_at(value) + value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone end def enqueue_after_transaction_commit_enabled?(job) diff --git a/lib/delayed/backend/base.rb b/lib/delayed/backend/base.rb index e9ed30e6..d5a63556 100644 --- a/lib/delayed/backend/base.rb +++ b/lib/delayed/backend/base.rb @@ -14,8 +14,8 @@ def enqueue(*args) def enqueue_job(options) new(options).tap do |job| - Delayed.lifecycle.run_callbacks(:enqueue, job) do - job.hook(:enqueue) + warn_deprecated_enqueue_hook(job.payload_object) + Delayed.lifecycle.run_callbacks(:enqueue, [job]) do Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job end end @@ -45,6 +45,15 @@ def work_off(num = 100) def name_assignable? column_names.include?('name') end + + private + + def warn_deprecated_enqueue_hook(payload) + return if payload.is_a?(Delayed::JobWrapper) + return unless payload.respond_to?(:enqueue) + + warn "[DEPRECATION] :enqueue hook on #{payload.class} is deprecated and is no longer invoked" + end end attr_reader :error @@ -109,9 +118,12 @@ def unlock def hook(name, *args) if payload_object.respond_to?(name) - if payload_object.is_a?(Delayed::JobWrapper) - return if name == :enqueue # this callback is not supported due to method naming conflicts. + if name == :enqueue + warn '[DEPRECATION] :enqueue hook is deprecated' + return + end + if payload_object.is_a?(Delayed::JobWrapper) warn '[DEPRECATION] Job hook methods (`before`, `after`, `success`, etc) are deprecated. Use ActiveJob callbacks instead.' end diff --git a/lib/delayed/lifecycle.rb b/lib/delayed/lifecycle.rb index 4aa7ad7a..9bab0b5a 100644 --- a/lib/delayed/lifecycle.rb +++ b/lib/delayed/lifecycle.rb @@ -4,7 +4,7 @@ class InvalidCallback < RuntimeError; end class Lifecycle EVENTS = { execute: [nil], - enqueue: [:job], + enqueue: [:jobs], perform: %i(worker job), error: %i(worker job), failure: %i(worker job), diff --git a/lib/delayed/plugins/instrumentation.rb b/lib/delayed/plugins/instrumentation.rb index cab572a9..043a4600 100644 --- a/lib/delayed/plugins/instrumentation.rb +++ b/lib/delayed/plugins/instrumentation.rb @@ -2,9 +2,9 @@ module Delayed module Plugins class Instrumentation < Plugin callbacks do |lifecycle| - lifecycle.around(:enqueue) do |job, *args, &block| - ActiveSupport::Notifications.instrument('delayed.job.enqueue', active_support_notifications_tags(job)) do - block.call(job, *args) + lifecycle.around(:enqueue) do |jobs, &block| + ActiveSupport::Notifications.instrument('delayed.job.enqueue', bulk_enqueue_tags(jobs)) do + block.call(jobs) end end @@ -34,6 +34,25 @@ def self.active_support_notifications_tags(job) job: job, } end + + def self.bulk_enqueue_tags(jobs) + { + count: jobs.size, + **summarize(jobs), + jobs: jobs, + } + end + + def self.summarize(jobs) + seed = { job_name: Hash.new(0), database: Hash.new(0), database_adapter: Hash.new(0) } + jobs.each_with_object(seed) do |job, acc| + name = job.respond_to?(:name) ? job.name : job.class.name + delayed_class = job.is_a?(Delayed::Job) ? job.class : Delayed::Job + acc[:job_name][name] += 1 + acc[:database][delayed_class.database_name] += 1 + acc[:database_adapter][delayed_class.database_adapter_name] += 1 + end + end end end end diff --git a/spec/delayed/active_job_adapter_spec.rb b/spec/delayed/active_job_adapter_spec.rb index d87eec55..638eafa0 100644 --- a/spec/delayed/active_job_adapter_spec.rb +++ b/spec/delayed/active_job_adapter_spec.rb @@ -68,10 +68,10 @@ def serialize(*) expect { JobClass.perform_later }.to raise_error(RuntimeError, "uh oh, serialize failed!") end - it 'bubbles out an error if Delayed::Job.enqueue fails' do - allow(Delayed::Job).to receive(:enqueue).and_raise("uh oh, enqueue failed!") + it 'bubbles out an error if the underlying insert fails' do + allow(Delayed::Job).to receive(:insert_all).and_raise('uh oh, insert failed!') - expect { JobClass.perform_later }.to raise_error(RuntimeError, "uh oh, enqueue failed!") + expect { JobClass.perform_later }.to raise_error(RuntimeError, 'uh oh, insert failed!') end it 'deserializes even if the underlying job class is not defined' do @@ -397,4 +397,241 @@ def perform(arg, kwarg:) end end end + + describe '.enqueue_all' do # rubocop:disable Metrics/BlockLength + let(:adapter) { ActiveJob::Base.queue_adapter } + + it 'returns 0 when given no jobs' do + expect(adapter.enqueue_all([])).to eq(0) + end + + context 'when Delayed::Worker.delay_jobs is false' do + around do |example| + was = Delayed::Worker.delay_jobs + Delayed::Worker.delay_jobs = false + example.run + ensure + Delayed::Worker.delay_jobs = was + end + + it 'raises UnsafeEnqueueError and inserts nothing' do + expect { adapter.enqueue_all([JobClass.new]) } + .to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError) + expect(Delayed::Job.count).to eq(0) + end + + it 'also raises for single-job perform_later' do + expect { JobClass.perform_later } + .to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError) + expect(Delayed::Job.count).to eq(0) + end + end + + context 'when the database adapter does not support INSERT RETURNING (e.g. MySQL)' do + before do + allow(Delayed::Job.connection).to receive(:supports_insert_returning?).and_return(false) + end + + it 'enqueues successfully but leaves provider_job_id nil' do + jobs = Array.new(2) { JobClass.new } + + expect(adapter.enqueue_all(jobs)).to eq(2) + expect(Delayed::Job.count).to eq(2) + expect(jobs.map(&:provider_job_id)).to all(be_nil) + end + end + + it 'inserts multiple jobs in a single INSERT' do + jobs = Array.new(3) { JobClass.new } + + expect { adapter.enqueue_all(jobs) } + .to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i))) + expect(Delayed::Job.count).to eq(3) + end + + it 'returns the count of successfully enqueued jobs' do + jobs = Array.new(3) { JobClass.new } + expect(adapter.enqueue_all(jobs)).to eq(3) + end + + it 'sets provider_job_id on each input job when the adapter supports INSERT RETURNING' do + skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning? + + jobs = Array.new(3) { JobClass.new } + adapter.enqueue_all(jobs) + expect(jobs.map(&:provider_job_id)).to match_array(Delayed::Job.pluck(:id)) + end + + if ActiveJob.gem_version.release >= Gem::Version.new('7.1') + it 'sets successfully_enqueued on each input job' do + jobs = Array.new(2) { JobClass.new } + adapter.enqueue_all(jobs) + expect(jobs).to all(be_successfully_enqueued) + end + end + + it 'honors per-job scheduled_at' do + skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning? + + job = JobClass.new + job.scheduled_at = arbitrary_time + adapter.enqueue_all([JobClass.new, job]) + expect(Delayed::Job.find(job.provider_job_id).run_at).to eq(arbitrary_time) + end + + it 'applies db_time_now to run_at when no scheduled_at is set' do + Timecop.freeze(arbitrary_time) do + adapter.enqueue_all([JobClass.new]) + expect(Delayed::Job.last.run_at).to eq(arbitrary_time) + end + end + + it 'honors per-job queue and priority overrides' do + a = JobClass.new.tap do |j| + j.queue_name = 'q-a' + j.priority = 3 + end + b = JobClass.new.tap do |j| + j.queue_name = 'q-b' + j.priority = 7 + end + + adapter.enqueue_all([a, b]) + + rows = Delayed::Job.order(:id).to_a + expect(rows[0]).to have_attributes(queue: 'q-a', priority: 3) + expect(rows[1]).to have_attributes(queue: 'q-b', priority: 7) + end + + it 'supports a mix of job classes in one call' do + other_class = Class.new(ActiveJob::Base) do # rubocop:disable Rails/ApplicationJob + def perform; end + end + stub_const('OtherJobClass', other_class) + + adapter.enqueue_all([JobClass.new, OtherJobClass.new]) + + names = Delayed::Job.order(:id).pluck(:name) + expect(names).to eq(%w(JobClass OtherJobClass)) + end + + it 'sets the name column from display_name' do + adapter.enqueue_all([JobClass.new]) + expect(Delayed::Job.last.name).to eq('JobClass') + end + + it "fires Delayed's :enqueue lifecycle callback once with the jobs array" do + observed = [] + lifecycle_was = Delayed.lifecycle + Delayed.instance_variable_set(:@lifecycle, Delayed::Lifecycle.new) + Delayed.lifecycle.before(:enqueue) { |jobs| observed << jobs } + + input = Array.new(3) { JobClass.new } + adapter.enqueue_all(input) + + expect(observed.size).to eq(1) + expect(observed.first).to eq(input) + ensure + Delayed.instance_variable_set(:@lifecycle, lifecycle_was) + end + + it 'populates provider_job_id and successfully_enqueued before after(:enqueue) callbacks fire' do + skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning? + + ids_seen = nil + enqueued_seen = nil + lifecycle_was = Delayed.lifecycle + Delayed.instance_variable_set(:@lifecycle, Delayed::Lifecycle.new) + Delayed.lifecycle.after(:enqueue) do |jobs| + ids_seen = jobs.map(&:provider_job_id) + if ActiveJob.gem_version.release >= Gem::Version.new('7.1') + enqueued_seen = jobs.map(&:successfully_enqueued?) + end + end + + adapter.enqueue_all([JobClass.new, JobClass.new]) + + expect(ids_seen).to all(be_a(Integer)) + if ActiveJob.gem_version.release >= Gem::Version.new('7.1') + expect(enqueued_seen).to all(be(true)) + end + ensure + Delayed.instance_variable_set(:@lifecycle, lifecycle_was) + end + + it 'does not fire ActiveJob before/around/after_enqueue callbacks' do + fires = [] + JobClass.before_enqueue { fires << :before } + JobClass.around_enqueue do |_j, block| + fires << :around_before + block.call + fires << :around_after + end + JobClass.after_enqueue { fires << :after } + + adapter.enqueue_all([JobClass.new, JobClass.new]) + + expect(fires).to be_empty + end + + if ActiveJob.gem_version.release >= Gem::Version.new('7.2') + context 'when a job sets enqueue_after_transaction_commit to :always' do + before do + JobClass.include ActiveJob::EnqueueAfterTransactionCommit + JobClass.enqueue_after_transaction_commit = :always + end + + it 'raises UnsafeEnqueueError and inserts nothing' do + ActiveJob.deprecator.silence do + expect { adapter.enqueue_all([JobClass.new]) }.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError) + end + expect(Delayed::Job.count).to eq(0) + end + end + end + + context 'when a job has a stale run_at and deny_stale_enqueues is enabled' do + around do |example| + was = Delayed::Worker.deny_stale_enqueues + Delayed::Worker.deny_stale_enqueues = true + example.run + ensure + Delayed::Worker.deny_stale_enqueues = was + end + + it 'raises StaleEnqueueError and inserts nothing' do + job = JobClass.new + job.scheduled_at = Time.now.utc - 1.day + expect { adapter.enqueue_all([JobClass.new, job]) }.to raise_error(Delayed::StaleEnqueueError) + expect(Delayed::Job.count).to eq(0) + end + end + end + + describe 'single-job perform_later routes through insert_all' do + it 'invokes insert_all (not Delayed::Job.enqueue)' do + expect(Delayed::Job).to receive(:insert_all).and_call_original # rubocop:disable RSpec/MessageSpies + expect(Delayed::Job).not_to receive(:enqueue) # rubocop:disable RSpec/MessageSpies + + JobClass.perform_later + end + + it 'persists the job exactly once' do + expect { JobClass.perform_later }.to change { Delayed::Job.count }.by(1) + end + end + + if ActiveJob.gem_version.release >= Gem::Version.new('7.1') + describe 'ActiveJob.perform_all_later' do + it 'bulk-enqueues all jobs with a single INSERT' do + expect { ActiveJob.perform_all_later([JobClass.new, JobClass.new, JobClass.new]) } + .to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i))) + expect(Delayed::Job.count).to eq(3) + end + + it 'returns nil' do + expect(ActiveJob.perform_all_later([JobClass.new])).to be_nil + end + end + end end diff --git a/spec/delayed/job_spec.rb b/spec/delayed/job_spec.rb index f1966406..9e0699ac 100644 --- a/spec/delayed/job_spec.rb +++ b/spec/delayed/job_spec.rb @@ -37,12 +37,6 @@ def create_job(opts = {}) end describe 'enqueue' do - it "allows enqueue hook to modify job at DB level" do - later = described_class.db_time_now + 20.minutes - job = described_class.enqueue payload_object: EnqueueJobMod.new - expect(described_class.find(job.id).run_at).to be_within(1).of(later) - end - context 'with a hash' do it "raises ArgumentError when handler doesn't respond_to :perform" do expect { described_class.enqueue(payload_object: Object.new) }.to raise_error(ArgumentError) @@ -170,6 +164,42 @@ def create_job(opts = {}) expect(job).to be_persisted end end + + context 'when payload defines a deprecated :enqueue hook' do + before do + stub_const('JobWithEnqueueHook', Class.new do + def enqueue(_job); end + + def perform; end + end) + end + + it 'emits a deprecation warning naming the payload class' do + expect { described_class.enqueue(JobWithEnqueueHook.new) } + .to output(/\[DEPRECATION\] :enqueue hook on JobWithEnqueueHook/).to_stderr + end + + it 'does not invoke the payload enqueue method' do + payload = JobWithEnqueueHook.new + expect(payload).not_to receive(:enqueue) + expect { described_class.enqueue(payload) }.to output(/DEPRECATION/).to_stderr + end + end + + context 'when payload does not define :enqueue' do + it 'does not emit a deprecation warning' do + expect { described_class.enqueue(SimpleJob.new) } + .not_to output(/\[DEPRECATION\] :enqueue hook/).to_stderr + end + end + + context 'when payload is an ActiveJob wrapper that responds to :enqueue' do + it 'does not emit the deprecation warning' do + wrapper = ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper.new(ActiveJobJob.new.serialize) + expect { described_class.enqueue(payload_object: wrapper) } + .not_to output(/\[DEPRECATION\] :enqueue hook/).to_stderr + end + end end describe 'callbacks' do @@ -187,9 +217,9 @@ def create_job(opts = {}) it 'calls before and after callbacks' do job = described_class.enqueue(CallbackJob.new) - expect(CallbackJob.messages).to eq(['enqueue']) + expect(CallbackJob.messages).to eq([]) job.invoke_job - expect(CallbackJob.messages).to eq(%w(enqueue before perform success after)) + expect(CallbackJob.messages).to eq(%w(before perform success after)) end it 'calls the after callback with an error' do @@ -197,14 +227,14 @@ def create_job(opts = {}) expect(job.payload_object).to receive(:perform).and_raise(RuntimeError.new('fail')) expect { job.invoke_job }.to raise_error(RuntimeError, 'fail') - expect(CallbackJob.messages).to eq(['enqueue', 'before', 'error: RuntimeError', 'after']) + expect(CallbackJob.messages).to eq(['before', 'error: RuntimeError', 'after']) end it 'calls error when before raises an error' do job = described_class.enqueue(CallbackJob.new) expect(job.payload_object).to receive(:before).and_raise(RuntimeError.new('fail')) expect { job.invoke_job }.to raise_error(RuntimeError, 'fail') - expect(CallbackJob.messages).to eq(['enqueue', 'error: RuntimeError', 'after']) + expect(CallbackJob.messages).to eq(['error: RuntimeError', 'after']) end end diff --git a/spec/delayed/plugins/instrumentation_spec.rb b/spec/delayed/plugins/instrumentation_spec.rb index 933c1e1f..17ff3ce9 100644 --- a/spec/delayed/plugins/instrumentation_spec.rb +++ b/spec/delayed/plugins/instrumentation_spec.rb @@ -3,6 +3,39 @@ RSpec.describe Delayed::Plugins::Instrumentation do let!(:job) { Delayed::Job.enqueue SimpleJob.new, priority: 13, queue: 'test' } + it 'emits delayed.job.enqueue when a job is enqueued' do + expect { Delayed::Job.enqueue SimpleJob.new }.to emit_notification('delayed.job.enqueue').with_payload( + count: 1, + job_name: { 'SimpleJob' => 1 }, + database: { current_database_name => 1 }, + database_adapter: { current_adapter => 1 }, + jobs: a_collection_containing_exactly(an_instance_of(Delayed::Job)), + ) + end + + it 'emits a single delayed.job.enqueue when a batch is enqueued via the ActiveJob adapter' do + job_class = Class.new(ActiveJob::Base) do + def perform; end + end + stub_const('BatchedJob', job_class) + + adapter_was = ActiveJob::Base.queue_adapter + ActiveJob::Base.queue_adapter = :delayed + begin + jobs = Array.new(3) { BatchedJob.new } + expect { ActiveJob::Base.queue_adapter.enqueue_all(jobs) } + .to emit_notification('delayed.job.enqueue').with_payload( + count: 3, + job_name: { 'BatchedJob' => 3 }, + database: { current_database_name => 3 }, + database_adapter: { current_adapter => 3 }, + jobs: jobs, + ) + ensure + ActiveJob::Base.queue_adapter = adapter_was + end + end + it 'emits delayed.job.run' do expect { Delayed::Worker.new.work_off }.to emit_notification('delayed.job.run').with_payload( job_name: 'SimpleJob', diff --git a/spec/helper.rb b/spec/helper.rb index f815a767..b34ed4ff 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -268,6 +268,10 @@ def current_database end end +def current_database_name + current_adapter == 'sqlite3' ? 'tmp/database.sqlite' : 'delayed_job_test' +end + QueryUnderTest = Struct.new(:sql, :connection) do def self.for(query, connection: ActiveRecord::Base.connection) new(query.respond_to?(:to_sql) ? query.to_sql : query.to_s, connection) diff --git a/spec/lifecycle_spec.rb b/spec/lifecycle_spec.rb index daf2bea3..06475276 100644 --- a/spec/lifecycle_spec.rb +++ b/spec/lifecycle_spec.rb @@ -3,7 +3,7 @@ describe Delayed::Lifecycle do let(:lifecycle) { described_class.new } let(:callback) { lambda { |*_args| } } - let(:arguments) { [1] } + let(:arguments) { [%i(job_a job_b)] } let(:behavior) { double(Object, before!: nil, after!: nil, inside!: nil) } let(:wrapped_block) { proc { behavior.inside! } } diff --git a/spec/performable_method_spec.rb b/spec/performable_method_spec.rb index a37fae27..9f76dcfe 100644 --- a/spec/performable_method_spec.rb +++ b/spec/performable_method_spec.rb @@ -95,12 +95,6 @@ def private_method; end end end - it 'delegates enqueue hook to object' do - story = Story.create - expect(story).to receive(:enqueue).with(an_instance_of(Delayed::Job)) - story.delay.tell - end - it 'delegates error hook to object' do story = Story.create expect(story).to receive(:error).with(an_instance_of(Delayed::Job), an_instance_of(RuntimeError)) diff --git a/spec/sample_jobs.rb b/spec/sample_jobs.rb index 7d813354..8ec5c959 100644 --- a/spec/sample_jobs.rb +++ b/spec/sample_jobs.rb @@ -77,10 +77,6 @@ def perform class CallbackJob cattr_accessor :messages - def enqueue(_job) - self.class.messages << 'enqueue' - end - def before(_job) self.class.messages << 'before' end @@ -106,12 +102,6 @@ def failure(_job) end end -class EnqueueJobMod < SimpleJob - def enqueue(job) - job.run_at = 20.minutes.from_now - end -end - class ActiveJobJob < ActiveJob::Base # rubocop:disable Rails/ApplicationJob def perform(*args, **kwargs); end end