Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ bundle exec rake # Both
Layer 6: Dashboard app/controllers/pgbus/, app/views/pgbus/
Layer 5: CLI lib/pgbus/cli.rb
Layer 4: Process Model lib/pgbus/process/ (supervisor, worker, dispatcher, consumer)
Execution Pools lib/pgbus/execution_pools/ (thread_pool, async_pool)
Layer 3: Event Bus lib/pgbus/event_bus/ (publisher, subscriber, registry, handler)
Layer 2: ActiveJob lib/pgbus/active_job/ (adapter, executor)
Layer 1: Client lib/pgbus/client.rb (PGMQ wrapper)
Expand Down
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,36 @@ end

When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from `/proc/self/statm` (Linux) or `ps -o rss` (macOS).

### Async execution mode (fibers)

Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O.

```ruby
Pgbus.configure do |config|
# Global: all workers use async mode
config.execution_mode = :async

# Or per-worker: mix thread and async workers
config.workers = [
{ queues: %w[webhooks emails], threads: 100, execution_mode: :async },
{ queues: %w[default], threads: 5 } # stays thread-based
]
end
```

**Prerequisites:**

1. Add `gem "async"` to your Gemfile
2. Set `config.active_support.isolation_level = :fiber` in your Rails app

**Why it reduces connections:** In thread mode, each thread holds a database connection while waiting on I/O. With 50 threads, that's 50 connections. In async mode, 50 fibers share 3-5 connections because fibers yield during I/O and only one runs at a time.

**CLI flag:** `pgbus start --execution-mode async`

**Safety:** Messages stay in PGMQ with visibility timeout protection regardless of execution mode. If a fiber or worker crashes, the visibility timeout expires and messages become available for re-read. No data loss risk.

**Not recommended for:** CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode.

## Routing and ordering

How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering.
Expand Down
130 changes: 130 additions & 0 deletions benchmarks/connection_pool_bench.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# frozen_string_literal: true

# Connection Pool Benchmark
# Measures peak active Postgres connections under varying concurrency
# for ThreadPool vs AsyncPool execution modes.
#
# REQUIRES: PGBUS_DATABASE_URL environment variable
#
# Usage:
# PGBUS_DATABASE_URL=postgres://user@localhost/pgbus_test ruby benchmarks/connection_pool_bench.rb

require "json"
require "securerandom"
require "concurrent"
require "pgbus"
require "pg"

DATABASE_URL = ENV.fetch("PGBUS_DATABASE_URL") do
warn "PGBUS_DATABASE_URL not set. This benchmark requires a real PostgreSQL database."
warn "Example: PGBUS_DATABASE_URL=postgres://user@localhost:5432/pgbus_test ruby benchmarks/connection_pool_bench.rb"
exit 1
end

Pgbus.configure do |c|
c.logger = Logger.new(IO::NULL)
c.queue_prefix = "pgbus_connbench"
c.default_queue = "default"
c.stats_enabled = false
c.database_url = DATABASE_URL
end

def monitor_connection(url)
PG.connect(url)
end

def count_active_connections(conn, prefix = "pgbus_connbench")
pattern = "%#{prefix}%"
result = conn.exec_params(
"SELECT count(*) FROM pg_stat_activity " \
"WHERE application_name LIKE $1 OR query LIKE $1",
[pattern]
)
result[0]["count"].to_i
rescue PG::Error => e
warn "Connection count query failed: #{e.message}"
0
end
Comment thread
mhenrixon marked this conversation as resolved.

def update_peak(peak, current)
old = peak.value
peak.compare_and_set(old, current) if current > old
end

def measure_peak_connections(mode:, capacity:, tasks:, monitor_conn:)
pool = if mode == :threads
Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
else
Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
end

peak = Concurrent::AtomicFixnum.new(0)
done = Concurrent::CountDownLatch.new(tasks)

# Monitor thread samples pg_stat_activity during the benchmark
stop_monitoring = Concurrent::AtomicBoolean.new(false)
monitor = Thread.new do
while stop_monitoring.false?
current = count_active_connections(monitor_conn)
update_peak(peak, current)
sleep 0.01
end
end

tasks.times do
pool.post do
# Simulate a real job: query the database
conn = PG.connect(DATABASE_URL)
conn.exec("SELECT pg_sleep(0.01)")
conn.close
done.count_down
rescue PG::Error
done.count_down
end
end

done.wait(30)
stop_monitoring.make_true
monitor.join(2)

pool.shutdown
pool.wait_for_termination(10)

peak.value
end

puts "=" * 70
puts "Connection Pool Benchmark"
puts "Database: #{DATABASE_URL.sub(%r{//[^@]+@}, "//***@")}"
puts "=" * 70

monitor_conn = monitor_connection(DATABASE_URL)

configs = [
{ mode: :threads, capacity: 5, tasks: 20 },
{ mode: :threads, capacity: 10, tasks: 40 },
{ mode: :threads, capacity: 25, tasks: 100 },
{ mode: :async, capacity: 50, tasks: 200 },
{ mode: :async, capacity: 100, tasks: 400 }
]

header = ["Mode", "Capacity", "Tasks", "Peak Connections"]
puts format("\n%-12s %-10s %-8s %-20s", *header)
puts "-" * 55

configs.each do |cfg|
peak = measure_peak_connections(
mode: cfg[:mode],
capacity: cfg[:capacity],
tasks: cfg[:tasks],
monitor_conn: monitor_conn
)
puts format("%-12s %-10d %-8d %-20d", cfg[:mode], cfg[:capacity], cfg[:tasks], peak)
sleep 1 # let connections drain between runs
end

