-
Notifications
You must be signed in to change notification settings - Fork 237
Batch Support #142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Batch Support #142
Changes from 38 commits
94e4371
8b694ac
07e0552
6cce814
2c6a519
5bffda3
7769a53
1ec4bff
503b888
a36d13d
09ce612
84cc97a
c37cde4
cb51f62
fd0dc75
8ba3ece
7da00ed
b94ce64
fa1a702
c02f4ad
e28d20e
bd85d74
0595283
18e016a
5a19cf5
093e06a
b096939
071b024
479a16d
d9ee1d6
62bc2ac
5516be4
28f8dc0
acf1767
ddd4c13
92087c4
db19c14
fcae152
9834e7e
46a353e
7ea178c
eb54d47
e0b6e38
3a725e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module SolidQueue | ||
| class Batch | ||
| class EmptyJob < (defined?(ApplicationJob) ? ApplicationJob : ActiveJob::Base) | ||
| def perform | ||
| # This job does nothing - it just exists to trigger batch completion | ||
| # The batch completion will be handled by the normal job_finished! flow | ||
| end | ||
| end | ||
| end | ||
| end |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,157 @@ | ||||||||||||||||||||||||||||||||
| # frozen_string_literal: true | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| module SolidQueue | ||||||||||||||||||||||||||||||||
| class Batch < Record | ||||||||||||||||||||||||||||||||
| include Trackable | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| has_many :jobs | ||||||||||||||||||||||||||||||||
| has_many :batch_executions, class_name: "SolidQueue::BatchExecution", dependent: :destroy | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| serialize :on_finish, coder: JSON | ||||||||||||||||||||||||||||||||
| serialize :on_success, coder: JSON | ||||||||||||||||||||||||||||||||
| serialize :on_failure, coder: JSON | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think for these 3, I'd remove some repetition here and below: %w[ finish success failure ].each do |callback_type|
serialize "on_#{callback_type}", coder: JSON
define_method("on_#{callback_type}=") do |callback|
super serialize_callback(callback)
end
end |
||||||||||||||||||||||||||||||||
| serialize :metadata, coder: JSON | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| after_initialize :set_active_job_batch_id | ||||||||||||||||||||||||||||||||
| after_commit :start_batch, on: :create, unless: -> { ActiveRecord.respond_to?(:after_all_transactions_commit) } | ||||||||||||||||||||||||||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are a couple places that use |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| mattr_accessor :maintenance_queue_name | ||||||||||||||||||||||||||||||||
| self.maintenance_queue_name = "default" | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def enqueue(&block) | ||||||||||||||||||||||||||||||||
| raise "You cannot enqueue a batch that is already finished" if finished? | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps use here a new error class, like |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| transaction do | ||||||||||||||||||||||||||||||||
| save! if new_record? | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Batch.wrap_in_batch_context(id) do | ||||||||||||||||||||||||||||||||
| block&.call(self) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if ActiveRecord.respond_to?(:after_all_transactions_commit) | ||||||||||||||||||||||||||||||||
| ActiveRecord.after_all_transactions_commit do | ||||||||||||||||||||||||||||||||
| start_batch | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def on_success=(value) | ||||||||||||||||||||||||||||||||
| super(serialize_callback(value)) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def on_failure=(value) | ||||||||||||||||||||||||||||||||
| super(serialize_callback(value)) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def on_finish=(value) | ||||||||||||||||||||||||||||||||
| super(serialize_callback(value)) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def metadata | ||||||||||||||||||||||||||||||||
| (super || {}).with_indifferent_access | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def check_completion! | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd name this just
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, I see it calls |
||||||||||||||||||||||||||||||||
| return if finished? || !ready? | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Under which circumstance could this be called for a batch that's not |
||||||||||||||||||||||||||||||||
| return if batch_executions.limit(1).exists? | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| rows = Batch | ||||||||||||||||||||||||||||||||
| .where(id: id) | ||||||||||||||||||||||||||||||||
| .unfinished | ||||||||||||||||||||||||||||||||
| .empty_executions | ||||||||||||||||||||||||||||||||
| .update_all(finished_at: Time.current) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| return if rows.zero? | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| with_lock do | ||||||||||||||||||||||||||||||||
| failed = jobs.joins(:failed_execution).count | ||||||||||||||||||||||||||||||||
| finished_attributes = {} | ||||||||||||||||||||||||||||||||
| if failed > 0 | ||||||||||||||||||||||||||||||||
| finished_attributes[:failed_at] = Time.current | ||||||||||||||||||||||||||||||||
| finished_attributes[:failed_jobs] = failed | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| finished_attributes[:completed_jobs] = total_jobs - failed | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| update!(finished_attributes) | ||||||||||||||||||||||||||||||||
| execute_callbacks | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to rewrite this method to be more clear and to make it perfectly obvious what it does. But then I realise I'm not quite sure what the two main checks here do, before we update the batch. I mean these two checks: return if batch_executions.limit(1).exists? rows = Batch.where(id: id).unfinished.empty_executions.update_all(finished_at: Time.current)
return if rows.zero?Could you perhaps explain these again? 🙏🏻 |
||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| private | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def set_active_job_batch_id | ||||||||||||||||||||||||||||||||
| self.active_job_batch_id ||= SecureRandom.uuid | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def as_active_job(active_job_klass) | ||||||||||||||||||||||||||||||||
| active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def serialize_callback(value) | ||||||||||||||||||||||||||||||||
| return value if value.blank? | ||||||||||||||||||||||||||||||||
| active_job = as_active_job(value) | ||||||||||||||||||||||||||||||||
| # We can pick up batch ids from context, but callbacks should never be considered a part of the batch | ||||||||||||||||||||||||||||||||
| active_job.batch_id = nil | ||||||||||||||||||||||||||||||||
| active_job.serialize | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
Comment on lines
+109
to
+116
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd inline
Suggested change
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def perform_completion_job(job_field, attrs) | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rename
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking more about the names here, we have |
||||||||||||||||||||||||||||||||
| active_job = ActiveJob::Base.deserialize(send(job_field)) | ||||||||||||||||||||||||||||||||
| active_job.send(:deserialize_arguments_if_needed) | ||||||||||||||||||||||||||||||||
| active_job.arguments = [ self ] + Array.wrap(active_job.arguments) | ||||||||||||||||||||||||||||||||
| SolidQueue::Job.enqueue_all([ active_job ]) | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh! Any reason we can't just use |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id | ||||||||||||||||||||||||||||||||
| attrs[job_field] = active_job.serialize | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't do anything with the |
||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def execute_callbacks | ||||||||||||||||||||||||||||||||
| if failed_at? | ||||||||||||||||||||||||||||||||
| perform_completion_job(:on_failure, {}) if on_failure.present? | ||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||
| perform_completion_job(:on_success, {}) if on_success.present? | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| perform_completion_job(:on_finish, {}) if on_finish.present? | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def enqueue_empty_job | ||||||||||||||||||||||||||||||||
| Batch.wrap_in_batch_context(id) do | ||||||||||||||||||||||||||||||||
| EmptyJob.set(queue: self.class.maintenance_queue_name || "default").perform_later | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, since the
Suggested change
|
||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def start_batch | ||||||||||||||||||||||||||||||||
| enqueue_empty_job if reload.total_jobs == 0 | ||||||||||||||||||||||||||||||||
| update!(enqueued_at: Time.current) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| class << self | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd move this to the beginning of the file, just to be consistent with other classes in the gem (nothing wrong with having it in the end, it's just for consistency 😅). |
||||||||||||||||||||||||||||||||
| def enqueue(on_success: nil, on_failure: nil, on_finish: nil, **metadata, &block) | ||||||||||||||||||||||||||||||||
| new.tap do |batch| | ||||||||||||||||||||||||||||||||
| batch.assign_attributes( | ||||||||||||||||||||||||||||||||
| on_success: on_success, | ||||||||||||||||||||||||||||||||
| on_failure: on_failure, | ||||||||||||||||||||||||||||||||
| on_finish: on_finish, | ||||||||||||||||||||||||||||||||
| metadata: metadata | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| batch.enqueue(&block) | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def current_batch_id | ||||||||||||||||||||||||||||||||
| ActiveSupport::IsolatedExecutionState[:current_batch_id] | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def wrap_in_batch_context(batch_id) | ||||||||||||||||||||||||||||||||
| previous_batch_id = current_batch_id.presence || nil | ||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||
| ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id | ||||||||||||||||||||||||||||||||
| yield | ||||||||||||||||||||||||||||||||
| ensure | ||||||||||||||||||||||||||||||||
| ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module SolidQueue | ||
| class Batch | ||
| module Trackable | ||
| extend ActiveSupport::Concern | ||
|
|
||
| included do | ||
| scope :finished, -> { where.not(finished_at: nil) } | ||
| scope :succeeded, -> { finished.where(failed_at: nil) } | ||
| scope :unfinished, -> { where(finished_at: nil) } | ||
| scope :failed, -> { where.not(failed_at: nil) } | ||
| scope :empty_executions, -> { | ||
| where(<<~SQL) | ||
| NOT EXISTS ( | ||
| SELECT 1 FROM solid_queue_batch_executions | ||
| WHERE solid_queue_batch_executions.batch_id = solid_queue_batches.id | ||
| LIMIT 1 | ||
| ) | ||
| SQL | ||
| } | ||
| end | ||
|
|
||
| def status | ||
| if finished? | ||
| failed? ? "failed" : "completed" | ||
| elsif enqueued_at.present? | ||
| "processing" | ||
| else | ||
| "pending" | ||
| end | ||
| end | ||
|
|
||
| def failed? | ||
| failed_at.present? | ||
| end | ||
|
|
||
| def succeeded? | ||
| finished? && !failed? | ||
| end | ||
|
|
||
| def finished? | ||
| finished_at.present? | ||
| end | ||
|
|
||
| def ready? | ||
| enqueued_at.present? | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something a bit confusing with the status here is that |
||
| end | ||
|
|
||
| def completed_jobs | ||
| finished? ? self[:completed_jobs] : total_jobs - batch_executions.count | ||
| end | ||
|
|
||
| def failed_jobs | ||
| finished? ? self[:failed_jobs] : jobs.joins(:failed_execution).count | ||
| end | ||
|
|
||
| def pending_jobs | ||
| finished? ? 0 : batch_executions.count | ||
| end | ||
|
|
||
| def progress_percentage | ||
| return 0 if total_jobs == 0 | ||
| ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2) | ||
| end | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module SolidQueue | ||
| class BatchExecution < Record | ||
| belongs_to :job, optional: true | ||
| belongs_to :batch | ||
|
|
||
| after_commit :check_completion, on: :destroy | ||
|
|
||
| private | ||
| def check_completion | ||
| batch = Batch.find_by(id: batch_id) | ||
| batch.check_completion! if batch.present? | ||
| end | ||
|
|
||
| class << self | ||
| def create_all_from_jobs(jobs) | ||
| batch_jobs = jobs.select { |job| job.batch_id.present? } | ||
| return if batch_jobs.empty? | ||
|
|
||
| batch_jobs.group_by(&:batch_id).each do |batch_id, jobs| | ||
| BatchExecution.insert_all!(jobs.map { |job| | ||
| { batch_id:, job_id: job.respond_to?(:provider_job_id) ? job.provider_job_id : job.id } | ||
| }) | ||
|
|
||
| total = jobs.size | ||
| SolidQueue::Batch.where(id: batch_id).update_all([ "total_jobs = total_jobs + ?", total ]) | ||
| end | ||
| end | ||
| end | ||
| end | ||
| end |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module SolidQueue | ||
| class Execution | ||
| module Batchable | ||
| extend ActiveSupport::Concern | ||
|
|
||
| included do | ||
| after_create :update_batch_progress, if: -> { job.batch_id? } | ||
| end | ||
|
|
||
| private | ||
| def update_batch_progress | ||
| if is_a?(FailedExecution) | ||
| # FailedExecutions are only created when the job is done retrying | ||
| job.batch_execution&.destroy! | ||
| end | ||
| rescue => e | ||
| Rails.logger.error "[SolidQueue] Failed to notify batch #{job.batch_id} about job #{job.id} failure: #{e.message}" | ||
| end | ||
| end | ||
| end | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need a custom attribute for this 🤔 Since we're setting it in the initialiser, we can also set it like this:
And we can delete the
maintenance_queue_nameeverywhere inBatch.