diff --git a/lib/pgbus/configuration.rb b/lib/pgbus/configuration.rb index 9bcf5c7..6191616 100644 --- a/lib/pgbus/configuration.rb +++ b/lib/pgbus/configuration.rb @@ -485,6 +485,11 @@ def recurring_execution_retention=(value) # default formula provides. POOL_SIZE_WARN_THRESHOLD = 50 + # Connections needed per async worker: one for the reactor's serial + # execution, one for polling, one for headroom. Fibers share connections + # because only one runs at a time per reactor thread. + ASYNC_POOL_CONNECTIONS = 3 + def resolved_pool_size return pool_size if pool_size @@ -624,10 +629,19 @@ def sum_thread_counts(entries, default_threads:, group:) raise ArgumentError, "#{group} threads must be a positive integer, got #{threads.inspect}" end - threads + + if async_execution_mode?(entry) + ASYNC_POOL_CONNECTIONS + else + threads + end end end + def async_execution_mode?(entry) + execution_mode_for(entry) == :async + end + def warn_if_oversized(size) return unless size > POOL_SIZE_WARN_THRESHOLD diff --git a/spec/pgbus/allocation_budget_spec.rb b/spec/pgbus/allocation_budget_spec.rb index 41ae7d7..c1882e9 100644 --- a/spec/pgbus/allocation_budget_spec.rb +++ b/spec/pgbus/allocation_budget_spec.rb @@ -108,14 +108,17 @@ def transaction(...) = yield(self) end it "retains zero objects across 100 send_message cycles" do - 10.times { plain_client.send_message("default", small_payload) } - - # Scope to lib/pgbus/ so retained objects from external gems - # (async fiber scheduler hooks, connection_pool singletons, JSON - # parser caches) loaded by other specs in the same process don't - # pollute the measurement. This test validates that Pgbus's own - # send_message path leaks nothing. - report = MemoryProfiler.report(allow_files: "lib/pgbus") do + # Warmup: force all lazy initialization (gem globals, JSON parser + # caches, connection_pool singletons) to complete before measuring. + # The first MemoryProfiler run captures any one-time retained objects; + # the second run should retain zero — proving send_message itself + # doesn't leak. This two-pass approach is immune to Ruby version + # differences in GC timing and gem autoloading order. + 50.times { plain_client.send_message("default", small_payload) } + MemoryProfiler.report { 50.times { plain_client.send_message("default", small_payload) } } + GC.start + + report = MemoryProfiler.report do 100.times { plain_client.send_message("default", small_payload) } end diff --git a/spec/pgbus/configuration_spec.rb b/spec/pgbus/configuration_spec.rb index d49935a..58c610e 100644 --- a/spec/pgbus/configuration_spec.rb +++ b/spec/pgbus/configuration_spec.rb @@ -259,6 +259,39 @@ expect(warned_message).to match(/pool_size .* 62/) end + it "uses fewer connections for async workers (fibers share connections)" do + config.pool_size = nil + config.workers = [{ queues: %w[webhooks], threads: 100, execution_mode: :async }] + # Async workers need ~3 connections (reactor + polling + headroom), + # not 100 (one per fiber). Total: 3 + 1 dispatcher + 1 scheduler = 5 + expect(config.resolved_pool_size).to eq(5) + end + + it "mixes async and thread workers correctly" do + config.pool_size = nil + config.workers = [ + { queues: %w[webhooks], threads: 50, execution_mode: :async }, + { queues: %w[default], threads: 5 } + ] + # 3 (async) + 5 (threads) + 1 dispatcher + 1 scheduler = 10 + expect(config.resolved_pool_size).to eq(10) + end + + it "uses fewer connections for fiber mode (alias for async)" do + config.pool_size = nil + config.workers = [{ queues: %w[llm], threads: 200, execution_mode: :fiber }] + # 3 + 1 + 1 = 5 + expect(config.resolved_pool_size).to eq(5) + end + + it "honors global execution_mode when workers have no per-entry override" do + config.pool_size = nil + config.execution_mode = :async + config.workers = [{ queues: %w[default], threads: 50 }] + # Global async: 3 + 1 dispatcher + 1 scheduler = 5 + expect(config.resolved_pool_size).to eq(5) + end + it "does not warn for normal sizes" do config.pool_size = nil config.workers = [{ queues: %w[default], threads: 5 }]