From 58ddb41e28f88a1baf338a2a7f5ca2a292ecd092 Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 14 Nov 2016 12:09:05 -0800 Subject: [PATCH 1/8] Update README to show how to use lock_after_execution_period --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 665e4f2..f2da7e8 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ gem 'resque_solo' class UpdateCat include Resque::Plugins::UniqueJob @queue = :cats + @lock_after_execution_period = 20 def self.perform(cat_id) # do something From 6ab7620ddd5936a1a976c12d711db5bcba5bfc1a Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 14 Nov 2016 17:35:30 -0800 Subject: [PATCH 2/8] Step 1: move the enqueued test to a before hook --- lib/resque_ext/resque.rb | 17 ----------------- lib/resque_solo/unique_job.rb | 8 ++++++++ resque_solo.gemspec | 2 ++ test/resque_test.rb | 19 +++++++++++++------ test/test_helper.rb | 5 ++++- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/lib/resque_ext/resque.rb b/lib/resque_ext/resque.rb index 58dad74..e1748fe 100644 --- a/lib/resque_ext/resque.rb +++ b/lib/resque_ext/resque.rb @@ -1,22 +1,5 @@ module Resque class << self - def enqueue_to(queue, klass, *args) - # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false - before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook| - klass.send(hook, *args) - end - return nil if before_hooks.any? { |result| result == false } - - result = Job.create(queue, klass, *args) - return nil if result == "EXISTED" - - Plugin.after_enqueue_hooks(klass).each do |hook| - klass.send(hook, *args) - end - - true - end - def enqueued?(klass, *args) enqueued_in?(queue_from_class(klass), klass, *args) end diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index 7d55af4..b2cd571 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -45,6 +45,14 @@ def ttl def lock_after_execution_period @lock_after_execution_period ||= 0 end + + # We want this to run first in before_enqueue_hooks (which are alpha sorted), so name appropriately + def before_enqueue_001_solo_job(*args) + queue_name = self.instance_variable_get(:@queue) + item = { class: self.to_s, args: args } + !ResqueSolo::Queue.queued?(queue_name, item) + end + end end end diff --git a/resque_solo.gemspec b/resque_solo.gemspec index b12c37c..32fb9e9 100644 --- a/resque_solo.gemspec +++ b/resque_solo.gemspec @@ -20,5 +20,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", "~> 1.6" spec.add_development_dependency "fakeredis", "~> 0.4" spec.add_development_dependency "minitest", "~> 5.8" + spec.add_development_dependency "minitest-reporters", "~> 1.1" spec.add_development_dependency "rake", "~> 11.2" + spec.add_development_dependency "m", "~> 1.5" end diff --git a/test/resque_test.rb b/test/resque_test.rb index 820e42f..64363b4 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -12,7 +12,8 @@ class ResqueTest < MiniTest::Spec it "enqueues normal jobs" do Resque.enqueue FakeJob, "x" Resque.enqueue FakeJob, "x" - assert_equal 2, Resque.size(:normal) + queue_name = FakeJob.instance_variable_get(:@queue) + assert_equal 2, Resque.size(queue_name) end it "is not able to report if a non-unique job was enqueued" do @@ -26,19 +27,25 @@ class ResqueTest < MiniTest::Spec describe "#enqueue_to" do describe "non-unique job" do it "should return true if job was enqueued" do - assert Resque.enqueue_to(:normal, FakeJob) - assert Resque.enqueue_to(:normal, FakeJob) + queue_name = FakeJob.instance_variable_get(:@queue) + assert Resque.enqueue_to(queue_name, FakeJob) + assert Resque.enqueue_to(queue_name, FakeJob) end end describe "unique job" do + before do + @queue_name = FakeUniqueJob.instance_variable_get(:@queue) + end + it "should return true if job was enqueued" do - assert Resque.enqueue_to(:normal, FakeUniqueJob) + assert Resque.enqueue_to(@queue_name, FakeUniqueJob) end it "should return nil if job already existed" do - Resque.enqueue_to(:normal, FakeUniqueJob) - assert_nil Resque.enqueue_to(:normal, FakeUniqueJob) + Resque.enqueue_to(@queue_name, FakeUniqueJob) + assert Resque.enqueued?(FakeUniqueJob) + assert_nil Resque.enqueue_to(@queue_name, FakeUniqueJob) end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 67a7e1a..3e19df5 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -4,11 +4,14 @@ end require "minitest/autorun" +require "minitest/reporters" require "resque_solo" require "fake_jobs" -require "fakeredis" +require "fakeredis/minitest" begin require "pry-byebug" rescue LoadError # ignore end + +Minitest::Reporters.use! [Minitest::Reporters::DefaultReporter.new({ color: true })] From 28d590c6fbe484b9795f648a522c76211870800d Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 14 Nov 2016 19:40:40 -0800 Subject: [PATCH 3/8] Better locking via SETNX and use around_hook to mark unqueued after job is performed --- lib/resque_ext/job.rb | 27 --------------------------- lib/resque_solo/queue.rb | 5 +++-- lib/resque_solo/unique_job.rb | 20 +++++++++++++++++--- test/job_test.rb | 7 +++---- test/test_helper.rb | 4 ++++ 5 files changed, 27 insertions(+), 36 deletions(-) diff --git a/lib/resque_ext/job.rb b/lib/resque_ext/job.rb index cce85b3..bc97b5f 100644 --- a/lib/resque_ext/job.rb +++ b/lib/resque_ext/job.rb @@ -1,39 +1,12 @@ module Resque class Job class << self - # Mark an item as queued - def create_solo(queue, klass, *args) - item = { class: klass.to_s, args: args } - if Resque.inline? || !ResqueSolo::Queue.is_unique?(item) - return create_without_solo(queue, klass, *args) - end - return "EXISTED" if ResqueSolo::Queue.queued?(queue, item) - create_return_value = false - # redis transaction block - Resque.redis.multi do - create_return_value = create_without_solo(queue, klass, *args) - ResqueSolo::Queue.mark_queued(queue, item) - end - create_return_value - end - - # Mark an item as unqueued - def reserve_solo(queue) - item = reserve_without_solo(queue) - ResqueSolo::Queue.mark_unqueued(queue, item) if item && !Resque.inline? - item - end - # Mark destroyed jobs as unqueued def destroy_solo(queue, klass, *args) ResqueSolo::Queue.destroy(queue, klass, *args) unless Resque.inline? destroy_without_solo(queue, klass, *args) end - alias_method :create_without_solo, :create - alias_method :create, :create_solo - alias_method :reserve_without_solo, :reserve - alias_method :reserve, :reserve_solo alias_method :destroy_without_solo, :destroy alias_method :destroy, :destroy_solo end diff --git a/lib/resque_solo/queue.rb b/lib/resque_solo/queue.rb index a405330..93234df 100644 --- a/lib/resque_solo/queue.rb +++ b/lib/resque_solo/queue.rb @@ -7,11 +7,12 @@ def queued?(queue, item) end def mark_queued(queue, item) - return unless is_unique?(item) key = unique_key(queue, item) - redis.set(key, 1) + res = redis.setnx(key, 1) + return false unless res ttl = item_ttl(item) redis.expire(key, ttl) if ttl >= 0 + res end def mark_unqueued(queue, job) diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index b2cd571..977fdb6 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -48,9 +48,23 @@ def lock_after_execution_period # We want this to run first in before_enqueue_hooks (which are alpha sorted), so name appropriately def before_enqueue_001_solo_job(*args) - queue_name = self.instance_variable_get(:@queue) - item = { class: self.to_s, args: args } - !ResqueSolo::Queue.queued?(queue_name, item) + queue_name, item = get_queue_and_item(*args) + # This returns false if the key was already set + ResqueSolo::Queue.mark_queued(queue_name, item) + end + + # Always marks unqueued, even on failure + def around_perform_solo_job(*args) + queue_name, item = get_queue_and_item(*args) + begin + yield + ensure + ResqueSolo::Queue.mark_unqueued(queue_name, item) + end + end + + def get_queue_and_item(*args) + [self.instance_variable_get(:@queue), { class: self.to_s, args: args }] end end diff --git a/test/job_test.rb b/test/job_test.rb index 8a92644..4dccd0e 100644 --- a/test/job_test.rb +++ b/test/job_test.rb @@ -15,7 +15,7 @@ class JobTest < MiniTest::Spec Resque.enqueue FakeUniqueJob, "foo" Resque.enqueue FakeUniqueJob, "foo" assert_equal 1, Resque.size(:unique) - Resque.reserve(:unique) + perform_one_manually(:unique) assert_equal 0, Resque.size(:unique) Resque.enqueue FakeUniqueJob, "foo" Resque.enqueue FakeUniqueJob, "foo" @@ -48,8 +48,7 @@ class JobTest < MiniTest::Spec it "mark jobs as unqueued when they raise an exception" do 2.times { Resque.enqueue(FailingUniqueJob, "foo") } assert_equal 1, Resque.size(:unique) - worker = Resque::Worker.new(:unique) - worker.work 0 + assert_raises { perform_one_manually(:unique) } assert_equal 0, Resque.size(:unique) 2.times { Resque.enqueue(FailingUniqueJob, "foo") } assert_equal 1, Resque.size(:unique) @@ -98,7 +97,7 @@ class JobTest < MiniTest::Spec it "honor lock_after_execution_period in the redis key" do Resque.enqueue UniqueJobWithLock - Resque.reserve(:unique_with_lock) + perform_one_manually(:unique_with_lock) keys = Resque.redis.keys "solo:queue:unique_with_lock:job:*" assert_equal 1, keys.length assert_in_delta UniqueJobWithLock.lock_after_execution_period, diff --git a/test/test_helper.rb b/test/test_helper.rb index 3e19df5..6d77005 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -15,3 +15,7 @@ end Minitest::Reporters.use! [Minitest::Reporters::DefaultReporter.new({ color: true })] + +def perform_one_manually(queue_name) + Resque::Job.reserve(queue_name).perform +end \ No newline at end of file From c88a3d302f9fef2158962f123d8030dd9c79ee93 Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Tue, 15 Nov 2016 13:24:31 -0800 Subject: [PATCH 4/8] Updates per @teeparham code review --- lib/resque_solo/unique_job.rb | 11 ++--------- test/resque_test.rb | 18 ++++++------------ 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index 977fdb6..78bb47a 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -48,25 +48,18 @@ def lock_after_execution_period # We want this to run first in before_enqueue_hooks (which are alpha sorted), so name appropriately def before_enqueue_001_solo_job(*args) - queue_name, item = get_queue_and_item(*args) # This returns false if the key was already set - ResqueSolo::Queue.mark_queued(queue_name, item) + ResqueSolo::Queue.mark_queued(@queue, { class: self.to_s, args: args }) end # Always marks unqueued, even on failure def around_perform_solo_job(*args) - queue_name, item = get_queue_and_item(*args) begin yield ensure - ResqueSolo::Queue.mark_unqueued(queue_name, item) + ResqueSolo::Queue.mark_unqueued(@queue, { class: self.to_s, args: args }) end end - - def get_queue_and_item(*args) - [self.instance_variable_get(:@queue), { class: self.to_s, args: args }] - end - end end end diff --git a/test/resque_test.rb b/test/resque_test.rb index 64363b4..4c0b72e 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -12,8 +12,7 @@ class ResqueTest < MiniTest::Spec it "enqueues normal jobs" do Resque.enqueue FakeJob, "x" Resque.enqueue FakeJob, "x" - queue_name = FakeJob.instance_variable_get(:@queue) - assert_equal 2, Resque.size(queue_name) + assert_equal 2, Resque.size(:normal) end it "is not able to report if a non-unique job was enqueued" do @@ -27,25 +26,20 @@ class ResqueTest < MiniTest::Spec describe "#enqueue_to" do describe "non-unique job" do it "should return true if job was enqueued" do - queue_name = FakeJob.instance_variable_get(:@queue) - assert Resque.enqueue_to(queue_name, FakeJob) - assert Resque.enqueue_to(queue_name, FakeJob) + assert Resque.enqueue_to(:unique, FakeJob) + assert Resque.enqueue_to(:unique, FakeJob) end end describe "unique job" do - before do - @queue_name = FakeUniqueJob.instance_variable_get(:@queue) - end - it "should return true if job was enqueued" do - assert Resque.enqueue_to(@queue_name, FakeUniqueJob) + assert Resque.enqueue_to(:unique, FakeUniqueJob) end it "should return nil if job already existed" do - Resque.enqueue_to(@queue_name, FakeUniqueJob) + Resque.enqueue_to(:unique, FakeUniqueJob) assert Resque.enqueued?(FakeUniqueJob) - assert_nil Resque.enqueue_to(@queue_name, FakeUniqueJob) + assert_nil Resque.enqueue_to(:unique, FakeUniqueJob) end end end From bda08eaac0d5a8ca9a8198addc0c4dbfb065ffc0 Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 28 Nov 2016 13:26:40 -0800 Subject: [PATCH 5/8] Removing incorrect comment and simplifying hook name --- lib/resque_solo/unique_job.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index 78bb47a..e40025c 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -46,8 +46,7 @@ def lock_after_execution_period @lock_after_execution_period ||= 0 end - # We want this to run first in before_enqueue_hooks (which are alpha sorted), so name appropriately - def before_enqueue_001_solo_job(*args) + def before_enqueue_solo_job(*args) # This returns false if the key was already set ResqueSolo::Queue.mark_queued(@queue, { class: self.to_s, args: args }) end From 0c86aae87131468846158432ca57d12d88b0d8c4 Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 28 Nov 2016 16:27:18 -0800 Subject: [PATCH 6/8] Actually deal with various failure cases properly --- lib/resque_ext/job.rb | 31 ++++++++++++++++++++++ lib/resque_solo/unique_job.rb | 11 +------- test/fake_jobs.rb | 48 +++++++++++++++++++++++++++++++++++ test/job_test.rb | 18 +++++++++++++ test/resque_test.rb | 10 ++++++++ 5 files changed, 108 insertions(+), 10 deletions(-) diff --git a/lib/resque_ext/job.rb b/lib/resque_ext/job.rb index bc97b5f..5e5f551 100644 --- a/lib/resque_ext/job.rb +++ b/lib/resque_ext/job.rb @@ -1,12 +1,43 @@ module Resque class Job + def perform_solo + res = nil + begin + res = perform_without_solo + ensure + ResqueSolo::Queue.mark_unqueued(@queue, self) + end + res + end + + alias_method :perform_without_solo, :perform + alias_method :perform, :perform_solo + class << self + # Mark an item as queued + def create_solo(queue, klass, *args) + item = { class: klass.to_s, args: args } + if Resque.inline? || !ResqueSolo::Queue.is_unique?(item) + return create_without_solo(queue, klass, *args) + end + + create_return_value = false + # redis transaction block + Resque.redis.multi do + create_return_value = create_without_solo(queue, klass, *args) + ResqueSolo::Queue.mark_queued(queue, item) + end + create_return_value + end + # Mark destroyed jobs as unqueued def destroy_solo(queue, klass, *args) ResqueSolo::Queue.destroy(queue, klass, *args) unless Resque.inline? destroy_without_solo(queue, klass, *args) end + alias_method :create_without_solo, :create + alias_method :create, :create_solo alias_method :destroy_without_solo, :destroy alias_method :destroy, :destroy_solo end diff --git a/lib/resque_solo/unique_job.rb b/lib/resque_solo/unique_job.rb index e40025c..f6509f3 100644 --- a/lib/resque_solo/unique_job.rb +++ b/lib/resque_solo/unique_job.rb @@ -48,16 +48,7 @@ def lock_after_execution_period def before_enqueue_solo_job(*args) # This returns false if the key was already set - ResqueSolo::Queue.mark_queued(@queue, { class: self.to_s, args: args }) - end - - # Always marks unqueued, even on failure - def around_perform_solo_job(*args) - begin - yield - ensure - ResqueSolo::Queue.mark_unqueued(@queue, { class: self.to_s, args: args }) - end + !ResqueSolo::Queue.queued?(@queue, { class: self.to_s, args: args }) end end end diff --git a/test/fake_jobs.rb b/test/fake_jobs.rb index 0e01ae8..e4e1ff0 100644 --- a/test/fake_jobs.rb +++ b/test/fake_jobs.rb @@ -36,3 +36,51 @@ class UniqueJobWithLock def self.perform(*_) end end + +class EnqueueFailUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_enqueue_fail + false + end +end + +class EnqueueErrorUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_enqueue_zzz_error + raise "Fail" + end +end + +class DontPerformUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_perform_dont + raise Resque::Job::DontPerform + end +end + +class BeforePerformErrorUniqueJob + include Resque::Plugins::UniqueJob + @queue = :unique + + def self.perform(_) + end + + def self.before_perform_dont + raise "Fail" + end +end \ No newline at end of file diff --git a/test/job_test.rb b/test/job_test.rb index 4dccd0e..8f0b4f2 100644 --- a/test/job_test.rb +++ b/test/job_test.rb @@ -54,6 +54,24 @@ class JobTest < MiniTest::Spec assert_equal 1, Resque.size(:unique) end + it "mark jobs as unqueued when a before_perform filter raises an exception" do + 2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + assert_raises { perform_one_manually(:unique) } + assert_equal 0, Resque.size(:unique) + 2.times { Resque.enqueue(BeforePerformErrorUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + end + + it "mark jobs as unqueued when a before_perform filter raises a DontPerform exception" do + 2.times { Resque.enqueue(DontPerformUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + assert_raises { perform_one_manually(:unique) } + assert_equal 0, Resque.size(:unique) + 2.times { Resque.enqueue(DontPerformUniqueJob, "foo") } + assert_equal 1, Resque.size(:unique) + end + it "report if a unique job is enqueued" do Resque.enqueue FakeUniqueJob, "foo" assert Resque.enqueued?(FakeUniqueJob, "foo") diff --git a/test/resque_test.rb b/test/resque_test.rb index 4c0b72e..3dd3c7c 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -41,6 +41,16 @@ class ResqueTest < MiniTest::Spec assert Resque.enqueued?(FakeUniqueJob) assert_nil Resque.enqueue_to(:unique, FakeUniqueJob) end + + it "should not mark enqueued if another before_enqueue hook fails" do + assert_nil Resque.enqueue_to(:unique, EnqueueFailUniqueJob), "Should not have been actually enqueued" + refute Resque.enqueued?(EnqueueFailUniqueJob), "Should not have been marked enqueued" + end + + it "should not mark enqueued if another before_enqueue hook errors" do + assert_raises { Resque.enqueue_to(:unique, EnqueueErrorUniqueJob) } + refute Resque.enqueued?(EnqueueErrorUniqueJob), "Should not have been marked enqueued" + end end end end From ccfea33ec767c06129f7f1ee0d7c7867fc2c617f Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 28 Nov 2016 17:03:23 -0800 Subject: [PATCH 7/8] Simplifying actual unqiue job creation monkey patch logic --- lib/resque_ext/job.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/resque_ext/job.rb b/lib/resque_ext/job.rb index 5e5f551..c4208c0 100644 --- a/lib/resque_ext/job.rb +++ b/lib/resque_ext/job.rb @@ -20,14 +20,7 @@ def create_solo(queue, klass, *args) if Resque.inline? || !ResqueSolo::Queue.is_unique?(item) return create_without_solo(queue, klass, *args) end - - create_return_value = false - # redis transaction block - Resque.redis.multi do - create_return_value = create_without_solo(queue, klass, *args) - ResqueSolo::Queue.mark_queued(queue, item) - end - create_return_value + ResqueSolo::Queue.mark_queued(queue, item) ? create_without_solo(queue, klass, *args) : false end # Mark destroyed jobs as unqueued From b9a8a9270dac5d8ac7bfa8caea5224f667375fcb Mon Sep 17 00:00:00 2001 From: Bruce Krysiak Date: Mon, 23 Oct 2017 22:10:52 -0700 Subject: [PATCH 8/8] Fixing merge error --- resque_solo.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resque_solo.gemspec b/resque_solo.gemspec index 32fb9e9..a8e25d9 100644 --- a/resque_solo.gemspec +++ b/resque_solo.gemspec @@ -21,6 +21,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency "fakeredis", "~> 0.4" spec.add_development_dependency "minitest", "~> 5.8" spec.add_development_dependency "minitest-reporters", "~> 1.1" - spec.add_development_dependency "rake", "~> 11.2" + spec.add_development_dependency "rake", "~> 12.0" spec.add_development_dependency "m", "~> 1.5" end