Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
delayed (0.5.5)
delayed (0.6.0)
activerecord (>= 5.2)
concurrent-ruby

Expand Down
2 changes: 1 addition & 1 deletion delayed.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day'

spec.version = '0.5.5'
spec.version = '0.6.0'
spec.metadata = {
'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md',
'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues',
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def start
def on_exit!; end

def interruptable_sleep(seconds)
pipe[0].wait_readable(seconds)
pipe[0].wait_readable(seconds) if seconds.positive?
end

def stop
Expand Down
9 changes: 9 additions & 0 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Worker
include Runnable

cattr_accessor :sleep_delay, instance_writer: false, default: 5
cattr_accessor :min_reserve_interval, instance_writer: false, default: 1
cattr_accessor :max_attempts, instance_writer: false, default: 25
cattr_accessor :max_claims, instance_writer: false, default: 5
cattr_accessor :max_run_time, instance_writer: false, default: 20.minutes
Expand Down Expand Up @@ -92,6 +93,7 @@ def work_off(num = 100)
total = 0

while total < num
start = clock_time
jobs = reserve_jobs
break if jobs.empty?

Expand All @@ -107,6 +109,9 @@ def work_off(num = 100)
pool.wait_for_termination

break if stop? # leave if we're exiting

elapsed = clock_time - start
interruptable_sleep(self.class.min_reserve_interval - elapsed)
end

[success.value, total - success.value]
Expand Down Expand Up @@ -227,5 +232,9 @@ def reserve_jobs
def reload!
Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end

def clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
4 changes: 2 additions & 2 deletions spec/delayed/tasks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo bar))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down Expand Up @@ -96,7 +96,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down
10 changes: 10 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class SingletonClass
include Singleton
end

# Negative values are treated as sleep(0),
# so we can use different values to test the sleep behavior:
TEST_MIN_RESERVE_INTERVAL = -10
TEST_SLEEP_DELAY = -100

RSpec.configure do |config|
config.around(:each) do |example|
aj_priority_was = ActiveJob::Base.priority
Expand All @@ -113,6 +118,10 @@ class SingletonClass
queues_was = Delayed::Worker.queues
read_ahead_was = Delayed::Worker.read_ahead
sleep_delay_was = Delayed::Worker.sleep_delay
min_reserve_interval_was = Delayed::Worker.min_reserve_interval

Delayed::Worker.sleep_delay = TEST_SLEEP_DELAY
Delayed::Worker.min_reserve_interval = TEST_MIN_RESERVE_INTERVAL

example.run
ensure
Expand All @@ -130,6 +139,7 @@ class SingletonClass
Delayed::Worker.queues = queues_was
Delayed::Worker.read_ahead = read_ahead_was
Delayed::Worker.sleep_delay = sleep_delay_was
Delayed::Worker.min_reserve_interval = min_reserve_interval_was

Delayed::Job.delete_all
end
Expand Down
106 changes: 57 additions & 49 deletions spec/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
require 'helper'

describe Delayed::Worker do
before do
described_class.sleep_delay = 0
end

describe 'start' do
it 'runs the :execute lifecycle hook' do
performances = []
Expand Down Expand Up @@ -32,62 +28,74 @@
allow(subject).to receive(:interruptable_sleep).and_call_original
end

context 'when there are no jobs' do
before do
allow(Delayed::Job).to receive(:reserve).and_return([])
end
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

it 'does not log and then sleeps' do
before do
allow(Delayed::Job).to receive(:reserve).and_return((0...jobs_returned).map { job }, [])
end

let(:max_claims) { 1 }
let(:jobs_returned) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end

it 'logs the count and sleeps only within the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end

context 'when no jobs are returned' do
let(:jobs_returned) { 0 }

it 'does not log and then sleeps only outside of the loop' do
subject.run!
expect(Delayed.logger).not_to have_received(:info)
expect(subject).to have_received(:interruptable_sleep)
expect(subject).to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when there is a job worked off' do
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

before do
allow(Delayed::Job).to receive(:reserve).and_return([job], [])
end

let(:max_claims) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end
context 'when max_claims is 3 and 3 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 3 }

it 'logs the count and does not sleep' do
it 'logs the count and sleeps only in the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).not_to have_received(:interruptable_sleep)
expect(Delayed.logger).to have_received(:info).with(/3 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when max_claims is 2' do
let(:max_claims) { 2 }
context 'when max_claims is 3 and 2 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 2 }

it 'logs the count and sleeps' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep)
end
it 'logs the count and sleeps both in the loop and outside of the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/2 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).to have_received(:interruptable_sleep).once.with(TEST_SLEEP_DELAY)
end
end
end
Expand Down