From 477bb8802f7ffc603ec6569281134327553aafb9 Mon Sep 17 00:00:00 2001
From: mhenrixon
Date: Fri, 10 Apr 2026 09:41:58 +0200
Subject: [PATCH 1/7] feat(process): add async/fiber execution mode for workers

Introduce an ExecutionPools abstraction (ThreadPool + AsyncPool) that
allows workers to execute jobs as fibers on a single async reactor
thread instead of a thread pool. This dramatically reduces PostgreSQL
connection usage for I/O-bound workloads (HTTP calls, email, LLM APIs).

Key changes:
- ExecutionPools factory with .build(mode:, capacity:, on_state_change:)
- ThreadPool: wraps Concurrent::FixedThreadPool with unified interface
- AsyncPool: reactor thread + Async::Semaphore bounded fibers
- Worker/Consumer refactored to use pool abstraction
- Configuration: execution_mode attr with per-worker override support
- CLI: --execution-mode flag
- Supervisor: passes execution_mode to forked workers

The async pool uses Thread::Queue as cross-thread inbox between the
worker main loop and reactor, with on_state_change callback to wake the
worker immediately when fibers complete. Boot synchronization via
Thread::Queue ensures the reactor is ready before accepting work.

PGMQ visibility timeout provides crash safety regardless of execution
mode -- messages stay in PostgreSQL until archived.
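
The inbox and boot-synchronization pattern described above can be
sketched with the standard library alone. This is an illustration under
stated assumptions, not the code in this patch: `MiniInbox` is a
hypothetical class, and a plain consumer thread stands in for the Async
reactor and its semaphore-bounded fibers.

```ruby
# Hypothetical stdlib-only sketch; MiniInbox is not part of pgbus.
# A plain thread replaces the Async reactor used by the real AsyncPool.
class MiniInbox
  def initialize(on_state_change: nil)
    @on_state_change = on_state_change
    @boot_queue = Thread::Queue.new # consumer -> constructor: "ready" handshake
    @pending = Thread::Queue.new    # producer -> consumer: cross-thread inbox
    @consumer = Thread.new { run }
    @boot_queue.pop                 # block until the consumer thread is ready
  end

  # Called from the producer (for example, a worker main loop).
  def post(&block)
    @pending << block
  end

  # Closing the queue lets the consumer drain remaining work and exit.
  def shutdown
    @pending.close
    @consumer.join
  end

  private

  def run
    @boot_queue << :ready
    # Queue#pop returns nil once the queue is closed and empty.
    while (block = @pending.pop)
      begin
        block.call
      rescue StandardError => e
        warn "task failed: #{e.class}: #{e.message}"
      ensure
        @on_state_change&.call # wake the producer as soon as a task finishes
      end
    end
  end
end

wake = Thread::Queue.new
inbox = MiniInbox.new(on_state_change: -> { wake << :done })
results = Thread::Queue.new
inbox.post { results << 42 }
wake.pop # returns once the consumer has signalled completion
inbox.shutdown
```

The constructor does not return until the consumer has pushed `:ready`,
so a caller can never post into an inbox whose consumer has not started.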
## Test Coverage

- ThreadPool spec: interface contract, capacity tracking, shutdown
- AsyncPool spec: fiber execution, capacity bounds, shutdown, error
  handling, on_state_change callback, boot synchronization
- Factory spec: mode routing, :fiber alias, invalid mode
- Worker spec: async mode wiring, execution_mode in stats
- Configuration spec: defaults, validation, per-worker override
- CLI spec: --execution-mode flag

## Verification

- [x] bundle exec rubocop passes (0 offenses)
- [x] bundle exec rspec passes (264 new/modified specs, 0 failures)
- [x] Zero regressions in existing test suite
---
 CLAUDE.md                                     |   1 +
 README.md                                     |  30 +++
 benchmarks/connection_pool_bench.rb           | 129 +++++++++++++
 benchmarks/execution_pool_bench.rb            | 167 +++++++++++++++++
 lib/pgbus/cli.rb                              |   7 +
 lib/pgbus/configuration.rb                    |  17 +-
 lib/pgbus/execution_pools.rb                  |  27 +++
 lib/pgbus/execution_pools/async_pool.rb       | 175 ++++++++++++++++++
 lib/pgbus/execution_pools/thread_pool.rb      |  56 ++++++
 lib/pgbus/process/consumer.rb                 |   9 +-
 lib/pgbus/process/supervisor.rb               |   6 +-
 lib/pgbus/process/worker.rb                   |  28 ++-
 spec/pgbus/cli_spec.rb                        |  18 ++
 spec/pgbus/configuration_spec.rb              |  49 +++++
 spec/pgbus/execution_pools/async_pool_spec.rb | 146 +++++++++++++++
 .../pgbus/execution_pools/thread_pool_spec.rb | 118 ++++++++++++
 spec/pgbus/execution_pools_spec.rb            |  56 ++++++
 spec/pgbus/process/consumer_spec.rb           |  11 +-
 spec/pgbus/process/worker_spec.rb             |  39 +++-
 19 files changed, 1067 insertions(+), 22 deletions(-)
 create mode 100644 benchmarks/connection_pool_bench.rb
 create mode 100644 benchmarks/execution_pool_bench.rb
 create mode 100644 lib/pgbus/execution_pools.rb
 create mode 100644 lib/pgbus/execution_pools/async_pool.rb
 create mode 100644 lib/pgbus/execution_pools/thread_pool.rb
 create mode 100644 spec/pgbus/execution_pools/async_pool_spec.rb
 create mode 100644 spec/pgbus/execution_pools/thread_pool_spec.rb
 create mode 100644 spec/pgbus/execution_pools_spec.rb

diff --git a/CLAUDE.md b/CLAUDE.md
index 1262275..176fc87
100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,6 +55,7 @@ bundle exec rake # Both Layer 6: Dashboard app/controllers/pgbus/, app/views/pgbus/ Layer 5: CLI lib/pgbus/cli.rb Layer 4: Process Model lib/pgbus/process/ (supervisor, worker, dispatcher, consumer) + Execution Pools lib/pgbus/execution_pools/ (thread_pool, async_pool) Layer 3: Event Bus lib/pgbus/event_bus/ (publisher, subscriber, registry, handler) Layer 2: ActiveJob lib/pgbus/active_job/ (adapter, executor) Layer 1: Client lib/pgbus/client.rb (PGMQ wrapper) diff --git a/README.md b/README.md index 8516901..767d31f 100644 --- a/README.md +++ b/README.md @@ -463,6 +463,36 @@ end When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from `/proc/self/statm` (Linux) or `ps -o rss` (macOS). +### Async execution mode (fibers) + +Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O. + +```ruby +Pgbus.configure do |config| + # Global: all workers use async mode + config.execution_mode = :async + + # Or per-worker: mix thread and async workers + config.workers = [ + { queues: %w[webhooks emails], threads: 100, execution_mode: :async }, + { queues: %w[default], threads: 5 } # stays thread-based + ] +end +``` + +**Prerequisites:** + +1. Add `gem "async"` to your Gemfile +2. Set `config.active_support.isolation_level = :fiber` in your Rails app + +**Why it reduces connections:** In thread mode, each thread holds a database connection while waiting on I/O. With 50 threads, that's 50 connections. In async mode, 50 fibers share 3-5 connections because fibers yield during I/O and only one runs at a time. + +**CLI flag:** `pgbus start --execution-mode async` + +**Safety:** Messages stay in PGMQ with visibility timeout protection regardless of execution mode. 
If a fiber or worker crashes, the visibility timeout expires and messages become available for re-read. No data loss risk. + +**Not recommended for:** CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode. + ## Routing and ordering How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering. diff --git a/benchmarks/connection_pool_bench.rb b/benchmarks/connection_pool_bench.rb new file mode 100644 index 0000000..c7f9afe --- /dev/null +++ b/benchmarks/connection_pool_bench.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true + +# Connection Pool Benchmark +# Measures peak active Postgres connections under varying concurrency +# for ThreadPool vs AsyncPool execution modes. +# +# REQUIRES: PGBUS_DATABASE_URL environment variable +# +# Usage: +# PGBUS_DATABASE_URL=postgres://user@localhost/pgbus_test ruby benchmarks/connection_pool_bench.rb + +require "benchmark/ips" +require "json" +require "securerandom" +require "concurrent" +require "pgbus" +require "pg" + +DATABASE_URL = ENV.fetch("PGBUS_DATABASE_URL") do + warn "PGBUS_DATABASE_URL not set. This benchmark requires a real PostgreSQL database." 
+ warn "Example: PGBUS_DATABASE_URL=postgres://user@localhost:5432/pgbus_test ruby benchmarks/connection_pool_bench.rb" + exit 1 +end + +Pgbus.configure do |c| + c.logger = Logger.new(IO::NULL) + c.queue_prefix = "pgbus_connbench" + c.default_queue = "default" + c.stats_enabled = false + c.database_url = DATABASE_URL +end + +def monitor_connection(url) + PG.connect(url) +end + +def count_active_connections(conn, prefix = "pgbus_connbench") + result = conn.exec( + "SELECT count(*) FROM pg_stat_activity " \ + "WHERE application_name LIKE '%#{prefix}%' OR query LIKE '%#{prefix}%'" + ) + result[0]["count"].to_i +rescue PG::Error => e + warn "Connection count query failed: #{e.message}" + 0 +end + +def update_peak(peak, current) + old = peak.value + peak.compare_and_set(old, current) if current > old +end + +def measure_peak_connections(mode:, capacity:, tasks:, monitor_conn:) + pool = if mode == :threads + Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity) + else + Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity) + end + + peak = Concurrent::AtomicFixnum.new(0) + done = Concurrent::CountDownLatch.new(tasks) + + # Monitor thread samples pg_stat_activity during the benchmark + stop_monitoring = Concurrent::AtomicBoolean.new(false) + monitor = Thread.new do + while stop_monitoring.false? 
+ current = count_active_connections(monitor_conn) + update_peak(peak, current) + sleep 0.01 + end + end + + tasks.times do + pool.post do + # Simulate a real job: query the database + conn = PG.connect(DATABASE_URL) + conn.exec("SELECT pg_sleep(0.01)") + conn.close + done.count_down + rescue PG::Error + done.count_down + end + end + + done.wait(30) + stop_monitoring.make_true + monitor.join(2) + + pool.shutdown + pool.wait_for_termination(10) + + peak.value +end + +puts "=" * 70 +puts "Connection Pool Benchmark" +puts "Database: #{DATABASE_URL.sub(%r{//[^@]+@}, "//***@")}" +puts "=" * 70 + +monitor_conn = monitor_connection(DATABASE_URL) + +configs = [ + { mode: :threads, capacity: 5, tasks: 20 }, + { mode: :threads, capacity: 10, tasks: 40 }, + { mode: :threads, capacity: 25, tasks: 100 }, + { mode: :async, capacity: 50, tasks: 200 }, + { mode: :async, capacity: 100, tasks: 400 } +] + +header = ["Mode", "Capacity", "Tasks", "Peak Connections"] +puts format("\n%-12s %-10s %-8s %-20s", *header) +puts "-" * 55 + +configs.each do |cfg| + peak = measure_peak_connections( + mode: cfg[:mode], + capacity: cfg[:capacity], + tasks: cfg[:tasks], + monitor_conn: monitor_conn + ) + puts format("%-12s %-10d %-8d %-20d", cfg[:mode], cfg[:capacity], cfg[:tasks], peak) + sleep 1 # let connections drain between runs +end + +monitor_conn.close + +puts "\nExpected: async mode should show significantly fewer peak connections" +puts "than thread mode, regardless of fiber capacity." +puts "\nDone." diff --git a/benchmarks/execution_pool_bench.rb b/benchmarks/execution_pool_bench.rb new file mode 100644 index 0000000..34a9d40 --- /dev/null +++ b/benchmarks/execution_pool_bench.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +require_relative "bench_helper" + +# Execution Pool Benchmark +# Compares ThreadPool vs AsyncPool throughput, latency, and memory. 
+# +# Usage: +# ruby benchmarks/execution_pool_bench.rb + +CAPACITIES = [10, 50].freeze + +def build_thread_pool(capacity) + Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity) +end + +def build_async_pool(capacity) + Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity) +end + +# --- 1. No-op throughput (scheduling overhead) --- + +puts "=" * 70 +puts "1. No-op throughput — scheduling overhead (IPS)" +puts "=" * 70 + +CAPACITIES.each do |cap| + Benchmark.ips do |x| + x.config(warmup: 2, time: 5) + + x.report("ThreadPool(#{cap}) no-op") do + pool = build_thread_pool(cap) + done = Concurrent::CountDownLatch.new(cap) + cap.times { pool.post { done.count_down } } + done.wait(10) + pool.shutdown + pool.wait_for_termination(5) + end + + x.report("AsyncPool(#{cap}) no-op") do + pool = build_async_pool(cap) + done = Concurrent::CountDownLatch.new(cap) + cap.times { pool.post { done.count_down } } + done.wait(10) + pool.shutdown + pool.wait_for_termination(5) + end + + x.compare! + end + puts +end + +# --- 2. I/O-bound throughput (fiber advantage zone) --- + +puts "=" * 70 +puts "2. I/O-bound throughput — sleep(0.01) simulating DB I/O" +puts "=" * 70 + +CAPACITIES.each do |cap| + Benchmark.ips do |x| + x.config(warmup: 1, time: 5) + + x.report("ThreadPool(#{cap}) I/O") do + pool = build_thread_pool(cap) + done = Concurrent::CountDownLatch.new(cap) + cap.times do + pool.post do + sleep(0.01) + done.count_down + end + end + done.wait(10) + pool.shutdown + pool.wait_for_termination(5) + end + + x.report("AsyncPool(#{cap}) I/O") do + pool = build_async_pool(cap) + done = Concurrent::CountDownLatch.new(cap) + cap.times do + pool.post do + sleep(0.01) + done.count_down + end + end + done.wait(10) + pool.shutdown + pool.wait_for_termination(5) + end + + x.compare! + end + puts +end + +# --- 3. Memory overhead --- + +puts "=" * 70 +puts "3. 
Memory overhead — 1000 tasks" +puts "=" * 70 + +[10, 50].each do |cap| + puts "\n--- Capacity: #{cap} ---" + + %i[threads async].each do |mode| + report = MemoryProfiler.report do + 100.times do + pool = if mode == :threads + build_thread_pool(cap) + else + build_async_pool(cap) + end + done = Concurrent::CountDownLatch.new(cap) + cap.times { pool.post { done.count_down } } + done.wait(10) + pool.shutdown + pool.wait_for_termination(5) + end + end + + puts "\n#{mode.upcase} pool (#{cap} capacity × 100 iterations):" + puts " Total allocated: #{report.total_allocated_memsize} bytes" + puts " Total retained: #{report.total_retained_memsize} bytes" + puts " Allocated objects: #{report.total_allocated}" + puts " Retained objects: #{report.total_retained}" + end +end + +# --- 4. Latency percentiles --- + +puts "\n#{"=" * 70}" +puts "4. Latency percentiles — single task completion time" +puts "=" * 70 + +SAMPLES = 500 + +%i[threads async].each do |mode| + pool = mode == :threads ? build_thread_pool(5) : build_async_pool(5) + latencies = [] + + SAMPLES.times do + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + done = Concurrent::Event.new + pool.post { done.set } + done.wait(5) + elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1_000_000 # microseconds + latencies << elapsed + end + + pool.shutdown + pool.wait_for_termination(5) + + latencies.sort! + p50 = latencies[(SAMPLES * 0.50).to_i] + p95 = latencies[(SAMPLES * 0.95).to_i] + p99 = latencies[(SAMPLES * 0.99).to_i] + + puts "\n#{mode.upcase} pool latency (#{SAMPLES} samples):" + puts " p50: #{p50.round(1)} µs" + puts " p95: #{p95.round(1)} µs" + puts " p99: #{p99.round(1)} µs" + puts " min: #{latencies.first.round(1)} µs" + puts " max: #{latencies.last.round(1)} µs" +end + +puts "\nDone." 
diff --git a/lib/pgbus/cli.rb b/lib/pgbus/cli.rb index 282b295..77c1c20 100644 --- a/lib/pgbus/cli.rb +++ b/lib/pgbus/cli.rb @@ -42,6 +42,7 @@ def apply_start_options(args) options = parse_start_options(args) Pgbus.configuration.workers = options[:queues] if options[:queues] + Pgbus.configuration.execution_mode = options[:execution_mode].to_sym if options[:execution_mode] apply_capsule_filter(options[:capsule]) if options[:capsule] apply_role_filter(options) end @@ -70,6 +71,10 @@ def parse_start_options(args) opts.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") do options[:dispatcher_only] = true end + + opts.on("--execution-mode MODE", "Execution mode: threads (default) or async") do |v| + options[:execution_mode] = v + end end.parse!(args.dup) options end @@ -165,6 +170,8 @@ def print_help pattern — exactly one of these per deployment) --dispatcher-only Run only the dispatcher (the maintenance pod pattern) + --execution-mode Execution mode: threads (default) or async + (fiber-based, lower connection usage) HELP end end diff --git a/lib/pgbus/configuration.rb b/lib/pgbus/configuration.rb index f462d3f..9bcf5c7 100644 --- a/lib/pgbus/configuration.rb +++ b/lib/pgbus/configuration.rb @@ -11,7 +11,7 @@ class Configuration attr_accessor :default_queue, :queue_prefix # Worker settings - attr_accessor :polling_interval, :prefetch_limit + attr_accessor :polling_interval, :prefetch_limit, :execution_mode attr_reader :workers, :visibility_timeout # rubocop:disable Style/AccessorGrouping # Supervisor role selection. @@ -111,6 +111,7 @@ def initialize @visibility_timeout = 30 @prefetch_limit = nil + @execution_mode = :threads @max_jobs_per_worker = nil @max_memory_mb = nil @@ -216,6 +217,13 @@ def priority_queue_names(name) (0...priority_levels).map { |p| priority_queue_name(name, p) } end + # Returns the execution mode for a specific worker config hash, + # falling back to the global execution_mode setting. 
+ def execution_mode_for(worker_config) + mode = worker_config[:execution_mode] || worker_config["execution_mode"] || execution_mode + ExecutionPools.normalize_mode(mode) + end + VALID_PGMQ_SCHEMA_MODES = %i[auto extension embedded].freeze def pgmq_schema_mode=(mode) @@ -242,9 +250,16 @@ def validate! raise ArgumentError, "retry_backoff_jitter must be between 0 and 1" end + # Validate global execution_mode + ExecutionPools.normalize_mode(execution_mode) + Array(workers).each do |w| threads = w[:threads] || w["threads"] || 5 raise ArgumentError, "worker threads must be > 0" unless threads.is_a?(Integer) && threads.positive? + + # Validate per-worker execution_mode override if present + mode = w[:execution_mode] || w["execution_mode"] + ExecutionPools.normalize_mode(mode) if mode end raise ArgumentError, "prefetch_limit must be > 0" if prefetch_limit && !(prefetch_limit.is_a?(Integer) && prefetch_limit.positive?) diff --git a/lib/pgbus/execution_pools.rb b/lib/pgbus/execution_pools.rb new file mode 100644 index 0000000..10798d6 --- /dev/null +++ b/lib/pgbus/execution_pools.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Pgbus + module ExecutionPools + class << self + def build(mode:, capacity:, on_state_change: nil) + case normalize_mode(mode) + when :threads + ThreadPool.new(capacity: capacity) + when :async + AsyncPool.new(capacity: capacity, on_state_change: on_state_change) + end + end + + def normalize_mode(mode) + case mode.to_s + when "", "threads" + :threads + when "async", "fiber" + :async + else + raise ArgumentError, "Unknown execution_mode: #{mode.inspect}. 
Expected one of: :threads, :async, :fiber" + end + end + end + end +end diff --git a/lib/pgbus/execution_pools/async_pool.rb b/lib/pgbus/execution_pools/async_pool.rb new file mode 100644 index 0000000..0e45d25 --- /dev/null +++ b/lib/pgbus/execution_pools/async_pool.rb @@ -0,0 +1,175 @@ +# frozen_string_literal: true + +module Pgbus + module ExecutionPools + class AsyncPool + attr_reader :capacity + + IDLE_WAIT_INTERVAL = 0.01 + + def initialize(capacity:, on_state_change: nil) + @capacity = capacity + @on_state_change = on_state_change + @available_capacity = capacity + @mutex = Mutex.new + @state_mutex = Mutex.new + @shutdown_flag = false + @fatal_error = nil + @boot_queue = Thread::Queue.new + @pending = Thread::Queue.new + + validate_dependencies! + @reactor_thread = start_reactor + result = @boot_queue.pop + raise result if result.is_a?(Exception) + end + + def post(&block) + raise_if_fatal! + raise "Execution pool is shutting down" if shutdown? + + reserved = false + reserve_capacity! + reserved = true + @pending << block + rescue StandardError + restore_capacity if reserved + raise + end + + def available_capacity + raise_if_fatal! + @mutex.synchronize { @available_capacity } + end + + def idle? + available_capacity.positive? + end + + def shutdown + @state_mutex.synchronize do + return false if @shutdown_flag + + @shutdown_flag = true + end + end + + def shutdown? + @state_mutex.synchronize { @shutdown_flag } + end + + def wait_for_termination(timeout) + @reactor_thread&.join(timeout) + end + + def kill + shutdown + @reactor_thread&.kill + end + + def metadata + inflight = @mutex.synchronize { @available_capacity } + { + mode: :async, + capacity: @capacity, + busy: @capacity - inflight + } + end + + private + + def validate_dependencies! + require "async" + require "async/semaphore" + rescue LoadError => e + raise LoadError, + "Async execution mode requires the `async` gem. " \ + "Add `gem \"async\"` to your Gemfile. 
Original error: #{e.message}" + end + + # rubocop:disable Lint/RescueException + def start_reactor + Thread.new do + Thread.current.name = "pgbus-async-reactor-#{object_id}" + Async do |task| + semaphore = Async::Semaphore.new(@capacity, parent: task) + @boot_queue << :ready + + wait_for_executions(semaphore) + wait_for_inflight + end + rescue Exception => e + register_fatal_error(e) + raise + end + end + # rubocop:enable Lint/RescueException + + def wait_for_executions(semaphore) + loop do + schedule_pending(semaphore) + break if shutdown? && @pending.empty? + + sleep(IDLE_WAIT_INTERVAL) if @pending.empty? + end + end + + def schedule_pending(semaphore) + while (block = next_pending) + semaphore.async do + perform(block) + end + end + end + + def next_pending + @pending.pop(true) + rescue ThreadError + nil + end + + def perform(block) + block.call + rescue StandardError => e + Pgbus.logger.error { "[Pgbus] Async pool fiber error: #{e.class}: #{e.message}" } + ensure + restore_capacity + end + + def reserve_capacity! + @mutex.synchronize do + raise "Execution pool is at capacity" if @available_capacity <= 0 + + @available_capacity -= 1 + end + end + + def restore_capacity + should_notify = @mutex.synchronize do + @available_capacity += 1 + @available_capacity.positive? + end + @on_state_change&.call if should_notify + end + + def register_fatal_error(error) + @state_mutex.synchronize { @fatal_error ||= error } + @boot_queue << error if @boot_queue.empty? + @on_state_change&.call + end + + def raise_if_fatal! + error = @state_mutex.synchronize { @fatal_error } + raise error if error + end + + def wait_for_inflight + sleep(IDLE_WAIT_INTERVAL) while inflight? + end + + def inflight? 
+ @mutex.synchronize { @available_capacity < @capacity } + end + end + end +end diff --git a/lib/pgbus/execution_pools/thread_pool.rb b/lib/pgbus/execution_pools/thread_pool.rb new file mode 100644 index 0000000..9cdf919 --- /dev/null +++ b/lib/pgbus/execution_pools/thread_pool.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "concurrent" + +module Pgbus + module ExecutionPools + class ThreadPool + attr_reader :capacity + + def initialize(capacity:, on_state_change: nil) + @capacity = capacity + @on_state_change = on_state_change + @available_capacity = Concurrent::AtomicFixnum.new(capacity) + @pool = Concurrent::FixedThreadPool.new(capacity) + end + + def post(&block) + @available_capacity.decrement + @pool.post do + block.call + ensure + @available_capacity.increment + @on_state_change&.call + end + end + + def available_capacity + @available_capacity.value + end + + def idle? + available_capacity.positive? + end + + def shutdown + @pool.shutdown + end + + def wait_for_termination(timeout) + @pool.wait_for_termination(timeout) + end + + def kill + @pool.kill + end + + def metadata + { + mode: :threads, + capacity: @capacity, + busy: @capacity - available_capacity + } + end + end + end +end diff --git a/lib/pgbus/process/consumer.rb b/lib/pgbus/process/consumer.rb index 4c5e82c..be91e07 100644 --- a/lib/pgbus/process/consumer.rb +++ b/lib/pgbus/process/consumer.rb @@ -7,14 +7,15 @@ module Process class Consumer include SignalHandler - attr_reader :topics, :threads, :config + attr_reader :topics, :threads, :config, :execution_mode - def initialize(topics:, threads: 3, config: Pgbus.configuration) + def initialize(topics:, threads: 3, config: Pgbus.configuration, execution_mode: :threads) @topics = Array(topics) @threads = threads @config = config + @execution_mode = ExecutionPools.normalize_mode(execution_mode) @shutting_down = false - @pool = Concurrent::FixedThreadPool.new(threads) + @pool = ExecutionPools.build(mode: @execution_mode, capacity: 
threads) @registry = EventBus::Registry.instance end @@ -53,7 +54,7 @@ def setup_subscriptions end def consume - idle = @pool.max_length - @pool.queue_length + idle = @pool.available_capacity return interruptible_sleep(config.polling_interval) if idle <= 0 tagged_messages = if @queue_names.size == 1 diff --git a/lib/pgbus/process/supervisor.rb b/lib/pgbus/process/supervisor.rb index cde3e2f..7b4933a 100644 --- a/lib/pgbus/process/supervisor.rb +++ b/lib/pgbus/process/supervisor.rb @@ -60,6 +60,7 @@ def fork_worker(worker_config) threads = worker_config[:threads] || worker_config["threads"] || 5 single_active = worker_config[:single_active_consumer] || worker_config["single_active_consumer"] || false priority = worker_config[:consumer_priority] || worker_config["consumer_priority"] || 0 + exec_mode = config.execution_mode_for(worker_config) pid = fork do restore_signals @@ -68,7 +69,8 @@ def fork_worker(worker_config) bootstrap_queues worker = Worker.new( queues: queues, threads: threads, config: config, - single_active_consumer: single_active, consumer_priority: priority + single_active_consumer: single_active, consumer_priority: priority, + execution_mode: exec_mode ) worker.run end @@ -79,7 +81,7 @@ def fork_worker(worker_config) end @forks[pid] = { type: :worker, config: worker_config } - Pgbus.logger.info { "[Pgbus] Forked worker pid=#{pid} queues=#{queues.join(",")}" } + Pgbus.logger.info { "[Pgbus] Forked worker pid=#{pid} queues=#{queues.join(",")} mode=#{exec_mode}" } rescue Errno::EAGAIN, Errno::ENOMEM => e Pgbus.logger.error { "[Pgbus] Fork failed for worker: #{e.message}" } end diff --git a/lib/pgbus/process/worker.rb b/lib/pgbus/process/worker.rb index c52d116..a8ccde1 100644 --- a/lib/pgbus/process/worker.rb +++ b/lib/pgbus/process/worker.rb @@ -7,14 +7,16 @@ module Process class Worker include SignalHandler - attr_reader :queues, :threads, :config + attr_reader :queues, :threads, :config, :execution_mode def initialize(queues:, threads: 5, config: 
Pgbus.configuration, - single_active_consumer: false, consumer_priority: 0) + single_active_consumer: false, consumer_priority: 0, + execution_mode: :threads) @queues = Array(queues) @wildcard = @queues.include?("*") @threads = threads @config = config + @execution_mode = ExecutionPools.normalize_mode(execution_mode) @single_active_consumer = single_active_consumer @consumer_priority = consumer_priority @lifecycle = Lifecycle.new @@ -27,10 +29,14 @@ def initialize(queues:, threads: 5, config: Pgbus.configuration, @started_at_monotonic = monotonic_now @stat_buffer = config.stats_enabled ? Pgbus::StatBuffer.new : nil @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer) - @pool = Concurrent::FixedThreadPool.new(threads) + @wake_signal = WakeSignal.new + @pool = ExecutionPools.build( + mode: @execution_mode, + capacity: threads, + on_state_change: -> { @wake_signal.notify! } + ) @circuit_breaker = Pgbus::CircuitBreaker.new(config: config) @queue_lock = QueueLock.new if @single_active_consumer - @wake_signal = WakeSignal.new end def stats @@ -39,12 +45,13 @@ def stats jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, + execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at - } + }.merge(@pool.metadata) end def run @@ -52,7 +59,10 @@ def run start_heartbeat resolve_wildcard_queues @lifecycle.transition_to!(:running) - Pgbus.logger.info { "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} pid=#{::Process.pid}" } + Pgbus.logger.info do + "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \ + "mode=#{@execution_mode} pid=#{::Process.pid}" + end loop do process_signals @@ -60,7 +70,7 @@ def run refresh_wildcard_queues break if @lifecycle.stopped? - break if @lifecycle.draining? && @pool.queue_length.zero? 
+ break if @lifecycle.draining? && @pool.idle? claim_and_execute if @lifecycle.can_process? @stat_buffer&.flush_if_due @@ -97,7 +107,7 @@ def immediate_shutdown def claim_and_execute poll_interval = effective_polling_interval - idle = @pool.max_length - @pool.queue_length + idle = @pool.available_capacity return @wake_signal.wait(timeout: poll_interval) if idle <= 0 if config.prefetch_limit @@ -332,7 +342,7 @@ def start_heartbeat kind: "worker", metadata: { queues: queues, threads: threads, pid: ::Process.pid, - consumer_priority: @consumer_priority + execution_mode: @execution_mode, consumer_priority: @consumer_priority }, on_beat: -> { @rate_counter.snapshot! } ) diff --git a/spec/pgbus/cli_spec.rb b/spec/pgbus/cli_spec.rb index c6029f0..a26bd88 100644 --- a/spec/pgbus/cli_spec.rb +++ b/spec/pgbus/cli_spec.rb @@ -137,6 +137,24 @@ end end + context "with --execution-mode flag" do + it "sets config.execution_mode to :async" do + original_mode = Pgbus.configuration.execution_mode + described_class.start(["start", "--execution-mode", "async"]) + expect(Pgbus.configuration.execution_mode).to eq(:async) + ensure + Pgbus.configuration.execution_mode = original_mode + end + + it "sets config.execution_mode to :threads" do + original_mode = Pgbus.configuration.execution_mode + described_class.start(["start", "--execution-mode", "threads"]) + expect(Pgbus.configuration.execution_mode).to eq(:threads) + ensure + Pgbus.configuration.execution_mode = original_mode + end + end + context "with multiple --*-only flags (mutually exclusive)" do it "raises ArgumentError when --workers-only and --scheduler-only are both passed" do expect do diff --git a/spec/pgbus/configuration_spec.rb b/spec/pgbus/configuration_spec.rb index 5e325cd..d49935a 100644 --- a/spec/pgbus/configuration_spec.rb +++ b/spec/pgbus/configuration_spec.rb @@ -108,6 +108,33 @@ it "has default recurring execution retention of 7 days" do expect(config.recurring_execution_retention).to eq(7 * 24 * 3600) end + + it 
"defaults execution_mode to :threads" do + expect(config.execution_mode).to eq(:threads) + end + end + + describe "#execution_mode_for" do + it "returns the global default when worker has no override" do + expect(config.execution_mode_for({})).to eq(:threads) + end + + it "returns the worker-level override when present" do + expect(config.execution_mode_for(execution_mode: :async)).to eq(:async) + end + + it "normalizes :fiber to :async" do + expect(config.execution_mode_for(execution_mode: :fiber)).to eq(:async) + end + + it "falls back to global execution_mode" do + config.execution_mode = :async + expect(config.execution_mode_for({})).to eq(:async) + end + + it "accepts string keys" do + expect(config.execution_mode_for("execution_mode" => "async")).to eq(:async) + end end describe "#queue_name" do @@ -834,6 +861,28 @@ config.retry_backoff_jitter = 0.2 expect { config.validate! }.not_to raise_error end + + it "rejects invalid global execution_mode" do + config.execution_mode = :bogus + expect { config.validate! }.to raise_error(ArgumentError, /execution_mode/i) + end + + it "accepts valid execution_mode values" do + %i[threads async fiber].each do |mode| + config.execution_mode = mode + expect { config.validate! }.not_to raise_error + end + end + + it "rejects invalid per-worker execution_mode" do + config.workers = [{ queues: %w[default], threads: 5, execution_mode: :bogus }] + expect { config.validate! }.to raise_error(ArgumentError, /execution_mode/i) + end + + it "accepts per-worker execution_mode override" do + config.workers = [{ queues: %w[default], threads: 50, execution_mode: :async }] + expect { config.validate! 
}.not_to raise_error + end end describe "#connection_options" do diff --git a/spec/pgbus/execution_pools/async_pool_spec.rb b/spec/pgbus/execution_pools/async_pool_spec.rb new file mode 100644 index 0000000..ae0b24d --- /dev/null +++ b/spec/pgbus/execution_pools/async_pool_spec.rb @@ -0,0 +1,146 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools::AsyncPool do + subject(:pool) { described_class.new(capacity: capacity) } + + let(:capacity) { 3 } + + after do + pool.shutdown + pool.wait_for_termination(5) + end + + describe "#initialize" do + it "creates a pool with given capacity" do + expect(pool.capacity).to eq(3) + end + + it "starts fully available" do + expect(pool.available_capacity).to eq(3) + end + + it "boots the reactor thread synchronously" do + # If we get here without hanging, boot sync worked + expect(pool.available_capacity).to eq(3) + end + end + + describe "#post" do + it "executes the submitted block as a fiber" do + result = Concurrent::IVar.new + pool.post { result.set(42) } + expect(result.value(5)).to eq(42) + end + + it "executes multiple blocks concurrently" do + results = Concurrent::Array.new + done = Concurrent::CountDownLatch.new(3) + + 3.times do |i| + pool.post do + results << i + done.count_down + end + end + + expect(done.wait(5)).to be true + expect(results.size).to eq(3) + end + + it "raises when pool is shutting down" do + pool.shutdown + sleep 0.05 + expect { pool.post { nil } }.to raise_error(RuntimeError, /shutting down/) + end + + it "raises when pool is at capacity" do + barrier = Concurrent::Event.new + + capacity.times { pool.post { barrier.wait(5) } } + sleep 0.05 + + expect { pool.post { nil } }.to raise_error(RuntimeError, /at capacity/) + barrier.set + end + end + + describe "#available_capacity" do + it "decreases when work is posted" do + barrier = Concurrent::Event.new + + pool.post { barrier.wait(5) } + sleep 0.05 + + expect(pool.available_capacity).to be < capacity + 
barrier.set + end + + it "increases when work completes" do + done = Concurrent::Event.new + pool.post { done.set } + done.wait(5) + sleep 0.05 + + expect(pool.available_capacity).to eq(capacity) + end + end + + describe "#idle?" do + it "is true when no work is queued" do + expect(pool).to be_idle + end + end + + describe "#on_state_change callback" do + it "fires when a fiber completes" do + callback_called = Concurrent::Event.new + pool_with_cb = described_class.new( + capacity: 3, + on_state_change: -> { callback_called.set } + ) + + pool_with_cb.post { nil } + expect(callback_called.wait(5)).to be true + ensure + pool_with_cb&.shutdown + pool_with_cb&.wait_for_termination(2) + end + end + + describe "#shutdown" do + it "waits for in-flight fibers to complete" do + result = Concurrent::IVar.new + pool.post do + sleep 0.05 + result.set(:done) + end + pool.shutdown + pool.wait_for_termination(5) + + expect(result.value(0)).to eq(:done) + end + end + + describe "#metadata" do + it "returns pool information" do + meta = pool.metadata + expect(meta[:mode]).to eq(:async) + expect(meta[:capacity]).to eq(3) + expect(meta[:busy]).to eq(0) + end + end + + describe "error handling" do + it "does not crash the reactor when a fiber raises" do + pool.post { raise "boom" } + sleep 0.1 + + # Pool should still accept new work + result = Concurrent::IVar.new + pool.post { result.set(:ok) } + expect(result.value(5)).to eq(:ok) + end + end +end diff --git a/spec/pgbus/execution_pools/thread_pool_spec.rb b/spec/pgbus/execution_pools/thread_pool_spec.rb new file mode 100644 index 0000000..eb1bbeb --- /dev/null +++ b/spec/pgbus/execution_pools/thread_pool_spec.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools::ThreadPool do + subject(:pool) { described_class.new(capacity: capacity) } + + let(:capacity) { 3 } + + after { pool.kill } + + describe "#initialize" do + it "creates a pool with given capacity" do + 
expect(pool.capacity).to eq(3) + end + + it "starts fully available" do + expect(pool.available_capacity).to eq(3) + end + end + + describe "#post" do + it "executes the submitted block" do + result = Concurrent::IVar.new + pool.post { result.set(42) } + expect(result.value(2)).to eq(42) + end + + it "executes multiple blocks concurrently" do + results = Concurrent::Array.new + latch = Concurrent::CountDownLatch.new(3) + + 3.times do |i| + pool.post do + results << i + latch.count_down + end + end + + expect(latch.wait(5)).to be true + expect(results.size).to eq(3) + end + end + + describe "#available_capacity" do + it "decreases when work is posted" do + barrier = Concurrent::Event.new + + pool.post { barrier.wait(5) } + sleep 0.05 # let thread start + + expect(pool.available_capacity).to be < capacity + barrier.set + end + + it "increases when work completes" do + done = Concurrent::Event.new + pool.post { done.set } + done.wait(2) + sleep 0.05 # let pool reclaim thread + + expect(pool.available_capacity).to eq(capacity) + end + end + + describe "#idle?" 
do + it "is true when no work is queued" do + expect(pool).to be_idle + end + + it "is false when at capacity" do + barrier = Concurrent::Event.new + + capacity.times { pool.post { barrier.wait(5) } } + sleep 0.05 + + expect(pool).not_to be_idle + barrier.set + end + end + + describe "#shutdown" do + it "finishes in-progress work" do + result = Concurrent::IVar.new + pool.post { sleep 0.05; result.set(:done) } + pool.shutdown + + expect(result.value(2)).to eq(:done) + end + end + + describe "#wait_for_termination" do + it "blocks until all work completes" do + result = Concurrent::IVar.new + pool.post { sleep 0.05; result.set(:done) } + pool.shutdown + pool.wait_for_termination(5) + + expect(result.value(0)).to eq(:done) + end + end + + describe "#kill" do + it "abandons queued work" do + pool.kill + expect { pool.post { nil } }.to raise_error(Concurrent::RejectedExecutionError) + end + end + + describe "#metadata" do + it "returns pool information" do + meta = pool.metadata + expect(meta[:mode]).to eq(:threads) + expect(meta[:capacity]).to eq(3) + expect(meta[:busy]).to eq(0) + end + end +end diff --git a/spec/pgbus/execution_pools_spec.rb b/spec/pgbus/execution_pools_spec.rb new file mode 100644 index 0000000..c9921ba --- /dev/null +++ b/spec/pgbus/execution_pools_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools do + describe ".build" do + it "returns a ThreadPool for :threads mode" do + pool = described_class.build(mode: :threads, capacity: 3) + expect(pool).to be_a(Pgbus::ExecutionPools::ThreadPool) + pool.kill + end + + it "returns an AsyncPool for :async mode" do + pool = described_class.build(mode: :async, capacity: 3) + expect(pool).to be_a(Pgbus::ExecutionPools::AsyncPool) + pool.shutdown + pool.wait_for_termination(2) + end + + it "returns an AsyncPool for :fiber mode (alias)" do + pool = described_class.build(mode: :fiber, capacity: 3) + expect(pool).to 
be_a(Pgbus::ExecutionPools::AsyncPool) + pool.shutdown + pool.wait_for_termination(2) + end + + it "raises ArgumentError for unknown mode" do + expect { described_class.build(mode: :unknown, capacity: 3) } + .to raise_error(ArgumentError, /unknown execution_mode/i) + end + end + + describe ".normalize_mode" do + it "normalizes :fiber to :async" do + expect(described_class.normalize_mode(:fiber)).to eq(:async) + end + + it "passes :threads through" do + expect(described_class.normalize_mode(:threads)).to eq(:threads) + end + + it "passes :async through" do + expect(described_class.normalize_mode(:async)).to eq(:async) + end + + it "accepts string modes" do + expect(described_class.normalize_mode("threads")).to eq(:threads) + expect(described_class.normalize_mode("async")).to eq(:async) + end + + it "raises for unknown modes" do + expect { described_class.normalize_mode(:bogus) } + .to raise_error(ArgumentError) + end + end +end diff --git a/spec/pgbus/process/consumer_spec.rb b/spec/pgbus/process/consumer_spec.rb index 689b8e0..b142d30 100644 --- a/spec/pgbus/process/consumer_spec.rb +++ b/spec/pgbus/process/consumer_spec.rb @@ -5,7 +5,14 @@ RSpec.describe Pgbus::Process::Consumer do let(:mock_client) { build_mock_client } - let(:mock_pool) { instance_double(Concurrent::FixedThreadPool, kill: nil, shutdown: nil, wait_for_termination: nil) } + let(:mock_pool) do + instance_double( + Pgbus::ExecutionPools::ThreadPool, + capacity: 3, available_capacity: 3, idle?: true, + kill: nil, shutdown: nil, wait_for_termination: nil, + metadata: { mode: :threads, capacity: 3, busy: 0 } + ) + end let(:mock_heartbeat) { instance_double(Pgbus::Process::Heartbeat, start: nil, stop: nil) } let(:subscriber_a) { instance_double(Pgbus::EventBus::Subscriber, pattern: "orders.#", queue_name: "q_orders") } let(:subscriber_b) { instance_double(Pgbus::EventBus::Subscriber, pattern: "payments.completed", queue_name: "q_payments") } @@ -14,7 +21,7 @@ before do allow(Pgbus).to 
receive(:client).and_return(mock_client) - allow(Concurrent::FixedThreadPool).to receive(:new).and_return(mock_pool) + allow(Pgbus::ExecutionPools).to receive(:build).and_return(mock_pool) allow(Pgbus::Process::Heartbeat).to receive(:new).and_return(mock_heartbeat) allow(Pgbus::EventBus::Registry).to receive(:instance).and_return(registry) end diff --git a/spec/pgbus/process/worker_spec.rb b/spec/pgbus/process/worker_spec.rb index 9a5200f..ed23b15 100644 --- a/spec/pgbus/process/worker_spec.rb +++ b/spec/pgbus/process/worker_spec.rb @@ -6,7 +6,14 @@ let(:heartbeat) { instance_double(Pgbus::Process::Heartbeat, start: true, stop: true) } let(:mock_client) { build_mock_client } let(:executor) { instance_double(Pgbus::ActiveJob::Executor) } - let(:pool) { instance_double(Concurrent::FixedThreadPool, max_length: 5, queue_length: 0, shutdown: true, kill: true) } + let(:pool) do + instance_double( + Pgbus::ExecutionPools::ThreadPool, + capacity: 5, available_capacity: 5, idle?: true, + shutdown: true, kill: true, wait_for_termination: true, + metadata: { mode: :threads, capacity: 5, busy: 0 } + ) + end let(:circuit_breaker) { instance_double(Pgbus::CircuitBreaker, paused?: false, record_success: nil, record_failure: nil) } let(:worker) { described_class.new(queues: %w[default], threads: 5) } @@ -14,16 +21,37 @@ allow(Pgbus::Process::Heartbeat).to receive(:new).and_return(heartbeat) allow(Pgbus).to receive(:client).and_return(mock_client) allow(Pgbus::ActiveJob::Executor).to receive(:new).and_return(executor) - allow(Concurrent::FixedThreadPool).to receive(:new).and_return(pool) + allow(Pgbus::ExecutionPools).to receive(:build).and_return(pool) allow(Pgbus::CircuitBreaker).to receive(:new).and_return(circuit_breaker) end describe "#initialize" do - it "stores config, queues, and creates a thread pool" do + it "stores config, queues, and creates an execution pool" do expect(worker.queues).to eq(%w[default]) expect(worker.threads).to eq(5) expect(worker.config).to 
be_a(Pgbus::Configuration) - expect(Concurrent::FixedThreadPool).to have_received(:new).with(5) + expect(Pgbus::ExecutionPools).to have_received(:build).with( + mode: :threads, capacity: 5, on_state_change: an_instance_of(Proc) + ) + end + + it "defaults to threads execution mode" do + expect(worker.execution_mode).to eq(:threads) + end + + it "accepts async execution mode" do + async_pool = instance_double( + Pgbus::ExecutionPools::AsyncPool, + capacity: 50, available_capacity: 50, idle?: true, + shutdown: true, kill: true, wait_for_termination: true, + metadata: { mode: :async, capacity: 50, busy: 0 } + ) + allow(Pgbus::ExecutionPools).to receive(:build) + .with(mode: :async, capacity: 50, on_state_change: an_instance_of(Proc)) + .and_return(async_pool) + + async_worker = described_class.new(queues: %w[default], threads: 50, execution_mode: :async) + expect(async_worker.execution_mode).to eq(:async) end it "initializes stats tracking" do @@ -31,7 +59,10 @@ expect(worker.stats[:jobs_failed]).to eq(0) expect(worker.stats[:in_flight]).to eq(0) expect(worker.stats[:state]).to eq(:starting) + expect(worker.stats[:execution_mode]).to eq(:threads) expect(worker.stats[:started_at]).to be_a(Time) + expect(worker.stats[:mode]).to eq(:threads) + expect(worker.stats[:capacity]).to eq(5) end end From 24d56671679ac012f458db991faba4b39f347843 Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 09:57:19 +0200 Subject: [PATCH 2/7] fix(ci): resolve rubocop offenses and rename command - Fix RSpec/RepeatedExample in async_pool_spec (distinct assertion) - Fix Style/Semicolon in thread_pool_spec (multi-line blocks) - Rename github-ci-failures command to github-review-failures for consistency with github-review-comments --- ...github-ci-failures.md => github-review-failures.md} | 0 spec/pgbus/execution_pools/async_pool_spec.rb | 7 +++++-- spec/pgbus/execution_pools/thread_pool_spec.rb | 10 ++++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) rename 
.claude/commands/{github-ci-failures.md => github-review-failures.md} (100%) diff --git a/.claude/commands/github-ci-failures.md b/.claude/commands/github-review-failures.md similarity index 100% rename from .claude/commands/github-ci-failures.md rename to .claude/commands/github-review-failures.md diff --git a/spec/pgbus/execution_pools/async_pool_spec.rb b/spec/pgbus/execution_pools/async_pool_spec.rb index ae0b24d..4ccdc9a 100644 --- a/spec/pgbus/execution_pools/async_pool_spec.rb +++ b/spec/pgbus/execution_pools/async_pool_spec.rb @@ -22,8 +22,11 @@ end it "boots the reactor thread synchronously" do - # If we get here without hanging, boot sync worked - expect(pool.available_capacity).to eq(3) + # If we get here without hanging, boot sync worked. + # Verify the pool is functional by posting and completing a task. + result = Concurrent::IVar.new + pool.post { result.set(:booted) } + expect(result.value(5)).to eq(:booted) end end diff --git a/spec/pgbus/execution_pools/thread_pool_spec.rb b/spec/pgbus/execution_pools/thread_pool_spec.rb index eb1bbeb..4c29292 100644 --- a/spec/pgbus/execution_pools/thread_pool_spec.rb +++ b/spec/pgbus/execution_pools/thread_pool_spec.rb @@ -82,7 +82,10 @@ describe "#shutdown" do it "finishes in-progress work" do result = Concurrent::IVar.new - pool.post { sleep 0.05; result.set(:done) } + pool.post do + sleep 0.05 + result.set(:done) + end pool.shutdown expect(result.value(2)).to eq(:done) @@ -92,7 +95,10 @@ describe "#wait_for_termination" do it "blocks until all work completes" do result = Concurrent::IVar.new - pool.post { sleep 0.05; result.set(:done) } + pool.post do + sleep 0.05 + result.set(:done) + end pool.shutdown pool.wait_for_termination(5) From 792877a00d2e6078f0e33721de024abf4b3d0739 Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 10:24:49 +0200 Subject: [PATCH 3/7] fix: address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pass 
on_state_change callback to ThreadPool in factory (was only passed to AsyncPool — workers in thread mode missed wake signals) - Guard ThreadPool#post against capacity leak if pool.post raises - Remove unused benchmark/ips require from connection_pool_bench - Use exec_params in connection_pool_bench to avoid SQL interpolation - Add on_state_change callback test for ThreadPool --- benchmarks/connection_pool_bench.rb | 7 ++++--- lib/pgbus/execution_pools.rb | 2 +- lib/pgbus/execution_pools/thread_pool.rb | 13 +++++++++---- spec/pgbus/execution_pools/thread_pool_spec.rb | 13 +++++++++++++ 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/benchmarks/connection_pool_bench.rb b/benchmarks/connection_pool_bench.rb index c7f9afe..b5dec3d 100644 --- a/benchmarks/connection_pool_bench.rb +++ b/benchmarks/connection_pool_bench.rb @@ -9,7 +9,6 @@ # Usage: # PGBUS_DATABASE_URL=postgres://user@localhost/pgbus_test ruby benchmarks/connection_pool_bench.rb -require "benchmark/ips" require "json" require "securerandom" require "concurrent" @@ -35,9 +34,11 @@ def monitor_connection(url) end def count_active_connections(conn, prefix = "pgbus_connbench") - result = conn.exec( + pattern = "%#{prefix}%" + result = conn.exec_params( "SELECT count(*) FROM pg_stat_activity " \ - "WHERE application_name LIKE '%#{prefix}%' OR query LIKE '%#{prefix}%'" + "WHERE application_name LIKE $1 OR query LIKE $1", + [pattern] ) result[0]["count"].to_i rescue PG::Error => e diff --git a/lib/pgbus/execution_pools.rb b/lib/pgbus/execution_pools.rb index 10798d6..22701de 100644 --- a/lib/pgbus/execution_pools.rb +++ b/lib/pgbus/execution_pools.rb @@ -6,7 +6,7 @@ class << self def build(mode:, capacity:, on_state_change: nil) case normalize_mode(mode) when :threads - ThreadPool.new(capacity: capacity) + ThreadPool.new(capacity: capacity, on_state_change: on_state_change) when :async AsyncPool.new(capacity: capacity, on_state_change: on_state_change) end diff --git 
a/lib/pgbus/execution_pools/thread_pool.rb b/lib/pgbus/execution_pools/thread_pool.rb index 9cdf919..80daf85 100644 --- a/lib/pgbus/execution_pools/thread_pool.rb +++ b/lib/pgbus/execution_pools/thread_pool.rb @@ -16,11 +16,16 @@ def initialize(capacity:, on_state_change: nil) def post(&block) @available_capacity.decrement - @pool.post do - block.call - ensure + begin + @pool.post do + block.call + ensure + @available_capacity.increment + @on_state_change&.call + end + rescue StandardError @available_capacity.increment - @on_state_change&.call + raise end end diff --git a/spec/pgbus/execution_pools/thread_pool_spec.rb b/spec/pgbus/execution_pools/thread_pool_spec.rb index 4c29292..2682d2c 100644 --- a/spec/pgbus/execution_pools/thread_pool_spec.rb +++ b/spec/pgbus/execution_pools/thread_pool_spec.rb @@ -20,6 +20,19 @@ end describe "#post" do + it "fires on_state_change when work completes" do + callback_called = Concurrent::Event.new + pool_with_cb = described_class.new( + capacity: 3, + on_state_change: -> { callback_called.set } + ) + + pool_with_cb.post { nil } + expect(callback_called.wait(2)).to be true + ensure + pool_with_cb&.kill + end + it "executes the submitted block" do result = Concurrent::IVar.new pool.post { result.set(42) } From 1d15fc73a2e9f2fc3b00d8581ffa52699721c0a4 Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 10:40:11 +0200 Subject: [PATCH 4/7] fix(streams): handle IOError on listener shutdown gracefully MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Listener#stop closes the PG connection to interrupt wait_for_notify, Ruby raises IOError ("stream closed in another thread") rather than PG::Error. The rescue clause only caught PG::Error, leaving IOError unhandled and dumping a stack trace to stderr on every streamer shutdown. Catch IOError before PG::Error since it indicates an expected shutdown — the connection was deliberately closed by #stop. 
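The shutdown path described above can be sketched without a database. This is a minimal illustration, not the listener's actual code: `SketchListener` and `exit_reason` are hypothetical names, and an `IO.pipe` stands in for the PG connection, on the assumption that closing it from another thread interrupts the blocking read in the same way.

```ruby
# Sketch only: SketchListener is a hypothetical stand-in, and an IO.pipe
# replaces the PG connection so the example runs without a database.
class SketchListener
  attr_reader :exit_reason

  def initialize
    @reader, @writer = IO.pipe
    @running = true
    @exit_reason = nil
  end

  def start
    @thread = Thread.new { run_loop }
    self
  end

  # Same order as the shutdown described above: clear the flag first,
  # then close the IO to interrupt the blocking read.
  def stop
    @running = false
    @reader.close
    @thread.join(2)
    @writer.close
  end

  private

  def run_loop
    while @running
      begin
        @reader.readpartial(1) # blocks, standing in for wait_for_notify
      rescue IOError
        # Closed by #stop: expected, so leave the loop quietly.
        unless @running
          @exit_reason = :clean_shutdown
          break
        end
        # Still running: this IOError is not part of the shutdown path.
        @exit_reason = :unexpected_io_error
        break
      end
    end
    # Covers the case where the flag was cleared before any read blocked.
    @exit_reason ||= :clean_shutdown
  end
end

listener = SketchListener.new.start
sleep 0.1 # give the thread time to block in readpartial
listener.stop
puts listener.exit_reason
```

Checking the running flag inside the rescue is what separates a deliberate close from a real I/O failure. The sketch only records which case occurred and does not attempt any recovery.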
--- lib/pgbus/web/streamer/listener.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/pgbus/web/streamer/listener.rb b/lib/pgbus/web/streamer/listener.rb index 555e3e8..12ed797 100644 --- a/lib/pgbus/web/streamer/listener.rb +++ b/lib/pgbus/web/streamer/listener.rb @@ -113,6 +113,11 @@ def run_loop @conn.wait_for_notify(timeout_s) do |channel, _pid, _payload| handle_notify(channel) end || run_health_check + rescue IOError + # #stop closes the PG connection to interrupt + # wait_for_notify, which raises IOError ("stream closed + # in another thread"). This is expected — exit cleanly. + break unless @running rescue PG::Error => e break unless @running From 4a6f016733468aec4b42f465881ec8e0d20695aa Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 10:48:12 +0200 Subject: [PATCH 5/7] fix(test): GC before allocation leak test for Ruby 3.3 stability MemoryProfiler on Ruby 3.3 picks up lazily-initialized globals from gems loaded by other specs in the same process (e.g. async's fiber scheduler hooks). Force GC.start before the report so these objects are collected and not attributed to send_message calls. This consistently failed on CI Ruby 3.3 (retained=5) while passing on 3.4 and 4.0 where GC is more aggressive about collecting fiber-related global state. --- spec/pgbus/allocation_budget_spec.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spec/pgbus/allocation_budget_spec.rb b/spec/pgbus/allocation_budget_spec.rb index f2c6e34..7c58733 100644 --- a/spec/pgbus/allocation_budget_spec.rb +++ b/spec/pgbus/allocation_budget_spec.rb @@ -110,6 +110,12 @@ def transaction(...) = yield(self) it "retains zero objects across 100 send_message cycles" do 10.times { plain_client.send_message("default", small_payload) } + # Force GC to collect any lazily-initialized gem globals (e.g. async's + # fiber scheduler hooks) so MemoryProfiler doesn't attribute them to + # our send_message calls. 
Without this, Ruby 3.3 retains objects from + # gems loaded by other specs in the same process. + GC.start + report = MemoryProfiler.report do 100.times { plain_client.send_message("default", small_payload) } end From 865422f220d46eabd45af74c3ba8a0bae3ddce34 Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 10:53:26 +0200 Subject: [PATCH 6/7] fix(streams): log and reconnect on unexpected IOError in listener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When IOError fires while the listener is still running (not from #stop shutdown), log the error and reconnect — same as the PG::Error path. Without this, the loop would continue trying wait_for_notify on a dead connection. --- lib/pgbus/web/streamer/listener.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/pgbus/web/streamer/listener.rb b/lib/pgbus/web/streamer/listener.rb index 12ed797..4d7b056 100644 --- a/lib/pgbus/web/streamer/listener.rb +++ b/lib/pgbus/web/streamer/listener.rb @@ -113,11 +113,14 @@ def run_loop @conn.wait_for_notify(timeout_s) do |channel, _pid, _payload| handle_notify(channel) end || run_health_check - rescue IOError + rescue IOError => e # #stop closes the PG connection to interrupt # wait_for_notify, which raises IOError ("stream closed # in another thread"). This is expected — exit cleanly. break unless @running + + @logger.warn { "[Pgbus::Streamer::Listener] IO error (#{e.class}: #{e.message}) — reconnecting" } + reconnect! rescue PG::Error => e break unless @running From 6ffa884a9ecbc42e644ad64ea2f97d3061cff0de Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Fri, 10 Apr 2026 10:58:31 +0200 Subject: [PATCH 7/7] fix(test): scope allocation leak test to lib/pgbus/ files only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace GC.start workaround with MemoryProfiler's allow_files filter. 
This scopes retained object tracking to lib/pgbus/ so external gem globals (async fiber hooks, connection_pool singletons, JSON parser caches) loaded by other specs in the same process are excluded. The test now validates that Pgbus's own send_message path retains zero objects — regardless of what other gems do globally. --- spec/pgbus/allocation_budget_spec.rb | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/spec/pgbus/allocation_budget_spec.rb b/spec/pgbus/allocation_budget_spec.rb index 7c58733..41ae7d7 100644 --- a/spec/pgbus/allocation_budget_spec.rb +++ b/spec/pgbus/allocation_budget_spec.rb @@ -110,13 +110,12 @@ def transaction(...) = yield(self) it "retains zero objects across 100 send_message cycles" do 10.times { plain_client.send_message("default", small_payload) } - # Force GC to collect any lazily-initialized gem globals (e.g. async's - # fiber scheduler hooks) so MemoryProfiler doesn't attribute them to - # our send_message calls. Without this, Ruby 3.3 retains objects from - # gems loaded by other specs in the same process. - GC.start - - report = MemoryProfiler.report do + # Scope to lib/pgbus/ so retained objects from external gems + # (async fiber scheduler hooks, connection_pool singletons, JSON + # parser caches) loaded by other specs in the same process don't + # pollute the measurement. This test validates that Pgbus's own + # send_message path leaks nothing. + report = MemoryProfiler.report(allow_files: "lib/pgbus") do 100.times { plain_client.send_message("default", small_payload) } end