feat(process): add async/fiber execution mode for workers #112
Introduce an ExecutionPools abstraction (ThreadPool + AsyncPool) that allows workers to execute jobs as fibers on a single async reactor thread instead of a thread pool. This dramatically reduces PostgreSQL connection usage for I/O-bound workloads (HTTP calls, email, LLM APIs).

Key changes:

- ExecutionPools factory with .build(mode:, capacity:, on_state_change:)
- ThreadPool: wraps Concurrent::FixedThreadPool with a unified interface
- AsyncPool: reactor thread + Async::Semaphore bounded fibers
- Worker/Consumer refactored to use the pool abstraction
- Configuration: execution_mode attr with per-worker override support
- CLI: --execution-mode flag
- Supervisor: passes execution_mode to forked workers

The async pool uses Thread::Queue as a cross-thread inbox between the worker main loop and the reactor, with an on_state_change callback to wake the worker immediately when fibers complete. Boot synchronization via Thread::Queue ensures the reactor is ready before accepting work.

PGMQ visibility timeout provides crash safety regardless of execution mode: messages stay in PostgreSQL until archived.

## Test Coverage

- ThreadPool spec: interface contract, capacity tracking, shutdown
- AsyncPool spec: fiber execution, capacity bounds, shutdown, error handling, on_state_change callback, boot synchronization
- Factory spec: mode routing, :fiber alias, invalid mode
- Worker spec: async mode wiring, execution_mode in stats
- Configuration spec: defaults, validation, per-worker override
- CLI spec: --execution-mode flag

## Verification

- [x] bundle exec rubocop passes (0 offenses)
- [x] bundle exec rspec passes (264 new/modified specs, 0 failures)
- [x] Zero regressions in existing test suite
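The inbox and boot handshake described above can be sketched with stdlib primitives. This is an illustrative sketch only — a plain Thread stands in for the async reactor, and all names here are made up rather than taken from the PR's code; in the real AsyncPool the loop body would spawn semaphore-bounded fibers instead of calling the job directly.

```ruby
# Sketch of the worker <-> reactor handshake. Names are illustrative.
inbox = Thread::Queue.new   # cross-thread inbox: worker -> reactor
boot  = Thread::Queue.new   # boot handshake: reactor -> worker
wake  = Thread::Queue.new   # stand-in for the on_state_change wake-up
log   = []

reactor = Thread.new do
  boot << :ready              # tell the worker the reactor is up
  while (job = inbox.pop)     # pop returns nil once the queue is closed
    job.call
    wake << :state_changed    # wake the worker as soon as the job finishes
  end
end

boot.pop                      # block until the reactor has booted
inbox << -> { log << :job_ran }
wake.pop                      # worker resumes immediately after completion
inbox.close                   # closed queue ends the reactor loop
reactor.join
```

The same pattern gives both guarantees the description mentions: the worker never posts before the reactor exists (`boot.pop`), and it never polls for completion (`wake.pop`).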
📝 Walkthrough

Adds a pluggable execution-pool abstraction with thread and async/fiber pool implementations, and wires global and per-worker execution-mode configuration through the CLI, Supervisor, Worker, and Consumer.
Sequence Diagram(s)

sequenceDiagram
participant CLI as User/CLI
participant Supervisor as Supervisor
participant Config as Configuration
participant ExecPool as ExecutionPools
participant Pool as ThreadPool/<br/>AsyncPool
participant Worker as Worker
CLI->>Supervisor: fork_worker(worker_config)
Supervisor->>Config: execution_mode_for(worker_config)
Config-->>Supervisor: :threads / :async
Supervisor->>Worker: new(execution_mode: mode, ...)
Worker->>ExecPool: build(mode:, capacity:, on_state_change:)
alt mode == :threads
ExecPool->>Pool: ThreadPool.new(capacity:)
else mode == :async
ExecPool->>Pool: AsyncPool.new(capacity:, on_state_change:)
end
Pool-->>Worker: pool instance
Worker->>Worker: start run-loop
loop Process messages
Worker->>Pool: post { execute_task }
Pool->>Pool: reserve capacity -> execute -> restore capacity
Pool-->>Worker: on_state_change callback
end
sequenceDiagram
participant Consumer as Consumer
participant Config as Configuration
participant ExecPool as ExecutionPools
participant Pool as ThreadPool/<br/>AsyncPool
participant MessageBus as Message Bus
Consumer->>Config: determine execution_mode
Consumer->>ExecPool: build(mode:, capacity: threads)
ExecPool-->>Consumer: pool
Consumer->>Consumer: start poll loop
loop Fetch & Process
Consumer->>Pool: available_capacity
Pool-->>Consumer: free_slots
Consumer->>MessageBus: fetch(limit: free_slots)
MessageBus-->>Consumer: messages
Consumer->>Pool: post { process_message }
Pool->>Pool: execute concurrently
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: 2 passed, 1 failed (warning)
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@benchmarks/connection_pool_bench.rb`:
- Around line 37-46: The count_active_connections method currently interpolates
prefix into the SQL and is vulnerable to injection; change the query to use a
parameterized call (e.g., conn.exec_params) passing the prefix value (wrapped
with '%' wildcards) as a bound parameter instead of interpolating, keeping the
same WHERE clause logic and preserving the existing PG::Error rescue behavior.
- Line 12: Remove the unused require by deleting the require "benchmark/ips"
statement so the script only loads what's needed; locate and remove the require
"benchmark/ips" line at the top of the file (there are no references to
Benchmark.ips elsewhere, so no other changes are required).
In `@benchmarks/execution_pool_bench.rb`:
- Around line 54-95: The benchmark's I/O-bound section (the Benchmark.ips block
that iterates CAPACITIES and calls build_thread_pool / build_async_pool) uses
sleep(0.01) which under MRI releases the GVL and may not demonstrate
fiber/non-blocking I/O advantages; update the I/O-bound section by adding a
clear comment above the block stating that sleep(0.01) is an approximation and
can bias results on MRI, and optionally replace the sleep-based workload with a
real non-blocking async I/O simulation (e.g., non-blocking socket or file I/O or
an async library) if you want more realistic fiber performance
comparisons—target the Benchmark.ips reports and the pool.post bodies where
sleep is invoked to make these changes.
In `@lib/pgbus/execution_pools.rb`:
- Around line 6-13: The build method in execution_pools.rb currently passes
on_state_change only to AsyncPool but not to ThreadPool, so update the
ThreadPool instantiation in build (the case for normalize_mode(mode) when
:threads) to forward the on_state_change parameter to ThreadPool.new (matching
AsyncPool.new), ensuring ThreadPool#initialize receives the callback and can
signal workers when capacity frees up.
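The fixed factory shape can be sketched as follows. The Struct-based pool classes here are simplified stand-ins, not the PR's implementations — the point is only the mode routing, the `:fiber` alias, and forwarding `on_state_change` to both pool types:

```ruby
# Sketch of the factory after the review fix: on_state_change is
# forwarded to BOTH pool types, not only AsyncPool.
module ExecutionPools
  # Hypothetical stand-ins for the real pool classes.
  ThreadPool = Struct.new(:capacity, :on_state_change, keyword_init: true)
  AsyncPool  = Struct.new(:capacity, :on_state_change, keyword_init: true)

  def self.build(mode:, capacity:, on_state_change: nil)
    case normalize_mode(mode)
    when :threads
      ThreadPool.new(capacity: capacity, on_state_change: on_state_change)
    when :async
      AsyncPool.new(capacity: capacity, on_state_change: on_state_change)
    else
      raise ArgumentError, "unknown execution mode: #{mode.inspect}"
    end
  end

  def self.normalize_mode(mode)
    mode.to_sym == :fiber ? :async : mode.to_sym  # :fiber aliases :async
  end
end
```

With this shape, `ExecutionPools.build(mode: :threads, capacity: 5, on_state_change: cb)` carries the callback into the thread pool, so thread-mode workers get the same wake signal async-mode workers already had.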
In `@lib/pgbus/execution_pools/thread_pool.rb`:
- Around line 31-33: Document the semantics of idle? to clarify it means "has
some capacity available" (available_capacity.positive?) rather than "completely
idle"; add a brief comment above the idle? method explaining that idle? returns
true when the pool can accept more work (i.e., there are free worker slots), and
note its usage in the drain check (`break if draining? && @pool.idle?`) so future
readers understand the intended behavior.
- Around line 17-25: The post method decrements `@available_capacity` before
calling `@pool.post` but if `@pool.post` raises (e.g., pool shutdown) the decrement
is never restored because the ensure runs only inside the posted block; fix by
rescuing exceptions from `@pool.post`: after `@available_capacity.decrement`, wrap
the `@pool.post` call in a begin/rescue that on any exception increments
`@available_capacity` and calls `@on_state_change&.call`, then re-raises the
exception; keep the existing ensure inside the posted block unchanged so
capacity is also restored after normal execution.
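The capacity-restore pattern the comment asks for can be sketched with a plain integer counter. This is a hypothetical simplification — `InlineExecutor` and `RejectingExecutor` stand in for `Concurrent::FixedThreadPool`, which runs jobs asynchronously so job errors never reach the outer rescue:

```ruby
# Reserve a slot up front; if the executor rejects the job (e.g. it has
# shut down), undo the reservation and fire the callback before re-raising.
class CountingPool
  attr_reader :available_capacity

  def initialize(executor, capacity:, on_state_change: nil)
    @executor = executor
    @available_capacity = capacity
    @on_state_change = on_state_change
  end

  def post(&block)
    @available_capacity -= 1          # reserve a slot
    begin
      @executor.post do
        block.call
      ensure
        @available_capacity += 1      # normal path: restore after the job
        @on_state_change&.call
      end
    rescue StandardError
      @available_capacity += 1        # rejection path: undo the reservation
      @on_state_change&.call
      raise
    end
  end
end

class InlineExecutor                  # stand-in: runs jobs synchronously
  def post
    yield
  end
end

class RejectingExecutor               # stand-in: a shut-down executor
  def post
    raise "pool is shut down"
  end
end
```

Without the rescue branch, every rejected post would leak one slot of `available_capacity` permanently.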
In `@spec/pgbus/execution_pools/async_pool_spec.rb`:
- Around line 20-27: Consolidate the duplicate examples or make the "boots the
reactor thread synchronously" example assert something different: either merge
both expectations into the "starts fully available" example (keep
expect(pool.available_capacity).to eq(3) and also any other quick check) or
change the boot test to assert a boot-specific condition such as the reactor
thread being alive (e.g. reference pool.reactor_thread and assert it is alive)
or that boot completed without blocking using a Timeout wrapper; update the
example titled "boots the reactor thread synchronously" to use
pool.reactor_thread or a non-blocking assertion instead of repeating
expect(pool.available_capacity).to eq(3).
In `@spec/pgbus/execution_pools/thread_pool_spec.rb`:
- Line 95: Rubocop flagged a Style/Semicolon violation on the inline block
passed to pool.post; change the single-line block "pool.post { sleep 0.05;
result.set(:done) }" to a multi-line do...end block so each statement is on its
own line (use pool.post do ... sleep 0.05 then result.set(:done) end),
preserving the same behavior and referencing pool.post and result.set to locate
the code.
- Line 85: The line using pool.post { sleep 0.05; result.set(:done) } violates
RuboCop Style/Semicolon; replace the single-line block with a multi-line block
so the two expressions are on separate lines, e.g. use pool.post do ... end,
call sleep 0.05 on the first line and then call result.set(:done) on the next
line to keep pool.post and result.set unchanged.
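The before/after shape of this RuboCop fix, with `post` and `result` as trivial stand-ins for the spec's pool and IVar:

```ruby
result = []
post = ->(&blk) { blk.call }   # stand-in for pool.post

# Offending single-line form (two statements joined by a semicolon):
#   pool.post { sleep 0.05; result.set(:done) }

# Preferred multi-line do...end block, one statement per line:
post.call do
  sleep 0.01                   # shortened from 0.05 for the example
  result << :done
end
```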
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: 06311454-9801-4524-9648-eec3a46e3996
📒 Files selected for processing (19)
- CLAUDE.md
- README.md
- benchmarks/connection_pool_bench.rb
- benchmarks/execution_pool_bench.rb
- lib/pgbus/cli.rb
- lib/pgbus/configuration.rb
- lib/pgbus/execution_pools.rb
- lib/pgbus/execution_pools/async_pool.rb
- lib/pgbus/execution_pools/thread_pool.rb
- lib/pgbus/process/consumer.rb
- lib/pgbus/process/supervisor.rb
- lib/pgbus/process/worker.rb
- spec/pgbus/cli_spec.rb
- spec/pgbus/configuration_spec.rb
- spec/pgbus/execution_pools/async_pool_spec.rb
- spec/pgbus/execution_pools/thread_pool_spec.rb
- spec/pgbus/execution_pools_spec.rb
- spec/pgbus/process/consumer_spec.rb
- spec/pgbus/process/worker_spec.rb
- Fix RSpec/RepeatedExample in async_pool_spec (distinct assertion)
- Fix Style/Semicolon in thread_pool_spec (multi-line blocks)
- Rename github-ci-failures command to github-review-failures for consistency with github-review-comments
Actionable comments posted: 2
Inline comments:
In `@spec/pgbus/execution_pools/async_pool_spec.rb`:
- Line 57: Replace the flaky fixed waits (the occurrences of "sleep 0.05" in the
async_pool_spec) with event-driven synchronization: have the worker thread(s)
signal test progress via a Thread::Queue (or a Monitor + ConditionVariable /
simple CountDownLatch), then in the spec replace each sleep 0.05 with a
queue.pop(timeout) (or condition wait with timeout) that fails the test on
timeout; apply this change to every "sleep 0.05" instance so tests wait for
explicit signals from the code under test rather than fixed sleeps.
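The sleep-to-signal replacement suggested here looks roughly like the sketch below. The names are illustrative, not the spec's actual code — the thread under test pushes an explicit progress marker onto a Thread::Queue, and the test waits on it with a deadline instead of a fixed sleep:

```ruby
require "timeout"

progress = Thread::Queue.new

worker = Thread.new do
  # ... exercise the code under test here ...
  progress << :job_finished   # explicit signal, no timing assumptions
end

# Fail fast with Timeout::Error instead of sleeping a fixed 0.05s
# and hoping the worker is done by then.
signal = Timeout.timeout(2) { progress.pop }
worker.join
```

This makes the test both faster on fast machines (no forced wait) and more reliable on slow CI (no missed deadline short of the 2s cap).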
In `@spec/pgbus/execution_pools/thread_pool_spec.rb`:
- Around line 22-43: Add a spec that verifies ThreadPool#post triggers the
on_state_change callback: arrange a spy or proc and assign it to the pool's
`@on_state_change` (or use `pool.instance_variable_set(:@on_state_change, callback)`), post a job that completes (e.g., sets an IVar or flips a flag), and
assert the callback was invoked (use a Concurrent::IVar or a simple boolean
captured by the proc) after the job runs; target the ThreadPool#post behavior
and the `@on_state_change` invocation to ensure wake-up/re-scheduling contract is
covered.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: b3ce1ffe-6233-43d7-bcb8-6468e21f3298
📒 Files selected for processing (3)
- .claude/commands/github-review-failures.md
- spec/pgbus/execution_pools/async_pool_spec.rb
- spec/pgbus/execution_pools/thread_pool_spec.rb
- Pass on_state_change callback to ThreadPool in factory (was only passed to AsyncPool — workers in thread mode missed wake signals)
- Guard ThreadPool#post against capacity leak if pool.post raises
- Remove unused benchmark/ips require from connection_pool_bench
- Use exec_params in connection_pool_bench to avoid SQL interpolation
- Add on_state_change callback test for ThreadPool
When Listener#stop closes the PG connection to interrupt
wait_for_notify, Ruby raises IOError ("stream closed in another
thread") rather than PG::Error. The rescue clause only caught
PG::Error, leaving IOError unhandled and dumping a stack trace
to stderr on every streamer shutdown.
Catch IOError before PG::Error since it indicates an expected
shutdown — the connection was deliberately closed by #stop.
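The behavior this commit relies on can be demonstrated with stdlib IO alone. Here `IO.pipe` stands in for the PG connection's socket (a hedged sketch, not the listener's code): closing an IO that another thread is blocked reading raises IOError ("stream closed in another thread") in the blocked thread, which is why rescuing only PG::Error missed it.

```ruby
r, w = IO.pipe
caught = nil

reader = Thread.new do
  r.read                  # blocks, like wait_for_notify on the PG socket
rescue IOError => e       # the expected-shutdown path the commit adds
  caught = e
end

Thread.pass until reader.stop?   # wait until the reader is blocked
r.close                          # what Listener#stop effectively does
reader.join
w.close
```

Because IOError is not a PG::Error subclass, the rescue clauses must list IOError explicitly (and first, per the commit) for shutdown to be quiet.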
Actionable comments posted: 1
Inline comments:
In `@lib/pgbus/web/streamer/listener.rb`:
- Around line 116-120: The rescue for IOError currently only breaks when
`@running` is false and otherwise silently continues; change it so that when an
IOError occurs while `@running` is true it performs the same logging and reconnect
flow as the existing PG::Error handler (lines 22–26) instead of silently
continuing — i.e., log the IOError with context (using the same logger used for
PG::Error) and invoke the same reconnect/cleanup routine (the listener's
reconnect/connection-reset logic) so the listener recovers; keep the existing
break behavior when `@running` is false.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI (base), Organization UI (inherited)
Review profile: ASSERTIVE
Plan: Pro
Run ID: aff46f95-be4c-403a-b31e-72d13f9f0697
📒 Files selected for processing (1)
lib/pgbus/web/streamer/listener.rb
MemoryProfiler on Ruby 3.3 picks up lazily-initialized globals from gems loaded by other specs in the same process (e.g. async's fiber scheduler hooks). Force GC.start before the report so these objects are collected and not attributed to send_message calls. This consistently failed on CI Ruby 3.3 (retained=5) while passing on 3.4 and 4.0 where GC is more aggressive about collecting fiber-related global state.
When IOError fires while the listener is still running (not from #stop shutdown), log the error and reconnect — same as the PG::Error path. Without this, the loop would continue trying wait_for_notify on a dead connection.
Replace GC.start workaround with MemoryProfiler's allow_files filter. This scopes retained object tracking to lib/pgbus/ so external gem globals (async fiber hooks, connection_pool singletons, JSON parser caches) loaded by other specs in the same process are excluded. The test now validates that Pgbus's own send_message path retains zero objects — regardless of what other gems do globally.
Summary
Adds an optional async/fiber execution mode for Pgbus workers, inspired by Solid Queue PR #728. Workers can now execute jobs as fibers on a single async reactor thread instead of a thread pool, dramatically reducing PostgreSQL connection usage for I/O-bound workloads.
- `ThreadPool` and `AsyncPool` backends behind a unified interface
- `Async::Semaphore` bounded fibers, `Thread::Queue` cross-thread inbox, boot synchronization, fatal error propagation
- `execution_mode` setting + per-worker override, `:fiber` alias for `:async`
- `--execution-mode async` flag

Why it's safe
Messages stay in PGMQ with visibility timeout protection regardless of execution mode. If a fiber or worker crashes, VT expires and messages become available for re-read.
Test plan
- `pgbus start --execution-mode async`

Summary by CodeRabbit