Skip to content
4 changes: 2 additions & 2 deletions lib/graphql/analysis/query_complexity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def result
future_complexity
end
when nil
subject.logger.warn <<~GRAPHQL
subject.logger.warn <<~MESSAGE
GraphQL-Ruby's complexity cost system is getting some "breaking fixes" in a future version. See the migration notes at https://graphql-ruby.org/api-doc/#{GraphQL::VERSION}/GraphQL/Schema.html#complexity_cost_calculation_mode_for-class_method

To opt into the future behavior, configure your schema (#{subject.schema.name ? subject.schema.name : subject.schema.ancestors}) with:

complexity_cost_calculation_mode(:future) # or `:legacy`, `:compare`

GRAPHQL
MESSAGE
max_possible_complexity(mode: :legacy)
else
raise ArgumentError, "Expected `:future`, `:legacy`, `:compare`, or `nil` from `#{query.schema}.complexity_cost_calculation_mode_for` but got: #{query.schema.complexity_cost_calculation_mode.inspect}"
Expand Down
108 changes: 75 additions & 33 deletions lib/graphql/dataloader/async_dataloader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ class AsyncDataloader < Dataloader
def yield(source = Fiber[:__graphql_current_dataloader_source])
trace = Fiber[:__graphql_current_multiplex]&.current_trace
trace&.dataloader_fiber_yield(source)
if (condition = Fiber[:graphql_dataloader_next_tick])
condition.wait
else
Fiber.yield
end
Fiber[:graphql_dataloader_next_tick].wait
trace&.dataloader_fiber_resume(source)
nil
end
Expand All @@ -22,64 +18,106 @@ def run(trace_query_lazy: nil)
source_tasks = []
next_source_tasks = []
first_pass = true

sources_condition = Async::Condition.new
manager = spawn_fiber do
trace&.begin_dataloader(self)
jobs_condition = Async::Condition.new
trace&.begin_dataloader(self)
fiber_vars = get_fiber_variables
raised_error = nil
Sync do |root_task|
while first_pass || !job_fibers.empty?
first_pass = false
fiber_vars = get_fiber_variables

run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
set_fiber_variables(fiber_vars)
run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition)

Sync do |root_task|
set_fiber_variables(fiber_vars)
while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace))))
if task.alive?
root_task.yield # give the source task a chance to run
next_source_tasks << task
end
while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace))))
if task.alive?
root_task.yield
next_source_tasks << task
else
task.wait # re-raise errors
end
sources_condition.signal
source_tasks.concat(next_source_tasks)
next_source_tasks.clear
end

sources_condition.signal
source_tasks.concat(next_source_tasks)
next_source_tasks.clear
end
jobs_condition.signal

if !@lazies_at_depth.empty?
with_trace_query_lazy(trace_query_lazy) do
run_next_pending_lazies(job_fibers, trace)
run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
run_next_pending_lazies(job_fibers, trace, root_task, jobs_condition)
run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition)
end
end
end
trace&.end_dataloader(self)
rescue StandardError => err
raised_error = err
end

manager.resume
if manager.alive?
raise "Invariant: Manager didn't terminate successfully: #{manager}"
if raised_error
raise raised_error
end
trace&.end_dataloader(self)

rescue UncaughtThrowError => e
throw e.tag, e.value
end

private

def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, parent_task, condition)
while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_task(trace, parent_task, condition))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
parent_task.yield
next_job_fibers << f
else
f.wait # re-raise errors
end
end
job_fibers.concat(next_job_fibers)
next_job_fibers.clear
end

