diff --git a/.claude/commands/github-ci-failures.md b/.claude/commands/github-review-failures.md similarity index 100% rename from .claude/commands/github-ci-failures.md rename to .claude/commands/github-review-failures.md diff --git a/CLAUDE.md b/CLAUDE.md index 1262275..176fc87 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,6 +55,7 @@ bundle exec rake # Both Layer 6: Dashboard app/controllers/pgbus/, app/views/pgbus/ Layer 5: CLI lib/pgbus/cli.rb Layer 4: Process Model lib/pgbus/process/ (supervisor, worker, dispatcher, consumer) + Execution Pools lib/pgbus/execution_pools/ (thread_pool, async_pool) Layer 3: Event Bus lib/pgbus/event_bus/ (publisher, subscriber, registry, handler) Layer 2: ActiveJob lib/pgbus/active_job/ (adapter, executor) Layer 1: Client lib/pgbus/client.rb (PGMQ wrapper) diff --git a/README.md b/README.md index 8516901..767d31f 100644 --- a/README.md +++ b/README.md @@ -463,6 +463,36 @@ end When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from `/proc/self/statm` (Linux) or `ps -o rss` (macOS). +### Async execution mode (fibers) + +Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O. + +```ruby +Pgbus.configure do |config| + # Global: all workers use async mode + config.execution_mode = :async + + # Or per-worker: mix thread and async workers + config.workers = [ + { queues: %w[webhooks emails], threads: 100, execution_mode: :async }, + { queues: %w[default], threads: 5 } # stays thread-based + ] +end +``` + +**Prerequisites:** + +1. Add `gem "async"` to your Gemfile +2. Set `config.active_support.isolation_level = :fiber` in your Rails app + +**Why it reduces connections:** In thread mode, each thread holds a database connection while waiting on I/O. With 50 threads, that's 50 connections. 
In async mode, 50 fibers share 3-5 connections because fibers yield during I/O and only one runs at a time. + +**CLI flag:** `pgbus start --execution-mode async` + +**Safety:** Messages stay in PGMQ with visibility timeout protection regardless of execution mode. If a fiber or worker crashes, the visibility timeout expires and messages become available for re-read. No data loss risk. + +**Not recommended for:** CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode. + ## Routing and ordering How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering. diff --git a/benchmarks/connection_pool_bench.rb b/benchmarks/connection_pool_bench.rb new file mode 100644 index 0000000..b5dec3d --- /dev/null +++ b/benchmarks/connection_pool_bench.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +# Connection Pool Benchmark +# Measures peak active Postgres connections under varying concurrency +# for ThreadPool vs AsyncPool execution modes. +# +# REQUIRES: PGBUS_DATABASE_URL environment variable +# +# Usage: +# PGBUS_DATABASE_URL=postgres://user@localhost/pgbus_test ruby benchmarks/connection_pool_bench.rb + +require "json" +require "securerandom" +require "concurrent" +require "pgbus" +require "pg" + +DATABASE_URL = ENV.fetch("PGBUS_DATABASE_URL") do + warn "PGBUS_DATABASE_URL not set. This benchmark requires a real PostgreSQL database." 
# Marker written to application_name on every benchmark job connection so the
# monitor query can find those connections in pg_stat_activity.
BENCH_APPLICATION_NAME = "pgbus_connbench"

# Opens the dedicated connection used to sample pg_stat_activity.
#
# @param url [String] PostgreSQL connection URL
# @return [PG::Connection]
def monitor_connection(url)
  PG.connect(url)
end

# Counts the pg_stat_activity rows whose application_name or current query
# contains +prefix+.
#
# @param conn [#exec_params] connection used to run the sampling query
# @param prefix [String] substring to look for
# @return [Integer] number of matching rows, or 0 when the query fails
def count_active_connections(conn, prefix = BENCH_APPLICATION_NAME)
  pattern = "%#{prefix}%"
  result = conn.exec_params(
    "SELECT count(*) FROM pg_stat_activity " \
    "WHERE application_name LIKE $1 OR query LIKE $1",
    [pattern]
  )
  result[0]["count"].to_i
rescue PG::Error => e
  # Best effort: a failed sample must not abort the benchmark run.
  warn "Connection count query failed: #{e.message}"
  0
end

# Raises +peak+ to +current+ when +current+ is larger.
#
# The compare-and-set is retried until it succeeds or the stored value is
# already >= +current+, so a lost race can never drop a new maximum.
#
# @param peak [#value, #compare_and_set] atomic integer holding the maximum
# @param current [Integer] latest sample
# @return [Boolean] true when +peak+ was raised
def update_peak(peak, current)
  loop do
    old = peak.value
    return false if current <= old
    return true if peak.compare_and_set(old, current)
  end
end

# Runs +tasks+ short database jobs through a pool of the given +mode+ and
# returns the highest connection count sampled while they ran.
#
# @param mode [Symbol] :threads for ThreadPool, anything else for AsyncPool
# @param capacity [Integer] pool capacity
# @param tasks [Integer] number of jobs to post
# @param monitor_conn [#exec_params] connection used for sampling
# @return [Integer] peak number of tagged connections observed
def measure_peak_connections(mode:, capacity:, tasks:, monitor_conn:)
  pool = if mode == :threads
           Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
         else
           Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
         end

  peak = Concurrent::AtomicFixnum.new(0)
  done = Concurrent::CountDownLatch.new(tasks)

  # Monitor thread samples pg_stat_activity during the benchmark
  stop_monitoring = Concurrent::AtomicBoolean.new(false)
  monitor = Thread.new do
    while stop_monitoring.false?
      update_peak(peak, count_active_connections(monitor_conn))
      sleep 0.01
    end
  end

  begin
    tasks.times do
      # Back-pressure: AsyncPool#post raises once its capacity is exhausted,
      # so wait for a free slot instead of posting blindly.
      sleep 0.001 until pool.available_capacity.positive?

      pool.post do
        # Simulate a real job: query the database
        conn = PG.connect(DATABASE_URL)
        # Tag the connection; without this the monitor query matches nothing.
        conn.exec("SET application_name = '#{BENCH_APPLICATION_NAME}'")
        conn.exec("SELECT pg_sleep(0.01)")
      rescue PG::Error => e
        warn "Benchmark job failed: #{e.message}"
      ensure
        # Always release the connection and the latch, even on failure.
        conn&.close
        done.count_down
      end
    end

    done.wait(30)
  ensure
    stop_monitoring.make_true
    monitor.join(2)

    pool.shutdown
    pool.wait_for_termination(10)
  end

  peak.value
