-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathasync_dataloader.rb
More file actions
154 lines (140 loc) · 5.23 KB
/
async_dataloader.rb
File metadata and controls
154 lines (140 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# frozen_string_literal: true
module GraphQL
  class Dataloader
    # A Dataloader variant built on the `async` gem: jobs and sources are
    # spawned as `Async` tasks inside a `Sync` reactor, and fibers waiting on
    # sources are parked on `Async::Condition`s, so pending source I/O can run
    # concurrently instead of being resumed one-by-one.
    class AsyncDataloader < Dataloader
      # Park the current fiber until the next scheduler "tick", which {#run}
      # signals via the condition stored in `Fiber[:graphql_dataloader_next_tick]`.
      # The current trace (if any) is notified around the suspension.
      #
      # @param source [Object, nil] the source this fiber is waiting on
      #   (defaults to the fiber-local current source, if any)
      # @return [nil]
      def yield(source = Fiber[:__graphql_current_dataloader_source])
        trace = Fiber[:__graphql_current_multiplex]&.current_trace
        trace&.dataloader_fiber_yield(source)
        # Suspend here; `run` wakes this fiber by signalling the condition
        # after pending sources have been given a chance to run.
        Fiber[:graphql_dataloader_next_tick].wait
        trace&.dataloader_fiber_resume(source)
        nil
      end

      # Drive all pending jobs, sources, and deferred lazies to completion.
      #
      # Each outer iteration: pump job tasks (capped at `jobs_fiber_limit`),
      # then drain pending sources (total tasks capped at `total_fiber_limit`),
      # signalling `sources_condition` / `jobs_condition` so parked fibers
      # resume; finally run any accumulated lazies, shallowest depth first.
      #
      # A StandardError raised by a task is captured and re-raised only after
      # the `Sync` block has unwound; `throw` is forwarded across the reactor
      # boundary via UncaughtThrowError.
      #
      # @param trace_query_lazy [Object, nil] passed through to {#with_trace_query_lazy}
      # @return [void]
      def run(trace_query_lazy: nil)
        trace = Fiber[:__graphql_current_multiplex]&.current_trace
        jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
        job_fibers = []
        next_job_fibers = []
        source_tasks = []
        next_source_tasks = []
        first_pass = true
        sources_condition = Async::Condition.new
        jobs_condition = Async::Condition.new
        trace&.begin_dataloader(self)
        fiber_vars = get_fiber_variables
        raised_error = nil
        Sync do |root_task|
          while first_pass || !job_fibers.empty?
            first_pass = false
            set_fiber_variables(fiber_vars)
            run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition)
            # Keep spawning source tasks while any cached source still has pending keys.
            while !source_tasks.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
              while (task = (source_tasks.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size + next_source_tasks.size) < total_fiber_limit) && spawn_source_task(root_task, sources_condition, trace))))
                if task.alive?
                  # Let the scheduler run this task, then re-queue it for the next round.
                  root_task.yield
                  next_source_tasks << task
                else
                  task.wait # re-raise errors
                end
              end
              # Wake source fibers parked on the sources condition.
              sources_condition.signal
              source_tasks.concat(next_source_tasks)
              next_source_tasks.clear
            end
            # Wake job fibers that yielded while waiting for sources.
            jobs_condition.signal
            if !@lazies_at_depth.empty?
              with_trace_query_lazy(trace_query_lazy) do
                run_next_pending_lazies(job_fibers, trace, root_task, jobs_condition)
                run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, root_task, jobs_condition)
              end
            end
          end
        rescue StandardError => err
          # Defer the error so `Sync` can unwind its child tasks before we re-raise.
          raised_error = err
        end
        if raised_error
          raise raised_error
        end
        trace&.end_dataloader(self)
      rescue UncaughtThrowError => e
        # Forward `throw`s (e.g. from execution code) across the reactor boundary.
        throw e.tag, e.value
      end

      private

      # Pump job tasks: resume each queued task, spawning new ones while under
      # `jobs_fiber_limit`. Still-alive tasks are carried over into `job_fibers`
      # for the next round; finished ones are waited on so their errors propagate.
      def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace, parent_task, condition)
        while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_task(trace, parent_task, condition))))
          if f.alive?
            # Let the task make progress before re-queueing it.
            parent_task.yield
            next_job_fibers << f
          else
            f.wait # re-raise errors
          end
        end
        job_fibers.concat(next_job_fibers)
        next_job_fibers.clear
      end

      # Spawn an async task that drains `@pending_jobs`, or return a falsy value
      # when there is nothing to run. The task inherits the current fiber
      # variables and parks on `condition` whenever a job yields.
      def spawn_job_task(trace, parent_task, condition)
        if !@pending_jobs.empty?
          fiber_vars = get_fiber_variables
          parent_task.async do
            trace&.dataloader_spawn_execution_fiber(@pending_jobs)
            Fiber[:graphql_dataloader_next_tick] = condition
            set_fiber_variables(fiber_vars)
            while job = @pending_jobs.shift
              job.call
            end
            cleanup_fiber
            trace&.dataloader_fiber_exit
          end
        end
      end

      # Enqueue the lazies at the shallowest depth as jobs and spawn a task to
      # run them ahead of the existing job queue.
      #
      # TODO(DRY): duplicated from Dataloader so it can use `spawn_job_task`
      # instead of `spawn_job_fiber`.
      def run_next_pending_lazies(job_fibers, trace, parent_task, condition)
        smallest_depth = nil
        @lazies_at_depth.each_key do |depth_key|
          smallest_depth ||= depth_key
          if depth_key < smallest_depth
            smallest_depth = depth_key
          end
        end
        if smallest_depth
          lazies = @lazies_at_depth.delete(smallest_depth)
          if !lazies.empty?
            # (was `each_with_index` with an unused index — plain `each` suffices)
            lazies.each do |l|
              append_job { l.value }
            end
            job_fibers.unshift(spawn_job_task(trace, parent_task, condition))
          end
        end
      end

      # Spawn an async task that runs every currently-pending source, or return
      # `nil` when none are pending. Inside the task, a StandardError is rescued
      # and becomes the task's return value rather than being raised; cleanup
      # and trace notification always run via `ensure`.
      def spawn_source_task(parent_task, condition, trace)
        pending_sources = nil
        @source_cache.each_value do |source_by_batch_params|
          source_by_batch_params.each_value do |source|
            if source.pending?
              pending_sources ||= []
              pending_sources << source
            end
          end
        end
        if pending_sources
          fiber_vars = get_fiber_variables
          parent_task.async do
            trace&.dataloader_spawn_source_fiber(pending_sources)
            set_fiber_variables(fiber_vars)
            Fiber[:graphql_dataloader_next_tick] = condition
            pending_sources.each do |s|
              trace&.begin_dataloader_source(s)
              s.run_pending_keys
              trace&.end_dataloader_source(s)
            end
            nil
          rescue StandardError => err
            err
          ensure
            cleanup_fiber
            trace&.dataloader_fiber_exit
          end
        end
      end
    end
  end
end