# Spawn a child Async task under `parent_task` that drains `@pending_jobs`,
# calling each queued job in FIFO order.
# Returns the new task, or `nil` when `@pending_jobs` is empty (the guard
# `if` falls through with no value).
# NOTE(review): `condition` is stored in the task's fiber-locals so that
# `AsyncDataloader#yield` (which calls `Fiber[:graphql_dataloader_next_tick].wait`)
# has something to wait on inside this task — confirm against `#yield` above.
def spawn_job_task(trace, parent_task, condition)
  if !@pending_jobs.empty?
    # Capture the parent's fiber-local variables so the child task sees them.
    fiber_vars = get_fiber_variables
    parent_task.async do
      trace&.dataloader_spawn_execution_fiber(@pending_jobs)
      Fiber[:graphql_dataloader_next_tick] = condition
      set_fiber_variables(fiber_vars)
      # Drain the shared queue; other job tasks may also be shifting from it.
      while job = @pending_jobs.shift
        job.call
      end
      cleanup_fiber
      trace&.dataloader_fiber_exit
    end
  end
end

# TODO(DRY): duplicated from the base Dataloader because this class spawns
# Async tasks (`spawn_job_task`) instead of plain fibers.
#
# Run the lazies enqueued at the shallowest depth: enqueue each one as a
# pending job, then spawn a task to execute them and push it onto the
# front of `job_fibers` so it runs next.
# Returns `nil` when `@lazies_at_depth` is empty.
def run_next_pending_lazies(job_fibers, trace, parent_task, condition)
  # Idiomatic minimum over the keys, replacing the hand-rolled search loop.
  smallest_depth = @lazies_at_depth.each_key.min

  if smallest_depth
    lazies = @lazies_at_depth.delete(smallest_depth)
    if !lazies.empty?
      # Plain `each` — the original `each_with_index` never used its index.
      lazies.each do |l|
        append_job { l.value }
      end
      job_fibers.unshift(spawn_job_task(trace, parent_task, condition))
    end
  end
end

def spawn_source_task(parent_task, condition, trace)
pending_sources = nil
@source_cache.each_value do |source_by_batch_params|
Expand All @@ -102,6 +140,10 @@ def spawn_source_task(parent_task, condition, trace)
s.run_pending_keys
trace&.end_dataloader_source(s)
end
nil
rescue StandardError => err
err
ensure
cleanup_fiber
trace&.dataloader_fiber_exit
end
Expand Down
29 changes: 23 additions & 6 deletions spec/graphql/dataloader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1243,17 +1243,34 @@ def assert_last_max_fiber_count(expected_last_max_fiber_count, message = nil)

res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new })
assert_nil res.context.dataloader.fiber_limit
assert_equal(10, FiberCounting.last_spawn_fiber_count)
assert_last_max_fiber_count((TESTING_EXEC_NEXT ? 9 : 9), "No limit works as expected")

res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 4) })
extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader
3
else
0
end
assert_equal 10 + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count
assert_last_max_fiber_count(9, "No limit works as expected")

extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader
10 # more here because there are fewer jobs fibers running at any one time
else
0
end
res = schema.execute(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 4) })
assert_equal 4, res.context.dataloader.fiber_limit
assert_equal((TESTING_EXEC_NEXT ? 11 : 12), FiberCounting.last_spawn_fiber_count)
assert_equal (TESTING_EXEC_NEXT ? 11 : 12) + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count
assert_last_max_fiber_count(4, "Limit of 4 works as expected")

res = exec_query(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 6) })
extra_shortlived_jobs_fibers = if fiber_counting_dataloader_class < GraphQL::Dataloader::AsyncDataloader
4
else
0
end

res = schema.execute(query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 6) })
assert_equal 6, res.context.dataloader.fiber_limit
assert_equal(8, FiberCounting.last_spawn_fiber_count)
assert_equal 8 + extra_shortlived_jobs_fibers, FiberCounting.last_spawn_fiber_count
assert_last_max_fiber_count(6, "Limit of 6 works as expected")
end

Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
end

# C methods aren't fair game in non-main Ractors
RUN_RACTOR_TESTS = (defined?(::Ractor) && !USING_C_PARSER && (ENV["TEST"].nil? || ENV["TEST"].include?("ractor_shareable")))
RUN_RACTOR_TESTS = (defined?(::Ractor) && !USING_C_PARSER && !ENV["SKIP_RACTOR_TESTS"] && (ENV["TEST"].nil? || ENV["TEST"].include?("ractor_shareable")))

require "rake"
require "graphql/rake_task"
Expand Down
Loading