end
# Usage:
#   ruby benchmarks/execution_pool_bench.rb

# Pool capacities exercised by every section of this benchmark.
CAPACITIES = [10, 50].freeze

# Pool lifecycles profiled per capacity in the memory section.
MEMORY_ITERATIONS = 100

# Single-task round trips timed in the latency section.
SAMPLES = 500

def build_thread_pool(capacity)
  Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
end

def build_async_pool(capacity)
  Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
end

# Builds a thread pool for :threads and an async pool for any other mode.
def build_pool(mode, capacity)
  mode == :threads ? build_thread_pool(capacity) : build_async_pool(capacity)
end

# Posts +count+ tasks to +pool+, waits up to 10s for all of them, then shuts
# the pool down. Each task runs the optional +work+ block before signalling
# completion. The pool is shut down even when posting fails.
def run_batch(pool, count, &work)
  done = Concurrent::CountDownLatch.new(count)
  count.times do
    pool.post do
      work&.call
      done.count_down
    end
  end
  done.wait(10)
ensure
  pool.shutdown
  pool.wait_for_termination(5)
end

# Returns the element of the ascending +sorted+ array at +fraction+ (0.0..1.0),
# clamped to the last element.
def percentile(sorted, fraction)
  sorted[[(sorted.size * fraction).to_i, sorted.size - 1].min]
end

# --- 1. No-op throughput (scheduling overhead) ---

puts "=" * 70
puts "1. No-op throughput — scheduling overhead (IPS)"
puts "=" * 70

CAPACITIES.each do |cap|
  Benchmark.ips do |x|
    x.config(warmup: 2, time: 5)

    x.report("ThreadPool(#{cap}) no-op") { run_batch(build_thread_pool(cap), cap) }
    x.report("AsyncPool(#{cap}) no-op") { run_batch(build_async_pool(cap), cap) }

    x.compare!
  end
  puts
end

# --- 2. I/O-bound throughput (fiber advantage zone) ---

puts "=" * 70
puts "2. I/O-bound throughput — sleep(0.01) simulating DB I/O"
puts "=" * 70

CAPACITIES.each do |cap|
  Benchmark.ips do |x|
    x.config(warmup: 1, time: 5)

    x.report("ThreadPool(#{cap}) I/O") do
      run_batch(build_thread_pool(cap), cap) { sleep(0.01) }
    end

    x.report("AsyncPool(#{cap}) I/O") do
      run_batch(build_async_pool(cap), cap) { sleep(0.01) }
    end

    x.compare!
  end
  puts
end

# --- 3. Memory overhead ---

puts "=" * 70
# The task count depends on capacity (capacity × iterations), so the title no
# longer claims a fixed number of tasks.
puts "3. Memory overhead — capacity × #{MEMORY_ITERATIONS} no-op tasks per pool"
puts "=" * 70

CAPACITIES.each do |cap|
  puts "\n--- Capacity: #{cap} ---"

  %i[threads async].each do |mode|
    report = MemoryProfiler.report do
      MEMORY_ITERATIONS.times { run_batch(build_pool(mode, cap), cap) }
    end

    puts "\n#{mode.upcase} pool (#{cap} capacity × #{MEMORY_ITERATIONS} iterations):"
    puts "  Total allocated: #{report.total_allocated_memsize} bytes"
    puts "  Total retained:  #{report.total_retained_memsize} bytes"
    puts "  Allocated objects: #{report.total_allocated}"
    puts "  Retained objects:  #{report.total_retained}"
  end
end

# --- 4. Latency percentiles ---

puts "\n#{"=" * 70}"
puts "4. Latency percentiles — single task completion time"
puts "=" * 70

%i[threads async].each do |mode|
  pool = build_pool(mode, 5)
  latencies = []

  begin
    SAMPLES.times do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      done = Concurrent::Event.new
      pool.post { done.set }
      done.wait(5)
      elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1_000_000 # microseconds
      latencies << elapsed
    end
  ensure
    pool.shutdown
    pool.wait_for_termination(5)
  end

  latencies.sort!

  puts "\n#{mode.upcase} pool latency (#{SAMPLES} samples):"
  puts "  p50: #{percentile(latencies, 0.50).round(1)} µs"
  puts "  p95: #{percentile(latencies, 0.95).round(1)} µs"
  puts "  p99: #{percentile(latencies, 0.99).round(1)} µs"
  puts "  min: #{latencies.first.round(1)} µs"
  puts "  max: #{latencies.last.round(1)} µs"
end