monitor_conn.close

puts "\nExpected: async mode should show significantly fewer peak connections"
puts "than thread mode, regardless of fiber capacity."
puts "\nDone."
167 changes: 167 additions & 0 deletions benchmarks/execution_pool_bench.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# frozen_string_literal: true

require_relative "bench_helper"

# Execution Pool Benchmark
# Compares ThreadPool vs AsyncPool throughput, latency, and memory.
#
# Usage:
# ruby benchmarks/execution_pool_bench.rb

CAPACITIES = [10, 50].freeze

def build_thread_pool(capacity)
Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
end

def build_async_pool(capacity)
Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
end

# --- 1. No-op throughput (scheduling overhead) ---

puts "=" * 70
puts "1. No-op throughput — scheduling overhead (IPS)"
puts "=" * 70

CAPACITIES.each do |cap|
Benchmark.ips do |x|
x.config(warmup: 2, time: 5)

x.report("ThreadPool(#{cap}) no-op") do
pool = build_thread_pool(cap)
done = Concurrent::CountDownLatch.new(cap)
cap.times { pool.post { done.count_down } }
done.wait(10)
pool.shutdown
pool.wait_for_termination(5)
end

x.report("AsyncPool(#{cap}) no-op") do
pool = build_async_pool(cap)
done = Concurrent::CountDownLatch.new(cap)
cap.times { pool.post { done.count_down } }
done.wait(10)
pool.shutdown
pool.wait_for_termination(5)
end

x.compare!
end
puts
end

# --- 2. I/O-bound throughput (fiber advantage zone) ---

puts "=" * 70
puts "2. I/O-bound throughput — sleep(0.01) simulating DB I/O"
puts "=" * 70

CAPACITIES.each do |cap|
Benchmark.ips do |x|
x.config(warmup: 1, time: 5)

x.report("ThreadPool(#{cap}) I/O") do
pool = build_thread_pool(cap)
done = Concurrent::CountDownLatch.new(cap)
cap.times do
pool.post do
sleep(0.01)
done.count_down
end
end
done.wait(10)
pool.shutdown
pool.wait_for_termination(5)
end

x.report("AsyncPool(#{cap}) I/O") do
pool = build_async_pool(cap)
done = Concurrent::CountDownLatch.new(cap)
cap.times do
pool.post do
sleep(0.01)
done.count_down
end
end
done.wait(10)
pool.shutdown
pool.wait_for_termination(5)
end

x.compare!
end
puts
end
Comment thread
mhenrixon marked this conversation as resolved.

# --- 3. Memory overhead ---

puts "=" * 70
puts "3. Memory overhead — 1000 tasks"
puts "=" * 70

[10, 50].each do |cap|
puts "\n--- Capacity: #{cap} ---"

%i[threads async].each do |mode|
report = MemoryProfiler.report do
100.times do
pool = if mode == :threads
build_thread_pool(cap)
else
build_async_pool(cap)
end
done = Concurrent::CountDownLatch.new(cap)
cap.times { pool.post { done.count_down } }
done.wait(10)
pool.shutdown
pool.wait_for_termination(5)
end
end

puts "\n#{mode.upcase} pool (#{cap} capacity × 100 iterations):"
puts " Total allocated: #{report.total_allocated_memsize} bytes"
puts " Total retained: #{report.total_retained_memsize} bytes"
puts " Allocated objects: #{report.total_allocated}"
puts " Retained objects: #{report.total_retained}"
end
end

# --- 4. Latency percentiles ---

puts "\n#{"=" * 70}"
puts "4. Latency percentiles — single task completion time"
puts "=" * 70

SAMPLES = 500

%i[threads async].each do |mode|
pool = mode == :threads ? build_thread_pool(5) : build_async_pool(5)
latencies = []

SAMPLES.times do
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
done = Concurrent::Event.new
pool.post { done.set }
done.wait(5)
elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1_000_000 # microseconds
latencies << elapsed
end

pool.shutdown
pool.wait_for_termination(5)

latencies.sort!
p50 = latencies[(SAMPLES * 0.50).to_i]
p95 = latencies[(SAMPLES * 0.95).to_i]
p99 = latencies[(SAMPLES * 0.99).to_i]

puts "\n#{mode.upcase} pool latency (#{SAMPLES} samples):"
puts " p50: #{p50.round(1)} µs"
puts " p95: #{p95.round(1)} µs"
puts " p99: #{p99.round(1)} µs"
puts " min: #{latencies.first.round(1)} µs"
puts " max: #{latencies.last.round(1)} µs"
end

puts "\nDone."
7 changes: 7 additions & 0 deletions lib/pgbus/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def apply_start_options(args)
options = parse_start_options(args)

Pgbus.configuration.workers = options[:queues] if options[:queues]
Pgbus.configuration.execution_mode = options[:execution_mode].to_sym if options[:execution_mode]
apply_capsule_filter(options[:capsule]) if options[:capsule]
apply_role_filter(options)
end
Expand Down Expand Up @@ -70,6 +71,10 @@ def parse_start_options(args)
opts.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") do
options[:dispatcher_only] = true
end

opts.on("--execution-mode MODE", "Execution mode: threads (default) or async") do |v|
options[:execution_mode] = v
end
end.parse!(args.dup)
options
end
Expand Down Expand Up @@ -165,6 +170,8 @@ def print_help
pattern — exactly one of these per deployment)
--dispatcher-only Run only the dispatcher (the maintenance pod
pattern)
--execution-mode Execution mode: threads (default) or async
(fiber-based, lower connection usage)
HELP
end
end
Expand Down
Loading