From 680ab479c214b1f4325b352c4ecedf8b59be5056 Mon Sep 17 00:00:00 2001 From: Todd Date: Thu, 12 May 2016 10:14:37 -0400 Subject: [PATCH 1/9] shit happens better safe than sorry right? --- lib/resque/scheduler.rb | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 8ca91367..7e06647a 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -140,11 +140,16 @@ def load_schedule_job(name, config) args = optionizate_interval_value(config[interval_type]) args = [args, nil, job: true] if args.is_a?(::String) - job = rufus_scheduler.send(interval_type, *args) do - enqueue_recurring(name, config) + begin + job = rufus_scheduler.send(interval_type, *args) do + enqueue_recurring(name, config) + end + @scheduled_jobs[name] = job + interval_defined = true + + rescue => e + log "[Bad Schedule] ignoring with: #{e.message}\n#{e.backtrace.join("\n")}" end - @scheduled_jobs[name] = job - interval_defined = true break end unless interval_defined From 6079f4db62e33df89bcb5d4f29dc4028d859cd2f Mon Sep 17 00:00:00 2001 From: Todd Date: Thu, 12 May 2016 10:25:46 -0400 Subject: [PATCH 2/9] log to error log log_error --- lib/resque/scheduler.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 7e06647a..c00c0b3b 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -148,7 +148,7 @@ def load_schedule_job(name, config) interval_defined = true rescue => e - log "[Bad Schedule] ignoring with: #{e.message}\n#{e.backtrace.join("\n")}" + log_error "[Bad Schedule] ignoring with: #{e.message}\n#{e.backtrace.join("\n")}" end break end From 6d7c2052c15ab31f77e27fefc7519f14ef94fa98 Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Mon, 28 Nov 2016 13:45:13 -0500 Subject: [PATCH 3/9] server should not have access so don't try to force it --- lib/resque/scheduler/server.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb index 286118a7..9ec59d46 100644 --- a/lib/resque/scheduler/server.rb +++ b/lib/resque/scheduler/server.rb @@ -147,7 +147,7 @@ def show_job_arguments(args) def queue_from_class_name(class_name) Resque.queue_from_class( - Resque::Scheduler::Util.constantize(class_name) + class_name ) end From 69182c19e49c04f30edb7ea7be46ae747a6e0fc4 Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Sun, 19 Mar 2017 15:28:26 -0400 Subject: [PATCH 4/9] we want to run scheduled jobs and delayed jobs in separate processes. we have observed major throughput issues when jobs are dynamically scheduled adversly impacting throughput of delayed jobs --- lib/resque/scheduler.rb | 72 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index c00c0b3b..207464e1 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -31,6 +31,78 @@ class << self # allow user to set an additional failure handler attr_writer :failure_handler + # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=delayed + def run_delayed_only + procline 'Starting Delayed' + + # trap signals + register_signal_handlers + + # Quote from the resque/worker. + # Fix buffering so we can `rake resque:scheduler > scheduler.log` and + # get output from the child in there. + $stdout.sync = true + $stderr.sync = true + + begin + @th = Thread.current + + # Now start the scheduling part of the loop. + loop do + begin + handle_delayed_items if master? + rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e + log! e.message + release_master_lock + end + poll_sleep + end + + rescue Interrupt + log 'Exiting' + end + ensure + release_master_lock + end + + # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler + def run_scheduled_only + procline 'Starting Scheduler' + + # trap signals + register_signal_handlers + + # Quote from the resque/worker. + # Fix buffering so we can `rake resque:scheduler > scheduler.log` and + # get output from the child in there. + $stdout.sync = true + $stderr.sync = true + + # Load the schedule into rufus + # If dynamic is set, load that schedule otherwise use normal load + reload_schedule! + + begin + @th = Thread.current + + # Now start the scheduling part of the loop. + loop do + begin + update_schedule if master? + rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e + log! e.message + release_master_lock + end + poll_sleep + end + + rescue Interrupt + log 'Exiting' + end + ensure + release_master_lock + end + # Schedule all jobs and continually look for delayed jobs (never returns) def run procline 'Starting' From 563cbf5c615c2b6a413615dab365af0c6abb720b Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Sun, 19 Mar 2017 15:34:05 -0400 Subject: [PATCH 5/9] we should force the lock names when run with separate methods --- lib/resque/scheduler.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 207464e1..cc432221 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -34,6 +34,7 @@ class << self # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=delayed def run_delayed_only procline 'Starting Delayed' + ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX']="delayed" # trap signals register_signal_handlers @@ -68,6 +69,7 @@ def run_delayed_only # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler def run_scheduled_only procline 'Starting Scheduler' + ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX']="scheduler" # trap signals register_signal_handlers From 39ee6271518b77745cf96ea5696d7b2fb61ce0f7 Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Sun, 19 Mar 2017 22:30:02 -0400 Subject: [PATCH 6/9] clearify the procline when running as master scheduler --- lib/resque/scheduler.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index cc432221..639ec247 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -90,7 +90,10 @@ def run_scheduled_only # Now start the scheduling part of the loop. loop do begin - update_schedule if master? + if master? + update_schedule + procline 'Processing Schedules' + end rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e log! e.message release_master_lock From 55086135fc5acf623cf27d5fd38154d782f8758e Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Mon, 27 Mar 2017 12:35:39 -0400 Subject: [PATCH 7/9] processing delayed jobs does not need a lock --- lib/resque/scheduler.rb | 35 +++++++++++++++++---- lib/resque/scheduler/delaying_extensions.rb | 10 ++++-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 639ec247..6aeeb9cb 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -31,10 +31,9 @@ class << self # allow user to set an additional failure handler attr_writer :failure_handler - # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=delayed + # runs without a lock def run_delayed_only procline 'Starting Delayed' - ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX']="delayed" # trap signals register_signal_handlers @@ -49,12 +48,12 @@ def run_delayed_only @th = Thread.current # Now start the scheduling part of the loop. + procline 'Processing Delayed Items' loop do begin - handle_delayed_items if master? + unlocked_handle_delayed_items rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e log! e.message - release_master_lock end poll_sleep end @@ -62,8 +61,32 @@ def run_delayed_only rescue Interrupt log 'Exiting' end - ensure - release_master_lock + end + + # Handles queueing delayed items + # at_time - Time to start scheduling items (default: now). + # this should be multi-process safe + def unlocked_handle_delayed_items(at_time = nil) + timestamp = Resque.next_delayed_timestamp(at_time) + if timestamp + until timestamp.nil? + unlocked_enqueue_delayed_items_for_timestamp(timestamp) + timestamp = Resque.next_delayed_timestamp(at_time) + end + end + end + + def unlocked_enqueue_delayed_items_for_timestamp(timestamp) + item = nil + loop do + handle_shutdown do + # Continually check that it is still the master + item = enqueue_next_item(timestamp) + end + # continue processing until there are no more ready items in this + # timestamp + break if item.nil? + end end # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb index ec7606f0..6b83beae 100644 --- a/lib/resque/scheduler/delaying_extensions.rb +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -114,9 +114,14 @@ def next_delayed_timestamp(at_time = nil) def next_item_for_timestamp(timestamp) key = "delayed:#{timestamp.to_i}" + # TODO: this shoudl be all done in lua lpop, srem should be atomic encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) + if encoded_item + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + else + item = nil + end # If the list is empty, remove it. clean_up_timestamp(key, timestamp) @@ -285,6 +290,7 @@ def clean_up_timestamp(key, timestamp) # Use a watch here to ensure nobody adds jobs to this delayed # queue while we're removing it. redis.watch(key) do + # TODO: this should be done using lua script if redis.llen(key).to_i == 0 # If the list is empty, remove it. redis.multi do From d42e0af02c286aa1bad047523bc872bcaf7ae550 Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Sat, 16 Jun 2018 11:36:31 -0400 Subject: [PATCH 8/9] remove dependency on old redis version --- resque-scheduler.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index 3a4faea8..29479d54 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -54,7 +54,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rubocop', '~> 0.40.0' spec.add_runtime_dependency 'mono_logger', '~> 1.0' - spec.add_runtime_dependency 'redis', '~> 3.3' + spec.add_runtime_dependency 'redis' spec.add_runtime_dependency 'resque', '~> 1.26' spec.add_runtime_dependency 'rufus-scheduler', '~> 3.2' end From 10b303313009d62addb71813600b09679609e7c5 Mon Sep 17 00:00:00 2001 From: Todd Fisher Date: Fri, 15 Feb 2019 10:35:14 -0500 Subject: [PATCH 9/9] less offensive :/ --- lib/resque/scheduler.rb | 2 +- resque-scheduler.gemspec | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 6aeeb9cb..583339da 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -92,7 +92,7 @@ def unlocked_enqueue_delayed_items_for_timestamp(timestamp) # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler def run_scheduled_only procline 'Starting Scheduler' - ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX']="scheduler" + ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'scheduler' # trap signals register_signal_handlers diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index 29479d54..5c749f39 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -55,6 +55,6 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'mono_logger', '~> 1.0' spec.add_runtime_dependency 'redis' - spec.add_runtime_dependency 'resque', '~> 1.26' - spec.add_runtime_dependency 'rufus-scheduler', '~> 3.2' + spec.add_runtime_dependency 'resque' + spec.add_runtime_dependency 'rufus-scheduler' end