puts "\nDone."
diff --git a/lib/pgbus/cli.rb b/lib/pgbus/cli.rb index 282b295..77c1c20 100644 --- a/lib/pgbus/cli.rb +++ b/lib/pgbus/cli.rb @@ -42,6 +42,7 @@ def apply_start_options(args) options = parse_start_options(args) Pgbus.configuration.workers = options[:queues] if options[:queues] + Pgbus.configuration.execution_mode = options[:execution_mode].to_sym if options[:execution_mode] apply_capsule_filter(options[:capsule]) if options[:capsule] apply_role_filter(options) end @@ -70,6 +71,10 @@ def parse_start_options(args) opts.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") do options[:dispatcher_only] = true end + + opts.on("--execution-mode MODE", "Execution mode: threads (default) or async") do |v| + options[:execution_mode] = v + end end.parse!(args.dup) options end @@ -165,6 +170,8 @@ def print_help pattern — exactly one of these per deployment) --dispatcher-only Run only the dispatcher (the maintenance pod pattern) + --execution-mode Execution mode: threads (default) or async + (fiber-based, lower connection usage) HELP end end diff --git a/lib/pgbus/configuration.rb b/lib/pgbus/configuration.rb index f462d3f..9bcf5c7 100644 --- a/lib/pgbus/configuration.rb +++ b/lib/pgbus/configuration.rb @@ -11,7 +11,7 @@ class Configuration attr_accessor :default_queue, :queue_prefix # Worker settings - attr_accessor :polling_interval, :prefetch_limit + attr_accessor :polling_interval, :prefetch_limit, :execution_mode attr_reader :workers, :visibility_timeout # rubocop:disable Style/AccessorGrouping # Supervisor role selection. @@ -111,6 +111,7 @@ def initialize @visibility_timeout = 30 @prefetch_limit = nil + @execution_mode = :threads @max_jobs_per_worker = nil @max_memory_mb = nil @@ -216,6 +217,13 @@ def priority_queue_names(name) (0...priority_levels).map { |p| priority_queue_name(name, p) } end + # Returns the execution mode for a specific worker config hash, + # falling back to the global execution_mode setting. 
+ def execution_mode_for(worker_config) + mode = worker_config[:execution_mode] || worker_config["execution_mode"] || execution_mode + ExecutionPools.normalize_mode(mode) + end + VALID_PGMQ_SCHEMA_MODES = %i[auto extension embedded].freeze def pgmq_schema_mode=(mode) @@ -242,9 +250,16 @@ def validate! raise ArgumentError, "retry_backoff_jitter must be between 0 and 1" end + # Validate global execution_mode + ExecutionPools.normalize_mode(execution_mode) + Array(workers).each do |w| threads = w[:threads] || w["threads"] || 5 raise ArgumentError, "worker threads must be > 0" unless threads.is_a?(Integer) && threads.positive? + + # Validate per-worker execution_mode override if present + mode = w[:execution_mode] || w["execution_mode"] + ExecutionPools.normalize_mode(mode) if mode end raise ArgumentError, "prefetch_limit must be > 0" if prefetch_limit && !(prefetch_limit.is_a?(Integer) && prefetch_limit.positive?) diff --git a/lib/pgbus/execution_pools.rb b/lib/pgbus/execution_pools.rb new file mode 100644 index 0000000..22701de --- /dev/null +++ b/lib/pgbus/execution_pools.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Pgbus + module ExecutionPools + class << self + def build(mode:, capacity:, on_state_change: nil) + case normalize_mode(mode) + when :threads + ThreadPool.new(capacity: capacity, on_state_change: on_state_change) + when :async + AsyncPool.new(capacity: capacity, on_state_change: on_state_change) + end + end + + def normalize_mode(mode) + case mode.to_s + when "", "threads" + :threads + when "async", "fiber" + :async + else + raise ArgumentError, "Unknown execution_mode: #{mode.inspect}. 
module Pgbus
  module ExecutionPools
    # Execution pool that runs posted blocks as Async tasks on one dedicated
    # reactor thread.
    #
    # Capacity is reserved by the caller inside #post and handed back when the
    # block finishes, so #available_capacity reflects queued plus running work.
    # Posting beyond capacity raises instead of queueing.
    class AsyncPool
      attr_reader :capacity

      # Seconds the reactor sleeps between polls of the pending queue.
      IDLE_WAIT_INTERVAL = 0.01

      # Starts the reactor thread and blocks until it reports ready.
      #
      # @param capacity [Integer] maximum number of blocks in flight
      # @param on_state_change [#call, nil] invoked whenever capacity is
      #   returned to the pool and when the reactor dies with a fatal error
      # @raise [LoadError] when the `async` gem cannot be loaded
      # @raise [Exception] whatever error killed the reactor during boot
      def initialize(capacity:, on_state_change: nil)
        @capacity = capacity
        @on_state_change = on_state_change
        @available_capacity = capacity
        @mutex = Mutex.new
        @state_mutex = Mutex.new
        @shutdown_flag = false
        @fatal_error = nil
        @boot_queue = Thread::Queue.new
        @pending = Thread::Queue.new

        validate_dependencies!
        @reactor_thread = start_reactor
        result = @boot_queue.pop
        raise result if result.is_a?(Exception)
      end

      # Reserves one slot and queues +block+ for the reactor.
      #
      # @raise [ArgumentError] when no block is given
      # @raise [RuntimeError] when the pool is shutting down or at capacity
      # @raise [Exception] the fatal error that killed the reactor, if any
      def post(&block)
        reserved = false
        # A nil entry would be indistinguishable from "queue empty" in
        # #next_pending and would leak the reserved slot forever.
        raise ArgumentError, "AsyncPool#post requires a block" unless block

        raise_if_fatal!
        raise "Execution pool is shutting down" if shutdown?

        reserve_capacity!
        reserved = true
        # Re-check after reserving: once the slot is held the reactor sees the
        # pool as in flight and keeps scheduling, so a block accepted here is
        # guaranteed to run; one that lost the race to #shutdown is rejected.
        raise "Execution pool is shutting down" if shutdown?

        @pending << block
      rescue StandardError
        restore_capacity if reserved
        raise
      end

      # @return [Integer] number of free slots
      # @raise [Exception] the fatal error that killed the reactor, if any
      def available_capacity
        raise_if_fatal!
        @mutex.synchronize { @available_capacity }
      end

      # True when at least one slot is free. This does NOT mean the pool has no
      # work in flight.
      def idle?
        available_capacity.positive?
      end

      # Marks the pool as shutting down; the reactor exits once queued and
      # in-flight blocks have finished.
      #
      # @return [Boolean] false when shutdown had already been requested
      def shutdown
        @state_mutex.synchronize do
          return false if @shutdown_flag

          @shutdown_flag = true
        end
      end

      def shutdown?
        @state_mutex.synchronize { @shutdown_flag }
      end

      # Waits up to +timeout+ seconds for the reactor thread to finish.
      #
      # @return [Boolean] true when the reactor terminated within the timeout
      def wait_for_termination(timeout)
        thread = @reactor_thread
        return false unless thread

        !thread.join(timeout).nil?
      end

      # Requests shutdown and kills the reactor thread without waiting for
      # in-flight blocks.
      def kill
        shutdown
        @reactor_thread&.kill
      end

      # @return [Hash] :mode, :capacity and the number of :busy slots
      def metadata
        available = @mutex.synchronize { @available_capacity }
        {
          mode: :async,
          capacity: @capacity,
          busy: @capacity - available
        }
      end

      private

      def validate_dependencies!
        require "async"
        require "async/semaphore"
      rescue LoadError => e
        raise LoadError,
              "Async execution mode requires the `async` gem. " \
              "Add `gem \"async\"` to your Gemfile. Original error: #{e.message}"
      end

      # Runs the reactor on its own thread. Any exception escaping the reactor
      # is recorded so later #post / #available_capacity calls re-raise it.
      # rubocop:disable Lint/RescueException
      def start_reactor
        Thread.new do
          Thread.current.name = "pgbus-async-reactor-#{object_id}"
          Async do |task|
            semaphore = Async::Semaphore.new(@capacity, parent: task)
            @boot_queue << :ready

            wait_for_executions(semaphore)
            wait_for_inflight(semaphore)
          end
        rescue Exception => e
          register_fatal_error(e)
          raise
        end
      end
      # rubocop:enable Lint/RescueException

      # Main scheduling loop: drains the pending queue until shutdown has been
      # requested and nothing is queued.
      def wait_for_executions(semaphore)
        loop do
          schedule_pending(semaphore)
          break if shutdown? && @pending.empty?

          sleep(IDLE_WAIT_INTERVAL) if @pending.empty?
        end
      end

      def schedule_pending(semaphore)
        while (block = next_pending)
          semaphore.async do
            perform(block)
          end
        end
      end

      # Non-blocking pop; nil means the queue is currently empty.
      def next_pending
        @pending.pop(true)
      rescue ThreadError
        nil
      end

      # Runs one block; StandardErrors are logged so they do not propagate
      # out of the task, and the slot is always returned.
      def perform(block)
        block.call
      rescue StandardError => e
        Pgbus.logger.error { "[Pgbus] Async pool fiber error: #{e.class}: #{e.message}" }
      ensure
        restore_capacity
      end

      def reserve_capacity!
        @mutex.synchronize do
          raise "Execution pool is at capacity" if @available_capacity <= 0

          @available_capacity -= 1
        end
      end

      def restore_capacity
        should_notify = @mutex.synchronize do
          @available_capacity += 1
          @available_capacity.positive?
        end
        @on_state_change&.call if should_notify
      end

      def register_fatal_error(error)
        @state_mutex.synchronize { @fatal_error ||= error }
        # Unblock #initialize if the reactor died before signalling :ready.
        @boot_queue << error if @boot_queue.empty?
        @on_state_change&.call
      end

      def raise_if_fatal!
        error = @state_mutex.synchronize { @fatal_error }
        raise error if error
      end

      # Drain phase: waits until every reserved slot has been returned. Keeps
      # scheduling, because a block whose slot was reserved just before
      # shutdown may only reach the queue after the main loop has exited.
      def wait_for_inflight(semaphore)
        while inflight?
          schedule_pending(semaphore)
          sleep(IDLE_WAIT_INTERVAL)
        end
      end

      def inflight?
        @mutex.synchronize { @available_capacity < @capacity }
      end
    end
  end
