diff --git a/lib/graphql/analysis/query_complexity.rb b/lib/graphql/analysis/query_complexity.rb index 4a90824b4c..c953855de4 100644 --- a/lib/graphql/analysis/query_complexity.rb +++ b/lib/graphql/analysis/query_complexity.rb @@ -27,14 +27,14 @@ def result future_complexity end when nil - subject.logger.warn <<~GRAPHQL + subject.logger.warn <<~MESSAGE GraphQL-Ruby's complexity cost system is getting some "breaking fixes" in a future version. See the migration notes at https://graphql-ruby.org/api-doc/#{GraphQL::VERSION}/GraphQL/Schema.html#complexity_cost_calculation_mode_for-class_method To opt into the future behavior, configure your schema (#{subject.schema.name ? subject.schema.name : subject.schema.ancestors}) with: complexity_cost_calculation_mode(:future) # or `:legacy`, `:compare` - GRAPHQL + MESSAGE max_possible_complexity(mode: :legacy) else raise ArgumentError, "Expected `:future`, `:legacy`, `:compare`, or `nil` from `#{query.schema}.complexity_cost_calculation_mode_for` but got: #{query.schema.complexity_cost_calculation_mode.inspect}" diff --git a/lib/graphql/dataloader/async_dataloader.rb b/lib/graphql/dataloader/async_dataloader.rb index 9781dda03b..ad9e6cc8d1 100644 --- a/lib/graphql/dataloader/async_dataloader.rb +++ b/lib/graphql/dataloader/async_dataloader.rb @@ -5,11 +5,7 @@ class AsyncDataloader < Dataloader def yield(source = Fiber[:__graphql_current_dataloader_source]) trace = Fiber[:__graphql_current_multiplex]&.current_trace trace&.dataloader_fiber_yield(source) - if (condition = Fiber[:graphql_dataloader_next_tick]) - condition.wait - else - Fiber.yield - end + Fiber[:graphql_dataloader_next_tick].wait trace&.dataloader_fiber_resume(source) nil end @@ -22,44 +18,49 @@ def run(trace_query_lazy: nil) source_tasks = [] next_source_tasks = [] first_pass = true + sources_condition = Async::Condition.new - manager = spawn_fiber do - trace&.begin_dataloader(self) + jobs_condition = Async::Condition.new + trace&.begin_dataloader(self) + fiber_vars = get_fiber_variables + raised_error = nil + Sync do |root_task| while first_pass || !job_fibers.empty? first_pass = false - fiber_vars = get_fiber_variables - - run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace) + set_fiber_variables(fiber_vars) + run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition) - Sync do |root_task| - set_fiber_variables(fiber_vars) - while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) } - while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace)))) - if task.alive? - root_task.yield # give the source task a chance to run - next_source_tasks << task - end + while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) } + while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace)))) + if task.alive? + root_task.yield + next_source_tasks << task + else + task.wait # re-raise errors end - sources_condition.signal - source_tasks.concat(next_source_tasks) - next_source_tasks.clear end + + sources_condition.signal + source_tasks.concat(next_source_tasks) + next_source_tasks.clear end + jobs_condition.signal if !@lazies_at_depth.empty? with_trace_query_lazy(trace_query_lazy) do - run_next_pending_lazies(job_fibers, trace) - run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace) + run_next_pending_lazies(job_fibers, trace, root_task, jobs_condition) + run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition) end end end - trace&.end_dataloader(self) + rescue StandardError => err + raised_error = err end - manager.resume - if manager.alive? - raise "Invariant: Manager didn't terminate successfully: #{manager}" + if raised_error + raise raised_error end + trace&.end_dataloader(self) rescue UncaughtThrowError => e throw e.tag, e.value @@ -67,19 +68,56 @@ def run(trace_query_lazy: nil) private - def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace) - while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace)))) + def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, parent_task, condition) + while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_task(trace, parent_task, condition)))) if f.alive? - finished = run_fiber(f) - if !finished - next_job_fibers << f - end + parent_task.yield + next_job_fibers << f + else + f.wait # re-raise errors end end job_fibers.concat(next_job_fibers) next_job_fibers.clear end + def spawn_job_task(trace, parent_task, condition) + if !@pending_jobs.empty? + fiber_vars = get_fiber_variables + parent_task.async do + trace&.dataloader_spawn_execution_fiber(@pending_jobs) + Fiber[:graphql_dataloader_next_tick] = condition + set_fiber_variables(fiber_vars) + while job = @pending_jobs.shift + job.call + end + cleanup_fiber + trace&.dataloader_fiber_exit + end + end + end + + #### TODO DRY Had to duplicate to remove spawn_job_fiber + def run_next_pending_lazies(job_fibers, trace, parent_task, condition) + smallest_depth = nil + @lazies_at_depth.each_key do |depth_key| + smallest_depth ||= depth_key + if depth_key < smallest_depth + smallest_depth = depth_key + end + end + + if smallest_depth + lazies = @lazies_at_depth.delete(smallest_depth) + if !lazies.empty? + lazies.each_with_index do |l, idx| + append_job { l.value } + end + job_fibers.unshift(spawn_job_task(trace, parent_task, condition)) + end + end + end + def spawn_source_task(parent_task, condition, trace) pending_sources = nil @source_cache.each_value do |source_by_batch_params| @@ -102,6 +140,10 @@ def spawn_source_task(parent_task, condition, trace) s.run_pending_keys trace&.end_dataloader_source(s) end + nil + rescue StandardError => err + err + ensure cleanup_fiber trace&.dataloader_fiber_exit end diff --git a/spec/graphql/dataloader_spec.rb b/spec/graphql/dataloader_spec.rb index 00ca941b49..4baed62ef8 100644 --- a/spec/graphql/dataloader_spec.rb +++ b/spec/graphql/dataloader_spec.rb @@ -1243,17 +1243,34 @@ def assert_last_max_fiber_count(expected_last_max_fiber_count, message = nil) res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new }) assert_nil res.context.dataloader.fiber_limit - assert_equal(10, FiberCounting.last_spawn_fiber_count) - assert_last_max_fiber_count((TESTING_EXEC_NEXT ? 9 : 9), "No limit works as expected") - res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 4) }) + extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader + 3 + else + 0 + end + assert_equal 10 + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count + assert_last_max_fiber_count(9, "No limit works as expected") + + extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader + 10 # more here because there are fewer jobs fibers running at any one time + else + 0 + end + res = schema.execute(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 4) }) assert_equal 4, res.context.dataloader.fiber_limit - assert_equal((TESTING_EXEC_NEXT ? 11 : 12), FiberCounting.last_spawn_fiber_count) + assert_equal (TESTING_EXEC_NEXT ? 11 : 12) + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count assert_last_max_fiber_count(4, "Limit of 4 works as expected") - res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 6) }) + extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader + 4 + else + 0 + end + + res = schema.execute(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 6) }) assert_equal 6, res.context.dataloader.fiber_limit - assert_equal(8, FiberCounting.last_spawn_fiber_count) + assert_equal 8 + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count assert_last_max_fiber_count(6, "Limit of 6 works as expected") end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b5704c2e78..da8df08265 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -45,7 +45,7 @@ end # C methods aren't fair game in non-main Ractors -RUN_RACTOR_TESTS = (defined?(::Ractor) && !USING_C_PARSER && (ENV["TEST"].nil? || ENV["TEST"].include?("ractor_shareable"))) +RUN_RACTOR_TESTS = (defined?(::Ractor) && !USING_C_PARSER && !ENV["SKIP_RACTOR_TESTS"] && (ENV["TEST"].nil? || ENV["TEST"].include?("ractor_shareable"))) require "rake" require "graphql/rake_task"