end
+ end + + def shutdown + @pool.shutdown + end + + def wait_for_termination(timeout) + @pool.wait_for_termination(timeout) + end + + def kill + @pool.kill + end + + def metadata + { + mode: :threads, + capacity: @capacity, + busy: @capacity - available_capacity + } + end + end + end +end diff --git a/lib/pgbus/process/consumer.rb b/lib/pgbus/process/consumer.rb index 4c5e82c..be91e07 100644 --- a/lib/pgbus/process/consumer.rb +++ b/lib/pgbus/process/consumer.rb @@ -7,14 +7,15 @@ module Process class Consumer include SignalHandler - attr_reader :topics, :threads, :config + attr_reader :topics, :threads, :config, :execution_mode - def initialize(topics:, threads: 3, config: Pgbus.configuration) + def initialize(topics:, threads: 3, config: Pgbus.configuration, execution_mode: :threads) @topics = Array(topics) @threads = threads @config = config + @execution_mode = ExecutionPools.normalize_mode(execution_mode) @shutting_down = false - @pool = Concurrent::FixedThreadPool.new(threads) + @pool = ExecutionPools.build(mode: @execution_mode, capacity: threads) @registry = EventBus::Registry.instance end @@ -53,7 +54,7 @@ def setup_subscriptions end def consume - idle = @pool.max_length - @pool.queue_length + idle = @pool.available_capacity return interruptible_sleep(config.polling_interval) if idle <= 0 tagged_messages = if @queue_names.size == 1 diff --git a/lib/pgbus/process/supervisor.rb b/lib/pgbus/process/supervisor.rb index cde3e2f..7b4933a 100644 --- a/lib/pgbus/process/supervisor.rb +++ b/lib/pgbus/process/supervisor.rb @@ -60,6 +60,7 @@ def fork_worker(worker_config) threads = worker_config[:threads] || worker_config["threads"] || 5 single_active = worker_config[:single_active_consumer] || worker_config["single_active_consumer"] || false priority = worker_config[:consumer_priority] || worker_config["consumer_priority"] || 0 + exec_mode = config.execution_mode_for(worker_config) pid = fork do restore_signals @@ -68,7 +69,8 @@ def fork_worker(worker_config) 
bootstrap_queues worker = Worker.new( queues: queues, threads: threads, config: config, - single_active_consumer: single_active, consumer_priority: priority + single_active_consumer: single_active, consumer_priority: priority, + execution_mode: exec_mode ) worker.run end @@ -79,7 +81,7 @@ def fork_worker(worker_config) end @forks[pid] = { type: :worker, config: worker_config } - Pgbus.logger.info { "[Pgbus] Forked worker pid=#{pid} queues=#{queues.join(",")}" } + Pgbus.logger.info { "[Pgbus] Forked worker pid=#{pid} queues=#{queues.join(",")} mode=#{exec_mode}" } rescue Errno::EAGAIN, Errno::ENOMEM => e Pgbus.logger.error { "[Pgbus] Fork failed for worker: #{e.message}" } end diff --git a/lib/pgbus/process/worker.rb b/lib/pgbus/process/worker.rb index c52d116..a8ccde1 100644 --- a/lib/pgbus/process/worker.rb +++ b/lib/pgbus/process/worker.rb @@ -7,14 +7,16 @@ module Process class Worker include SignalHandler - attr_reader :queues, :threads, :config + attr_reader :queues, :threads, :config, :execution_mode def initialize(queues:, threads: 5, config: Pgbus.configuration, - single_active_consumer: false, consumer_priority: 0) + single_active_consumer: false, consumer_priority: 0, + execution_mode: :threads) @queues = Array(queues) @wildcard = @queues.include?("*") @threads = threads @config = config + @execution_mode = ExecutionPools.normalize_mode(execution_mode) @single_active_consumer = single_active_consumer @consumer_priority = consumer_priority @lifecycle = Lifecycle.new @@ -27,10 +29,14 @@ def initialize(queues:, threads: 5, config: Pgbus.configuration, @started_at_monotonic = monotonic_now @stat_buffer = config.stats_enabled ? Pgbus::StatBuffer.new : nil @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer) - @pool = Concurrent::FixedThreadPool.new(threads) + @wake_signal = WakeSignal.new + @pool = ExecutionPools.build( + mode: @execution_mode, + capacity: threads, + on_state_change: -> { @wake_signal.notify! 
} + ) @circuit_breaker = Pgbus::CircuitBreaker.new(config: config) @queue_lock = QueueLock.new if @single_active_consumer - @wake_signal = WakeSignal.new end def stats @@ -39,12 +45,13 @@ def stats jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, + execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at - } + }.merge(@pool.metadata) end def run @@ -52,7 +59,10 @@ def run start_heartbeat resolve_wildcard_queues @lifecycle.transition_to!(:running) - Pgbus.logger.info { "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} pid=#{::Process.pid}" } + Pgbus.logger.info do + "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \ + "mode=#{@execution_mode} pid=#{::Process.pid}" + end loop do process_signals @@ -60,7 +70,7 @@ def run refresh_wildcard_queues break if @lifecycle.stopped? - break if @lifecycle.draining? && @pool.queue_length.zero? + break if @lifecycle.draining? && @pool.idle? claim_and_execute if @lifecycle.can_process? @stat_buffer&.flush_if_due @@ -97,7 +107,7 @@ def immediate_shutdown def claim_and_execute poll_interval = effective_polling_interval - idle = @pool.max_length - @pool.queue_length + idle = @pool.available_capacity return @wake_signal.wait(timeout: poll_interval) if idle <= 0 if config.prefetch_limit @@ -332,7 +342,7 @@ def start_heartbeat kind: "worker", metadata: { queues: queues, threads: threads, pid: ::Process.pid, - consumer_priority: @consumer_priority + execution_mode: @execution_mode, consumer_priority: @consumer_priority }, on_beat: -> { @rate_counter.snapshot! 
} ) diff --git a/lib/pgbus/web/streamer/listener.rb b/lib/pgbus/web/streamer/listener.rb index 555e3e8..4d7b056 100644 --- a/lib/pgbus/web/streamer/listener.rb +++ b/lib/pgbus/web/streamer/listener.rb @@ -113,6 +113,14 @@ def run_loop @conn.wait_for_notify(timeout_s) do |channel, _pid, _payload| handle_notify(channel) end || run_health_check + rescue IOError => e + # #stop closes the PG connection to interrupt + # wait_for_notify, which raises IOError ("stream closed + # in another thread"). This is expected — exit cleanly. + break unless @running + + @logger.warn { "[Pgbus::Streamer::Listener] IO error (#{e.class}: #{e.message}) — reconnecting" } + reconnect! rescue PG::Error => e break unless @running diff --git a/spec/pgbus/allocation_budget_spec.rb b/spec/pgbus/allocation_budget_spec.rb index f2c6e34..41ae7d7 100644 --- a/spec/pgbus/allocation_budget_spec.rb +++ b/spec/pgbus/allocation_budget_spec.rb @@ -110,7 +110,12 @@ def transaction(...) = yield(self) it "retains zero objects across 100 send_message cycles" do 10.times { plain_client.send_message("default", small_payload) } - report = MemoryProfiler.report do + # Scope to lib/pgbus/ so retained objects from external gems + # (async fiber scheduler hooks, connection_pool singletons, JSON + # parser caches) loaded by other specs in the same process don't + # pollute the measurement. This test validates that Pgbus's own + # send_message path leaks nothing. 
+ report = MemoryProfiler.report(allow_files: "lib/pgbus") do 100.times { plain_client.send_message("default", small_payload) } end diff --git a/spec/pgbus/cli_spec.rb b/spec/pgbus/cli_spec.rb index c6029f0..a26bd88 100644 --- a/spec/pgbus/cli_spec.rb +++ b/spec/pgbus/cli_spec.rb @@ -137,6 +137,24 @@ end end + context "with --execution-mode flag" do + it "sets config.execution_mode to :async" do + original_mode = Pgbus.configuration.execution_mode + described_class.start(["start", "--execution-mode", "async"]) + expect(Pgbus.configuration.execution_mode).to eq(:async) + ensure + Pgbus.configuration.execution_mode = original_mode + end + + it "sets config.execution_mode to :threads" do + original_mode = Pgbus.configuration.execution_mode + described_class.start(["start", "--execution-mode", "threads"]) + expect(Pgbus.configuration.execution_mode).to eq(:threads) + ensure + Pgbus.configuration.execution_mode = original_mode + end + end + context "with multiple --*-only flags (mutually exclusive)" do it "raises ArgumentError when --workers-only and --scheduler-only are both passed" do expect do diff --git a/spec/pgbus/configuration_spec.rb b/spec/pgbus/configuration_spec.rb index 5e325cd..d49935a 100644 --- a/spec/pgbus/configuration_spec.rb +++ b/spec/pgbus/configuration_spec.rb @@ -108,6 +108,33 @@ it "has default recurring execution retention of 7 days" do expect(config.recurring_execution_retention).to eq(7 * 24 * 3600) end + + it "defaults execution_mode to :threads" do + expect(config.execution_mode).to eq(:threads) + end + end + + describe "#execution_mode_for" do + it "returns the global default when worker has no override" do + expect(config.execution_mode_for({})).to eq(:threads) + end + + it "returns the worker-level override when present" do + expect(config.execution_mode_for(execution_mode: :async)).to eq(:async) + end + + it "normalizes :fiber to :async" do + expect(config.execution_mode_for(execution_mode: :fiber)).to eq(:async) + end + + it "falls back 
to global execution_mode" do + config.execution_mode = :async + expect(config.execution_mode_for({})).to eq(:async) + end + + it "accepts string keys" do + expect(config.execution_mode_for("execution_mode" => "async")).to eq(:async) + end end describe "#queue_name" do @@ -834,6 +861,28 @@ config.retry_backoff_jitter = 0.2 expect { config.validate! }.not_to raise_error end + + it "rejects invalid global execution_mode" do + config.execution_mode = :bogus + expect { config.validate! }.to raise_error(ArgumentError, /execution_mode/i) + end + + it "accepts valid execution_mode values" do + %i[threads async fiber].each do |mode| + config.execution_mode = mode + expect { config.validate! }.not_to raise_error + end + end + + it "rejects invalid per-worker execution_mode" do + config.workers = [{ queues: %w[default], threads: 5, execution_mode: :bogus }] + expect { config.validate! }.to raise_error(ArgumentError, /execution_mode/i) + end + + it "accepts per-worker execution_mode override" do + config.workers = [{ queues: %w[default], threads: 50, execution_mode: :async }] + expect { config.validate! }.not_to raise_error + end end describe "#connection_options" do diff --git a/spec/pgbus/execution_pools/async_pool_spec.rb b/spec/pgbus/execution_pools/async_pool_spec.rb new file mode 100644 index 0000000..4ccdc9a --- /dev/null +++ b/spec/pgbus/execution_pools/async_pool_spec.rb @@ -0,0 +1,149 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools::AsyncPool do + subject(:pool) { described_class.new(capacity: capacity) } + + let(:capacity) { 3 } + + after do + pool.shutdown + pool.wait_for_termination(5) + end + + describe "#initialize" do + it "creates a pool with given capacity" do + expect(pool.capacity).to eq(3) + end + + it "starts fully available" do + expect(pool.available_capacity).to eq(3) + end + + it "boots the reactor thread synchronously" do + # If we get here without hanging, boot sync worked. 
+ # Verify the pool is functional by posting and completing a task. + result = Concurrent::IVar.new + pool.post { result.set(:booted) } + expect(result.value(5)).to eq(:booted) + end + end + + describe "#post" do + it "executes the submitted block as a fiber" do + result = Concurrent::IVar.new + pool.post { result.set(42) } + expect(result.value(5)).to eq(42) + end + + it "executes multiple blocks concurrently" do + results = Concurrent::Array.new + done = Concurrent::CountDownLatch.new(3) + + 3.times do |i| + pool.post do + results << i + done.count_down + end + end + + expect(done.wait(5)).to be true + expect(results.size).to eq(3) + end + + it "raises when pool is shutting down" do + pool.shutdown + sleep 0.05 + expect { pool.post { nil } }.to raise_error(RuntimeError, /shutting down/) + end + + it "raises when pool is at capacity" do + barrier = Concurrent::Event.new + + capacity.times { pool.post { barrier.wait(5) } } + sleep 0.05 + + expect { pool.post { nil } }.to raise_error(RuntimeError, /at capacity/) + barrier.set + end + end + + describe "#available_capacity" do + it "decreases when work is posted" do + barrier = Concurrent::Event.new + + pool.post { barrier.wait(5) } + sleep 0.05 + + expect(pool.available_capacity).to be < capacity + barrier.set + end + + it "increases when work completes" do + done = Concurrent::Event.new + pool.post { done.set } + done.wait(5) + sleep 0.05 + + expect(pool.available_capacity).to eq(capacity) + end + end + + describe "#idle?" 
do + it "is true when no work is queued" do + expect(pool).to be_idle + end + end + + describe "#on_state_change callback" do + it "fires when a fiber completes" do + callback_called = Concurrent::Event.new + pool_with_cb = described_class.new( + capacity: 3, + on_state_change: -> { callback_called.set } + ) + + pool_with_cb.post { nil } + expect(callback_called.wait(5)).to be true + ensure + pool_with_cb&.shutdown + pool_with_cb&.wait_for_termination(2) + end + end + + describe "#shutdown" do + it "waits for in-flight fibers to complete" do + result = Concurrent::IVar.new + pool.post do + sleep 0.05 + result.set(:done) + end + pool.shutdown + pool.wait_for_termination(5) + + expect(result.value(0)).to eq(:done) + end + end + + describe "#metadata" do + it "returns pool information" do + meta = pool.metadata + expect(meta[:mode]).to eq(:async) + expect(meta[:capacity]).to eq(3) + expect(meta[:busy]).to eq(0) + end + end + + describe "error handling" do + it "does not crash the reactor when a fiber raises" do + pool.post { raise "boom" } + sleep 0.1 + + # Pool should still accept new work + result = Concurrent::IVar.new + pool.post { result.set(:ok) } + expect(result.value(5)).to eq(:ok) + end + end +end diff --git a/spec/pgbus/execution_pools/thread_pool_spec.rb b/spec/pgbus/execution_pools/thread_pool_spec.rb new file mode 100644 index 0000000..2682d2c --- /dev/null +++ b/spec/pgbus/execution_pools/thread_pool_spec.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools::ThreadPool do + subject(:pool) { described_class.new(capacity: capacity) } + + let(:capacity) { 3 } + + after { pool.kill } + + describe "#initialize" do + it "creates a pool with given capacity" do + expect(pool.capacity).to eq(3) + end + + it "starts fully available" do + expect(pool.available_capacity).to eq(3) + end + end + + describe "#post" do + it "fires on_state_change when work completes" do + callback_called = 
Concurrent::Event.new + pool_with_cb = described_class.new( + capacity: 3, + on_state_change: -> { callback_called.set } + ) + + pool_with_cb.post { nil } + expect(callback_called.wait(2)).to be true + ensure + pool_with_cb&.kill + end + + it "executes the submitted block" do + result = Concurrent::IVar.new + pool.post { result.set(42) } + expect(result.value(2)).to eq(42) + end + + it "executes multiple blocks concurrently" do + results = Concurrent::Array.new + latch = Concurrent::CountDownLatch.new(3) + + 3.times do |i| + pool.post do + results << i + latch.count_down + end + end + + expect(latch.wait(5)).to be true + expect(results.size).to eq(3) + end + end + + describe "#available_capacity" do + it "decreases when work is posted" do + barrier = Concurrent::Event.new + + pool.post { barrier.wait(5) } + sleep 0.05 # let thread start + + expect(pool.available_capacity).to be < capacity + barrier.set + end + + it "increases when work completes" do + done = Concurrent::Event.new + pool.post { done.set } + done.wait(2) + sleep 0.05 # let pool reclaim thread + + expect(pool.available_capacity).to eq(capacity) + end + end + + describe "#idle?" 
do + it "is true when no work is queued" do + expect(pool).to be_idle + end + + it "is false when at capacity" do + barrier = Concurrent::Event.new + + capacity.times { pool.post { barrier.wait(5) } } + sleep 0.05 + + expect(pool).not_to be_idle + barrier.set + end + end + + describe "#shutdown" do + it "finishes in-progress work" do + result = Concurrent::IVar.new + pool.post do + sleep 0.05 + result.set(:done) + end + pool.shutdown + + expect(result.value(2)).to eq(:done) + end + end + + describe "#wait_for_termination" do + it "blocks until all work completes" do + result = Concurrent::IVar.new + pool.post do + sleep 0.05 + result.set(:done) + end + pool.shutdown + pool.wait_for_termination(5) + + expect(result.value(0)).to eq(:done) + end + end + + describe "#kill" do + it "abandons queued work" do + pool.kill + expect { pool.post { nil } }.to raise_error(Concurrent::RejectedExecutionError) + end + end + + describe "#metadata" do + it "returns pool information" do + meta = pool.metadata + expect(meta[:mode]).to eq(:threads) + expect(meta[:capacity]).to eq(3) + expect(meta[:busy]).to eq(0) + end + end +end diff --git a/spec/pgbus/execution_pools_spec.rb b/spec/pgbus/execution_pools_spec.rb new file mode 100644 index 0000000..c9921ba --- /dev/null +++ b/spec/pgbus/execution_pools_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Pgbus::ExecutionPools do + describe ".build" do + it "returns a ThreadPool for :threads mode" do + pool = described_class.build(mode: :threads, capacity: 3) + expect(pool).to be_a(Pgbus::ExecutionPools::ThreadPool) + pool.kill + end + + it "returns an AsyncPool for :async mode" do + pool = described_class.build(mode: :async, capacity: 3) + expect(pool).to be_a(Pgbus::ExecutionPools::AsyncPool) + pool.shutdown + pool.wait_for_termination(2) + end + + it "returns an AsyncPool for :fiber mode (alias)" do + pool = described_class.build(mode: :fiber, capacity: 3) + expect(pool).to 
be_a(Pgbus::ExecutionPools::AsyncPool) + pool.shutdown + pool.wait_for_termination(2) + end + + it "raises ArgumentError for unknown mode" do + expect { described_class.build(mode: :unknown, capacity: 3) } + .to raise_error(ArgumentError, /unknown execution_mode/i) + end + end + + describe ".normalize_mode" do + it "normalizes :fiber to :async" do + expect(described_class.normalize_mode(:fiber)).to eq(:async) + end + + it "passes :threads through" do + expect(described_class.normalize_mode(:threads)).to eq(:threads) + end + + it "passes :async through" do + expect(described_class.normalize_mode(:async)).to eq(:async) + end + + it "accepts string modes" do + expect(described_class.normalize_mode("threads")).to eq(:threads) + expect(described_class.normalize_mode("async")).to eq(:async) + end + + it "raises for unknown modes" do + expect { described_class.normalize_mode(:bogus) } + .to raise_error(ArgumentError) + end + end +end diff --git a/spec/pgbus/process/consumer_spec.rb b/spec/pgbus/process/consumer_spec.rb index 689b8e0..b142d30 100644 --- a/spec/pgbus/process/consumer_spec.rb +++ b/spec/pgbus/process/consumer_spec.rb @@ -5,7 +5,14 @@ RSpec.describe Pgbus::Process::Consumer do let(:mock_client) { build_mock_client } - let(:mock_pool) { instance_double(Concurrent::FixedThreadPool, kill: nil, shutdown: nil, wait_for_termination: nil) } + let(:mock_pool) do + instance_double( + Pgbus::ExecutionPools::ThreadPool, + capacity: 3, available_capacity: 3, idle?: true, + kill: nil, shutdown: nil, wait_for_termination: nil, + metadata: { mode: :threads, capacity: 3, busy: 0 } + ) + end let(:mock_heartbeat) { instance_double(Pgbus::Process::Heartbeat, start: nil, stop: nil) } let(:subscriber_a) { instance_double(Pgbus::EventBus::Subscriber, pattern: "orders.#", queue_name: "q_orders") } let(:subscriber_b) { instance_double(Pgbus::EventBus::Subscriber, pattern: "payments.completed", queue_name: "q_payments") } @@ -14,7 +21,7 @@ before do allow(Pgbus).to 
receive(:client).and_return(mock_client) - allow(Concurrent::FixedThreadPool).to receive(:new).and_return(mock_pool) + allow(Pgbus::ExecutionPools).to receive(:build).and_return(mock_pool) allow(Pgbus::Process::Heartbeat).to receive(:new).and_return(mock_heartbeat) allow(Pgbus::EventBus::Registry).to receive(:instance).and_return(registry) end diff --git a/spec/pgbus/process/worker_spec.rb b/spec/pgbus/process/worker_spec.rb index 9a5200f..ed23b15 100644 --- a/spec/pgbus/process/worker_spec.rb +++ b/spec/pgbus/process/worker_spec.rb @@ -6,7 +6,14 @@ let(:heartbeat) { instance_double(Pgbus::Process::Heartbeat, start: true, stop: true) } let(:mock_client) { build_mock_client } let(:executor) { instance_double(Pgbus::ActiveJob::Executor) } - let(:pool) { instance_double(Concurrent::FixedThreadPool, max_length: 5, queue_length: 0, shutdown: true, kill: true) } + let(:pool) do + instance_double( + Pgbus::ExecutionPools::ThreadPool, + capacity: 5, available_capacity: 5, idle?: true, + shutdown: true, kill: true, wait_for_termination: true, + metadata: { mode: :threads, capacity: 5, busy: 0 } + ) + end let(:circuit_breaker) { instance_double(Pgbus::CircuitBreaker, paused?: false, record_success: nil, record_failure: nil) } let(:worker) { described_class.new(queues: %w[default], threads: 5) } @@ -14,16 +21,37 @@ allow(Pgbus::Process::Heartbeat).to receive(:new).and_return(heartbeat) allow(Pgbus).to receive(:client).and_return(mock_client) allow(Pgbus::ActiveJob::Executor).to receive(:new).and_return(executor) - allow(Concurrent::FixedThreadPool).to receive(:new).and_return(pool) + allow(Pgbus::ExecutionPools).to receive(:build).and_return(pool) allow(Pgbus::CircuitBreaker).to receive(:new).and_return(circuit_breaker) end describe "#initialize" do - it "stores config, queues, and creates a thread pool" do + it "stores config, queues, and creates an execution pool" do expect(worker.queues).to eq(%w[default]) expect(worker.threads).to eq(5) expect(worker.config).to 
be_a(Pgbus::Configuration) - expect(Concurrent::FixedThreadPool).to have_received(:new).with(5) + expect(Pgbus::ExecutionPools).to have_received(:build).with( + mode: :threads, capacity: 5, on_state_change: an_instance_of(Proc) + ) + end + + it "defaults to threads execution mode" do + expect(worker.execution_mode).to eq(:threads) + end + + it "accepts async execution mode" do + async_pool = instance_double( + Pgbus::ExecutionPools::AsyncPool, + capacity: 50, available_capacity: 50, idle?: true, + shutdown: true, kill: true, wait_for_termination: true, + metadata: { mode: :async, capacity: 50, busy: 0 } + ) + allow(Pgbus::ExecutionPools).to receive(:build) + .with(mode: :async, capacity: 50, on_state_change: an_instance_of(Proc)) + .and_return(async_pool) + + async_worker = described_class.new(queues: %w[default], threads: 50, execution_mode: :async) + expect(async_worker.execution_mode).to eq(:async) end it "initializes stats tracking" do @@ -31,7 +59,10 @@ expect(worker.stats[:jobs_failed]).to eq(0) expect(worker.stats[:in_flight]).to eq(0) expect(worker.stats[:state]).to eq(:starting) + expect(worker.stats[:execution_mode]).to eq(:threads) expect(worker.stats[:started_at]).to be_a(Time) + expect(worker.stats[:mode]).to eq(:threads) + expect(worker.stats[:capacity]).to eq(5) end end