1 change: 1 addition & 0 deletions CLAUDE.md
@@ -55,6 +55,7 @@ bundle exec rake # Both
Layer 6: Dashboard        app/controllers/pgbus/, app/views/pgbus/
Layer 5: CLI              lib/pgbus/cli.rb
Layer 4: Process Model    lib/pgbus/process/ (supervisor, worker, dispatcher, consumer)
         Execution Pools  lib/pgbus/execution_pools/ (thread_pool, async_pool)
Layer 3: Event Bus        lib/pgbus/event_bus/ (publisher, subscriber, registry, handler)
Layer 2: ActiveJob        lib/pgbus/active_job/ (adapter, executor)
Layer 1: Client           lib/pgbus/client.rb (PGMQ wrapper)
30 changes: 30 additions & 0 deletions README.md
@@ -463,6 +463,36 @@ end

When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from `/proc/self/statm` (Linux) or `ps -o rss` (macOS).
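
As a rough illustration of the two sampling paths named above, the sketch below reads the resident set size in bytes. The method name `rss_bytes` is hypothetical and this is not necessarily how pgbus implements it. It assumes the second field of `/proc/self/statm` is the resident page count and that `ps -o rss=` reports kibibytes.

```ruby
require "etc"

# Hypothetical helper: returns the current process RSS in bytes.
def rss_bytes
  if File.readable?("/proc/self/statm")
    # Linux: second field is the number of resident pages
    resident_pages = File.read("/proc/self/statm").split[1].to_i
    resident_pages * Etc.sysconf(Etc::SC_PAGESIZE)
  else
    # macOS and other systems without /proc: ps reports RSS in KiB
    `ps -o rss= -p #{Process.pid}`.to_i * 1024
  end
end

puts "RSS: #{rss_bytes / (1024 * 1024)} MiB"
```

A worker loop could compare this value against its configured memory limit between jobs and start draining once the limit is exceeded.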

### Async execution mode (fibers)

Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O.

```ruby
Pgbus.configure do |config|
  # Global: all workers use async mode
  config.execution_mode = :async

  # Or per-worker: mix thread and async workers
  config.workers = [
    { queues: %w[webhooks emails], threads: 100, execution_mode: :async },
    { queues: %w[default], threads: 5 } # stays thread-based
  ]
end
```

**Prerequisites:**

1. Add `gem "async"` to your Gemfile
2. Set `config.active_support.isolation_level = :fiber` in your Rails app

**Why it reduces connections:** In thread mode, each thread holds a database connection for the whole time it waits on I/O, so 50 threads hold 50 connections. In async mode, 50 fibers can share 3-5 connections, because a fiber yields while it waits on I/O and only one fiber runs at a time.
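
The accounting behind that claim can be shown with a toy model. This is a sketch only: `CountingPool` and both helper methods are hypothetical names, plain `Fiber`s stand in for jobs in both cases, and it assumes a connection is checked out just for the duration of a query rather than for the whole job.

```ruby
# Toy pool that only counts how many connections are checked out at once.
class CountingPool
  attr_reader :peak

  def initialize
    @in_use = 0
    @peak = 0
  end

  def checkout
    @in_use += 1
    @peak = [@peak, @in_use].max
  end

  def checkin
    @in_use -= 1
  end
end

# Job keeps its connection while it waits on slow network I/O.
def peak_when_held_across_wait(jobs)
  pool = CountingPool.new
  fibers = Array.new(jobs) do
    Fiber.new do
      pool.checkout
      Fiber.yield # waiting on I/O, connection still checked out
      pool.checkin
    end
  end
  fibers.each(&:resume) # every job reaches its wait
  fibers.each(&:resume) # every job finishes
  pool.peak
end

# Job returns its connection before it waits.
def peak_when_released_before_wait(jobs)
  pool = CountingPool.new
  fibers = Array.new(jobs) do
    Fiber.new do
      pool.checkout
      pool.checkin # query finished, connection returned
      Fiber.yield  # waiting on I/O without a connection
    end
  end
  fibers.each(&:resume)
  fibers.each(&:resume)
  pool.peak
end

puts peak_when_held_across_wait(50)     # => 50
puts peak_when_released_before_wait(50) # => 1
```

Real workloads land between the two extremes, which is consistent with the 3-5 figure quoted above rather than exactly one.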

**CLI flag:** `pgbus start --execution-mode async`

**Safety:** Messages stay in PGMQ under visibility timeout protection regardless of execution mode. If a fiber or worker crashes, the visibility timeout expires and its messages become available to be read again, so no message is lost.
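
To make the visibility timeout behaviour concrete, here is a minimal in-memory sketch. `ToyQueue` and its methods are hypothetical and are not the PGMQ or pgbus API; time is passed in explicitly as `now` so the example is deterministic.

```ruby
# In-memory stand-in for a queue with visibility timeouts.
class ToyQueue
  Message = Struct.new(:id, :body, :visible_at)

  def initialize
    @messages = []
    @next_id = 1
  end

  def send_message(body, now:)
    msg = Message.new(@next_id, body, now)
    @next_id += 1
    @messages << msg
    msg.id
  end

  # Returns the first visible message and hides it for vt seconds.
  def read(vt:, now:)
    msg = @messages.find { |m| m.visible_at <= now }
    return nil unless msg

    msg.visible_at = now + vt
    msg
  end

  # Only an explicit delete removes a message for good.
  def delete(id)
    @messages.reject! { |m| m.id == id }
  end

  def size
    @messages.size
  end
end

queue = ToyQueue.new
queue.send_message("deliver webhook", now: 0)

first = queue.read(vt: 30, now: 0)   # worker picks the message up
hidden = queue.read(vt: 30, now: 10) # nil: still invisible to others
# The worker crashes here and never calls delete.
retried = queue.read(vt: 30, now: 31) # timeout expired, message is back
queue.delete(retried.id)              # second attempt succeeds
```

Because the message is only removed by an explicit delete, a crash between read and delete delays the job instead of dropping it.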

**Not recommended for:** CPU-bound jobs (image processing, heavy computation). These block the single reactor thread and should use thread mode.
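
The reason is that fibers are scheduled cooperatively. The sketch below uses a hypothetical round-robin loop (`run_round_robin`, not the `async` gem's reactor) to show that a job which never yields runs to completion before a waiting job gets its next turn.

```ruby
require "fiber"

# Resume each fiber in turn until all of them have finished.
def run_round_robin(jobs)
  log = []
  fibers = jobs.map { |name, work| Fiber.new { work.call(name, log) } }
  until fibers.empty?
    fibers.each(&:resume)
    fibers.select!(&:alive?)
  end
  log
end

# I/O-style job: yields after every step, as it would while waiting on a socket.
io_job = lambda do |name, log|
  2.times do |step|
    log << "#{name} step #{step}"
    Fiber.yield
  end
end

# CPU-style job: never yields, so nothing else can run until it is done.
cpu_job = lambda do |name, log|
  3.times { |step| log << "#{name} step #{step}" }
end

log = run_round_robin([["io", io_job], ["cpu", cpu_job]])
puts log
```

In the resulting log, every `cpu` step appears between `io step 0` and `io step 1`: the I/O job is stalled for the full duration of the CPU job.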

## Routing and ordering

How messages flow between producers and the workers that handle them: priority sub-queues, consumer priority for active/standby workers, and single-active-consumer for strict ordering.
129 changes: 129 additions & 0 deletions benchmarks/connection_pool_bench.rb
@@ -0,0 +1,129 @@
# frozen_string_literal: true

# Connection Pool Benchmark
# Measures peak active Postgres connections under varying concurrency
# for ThreadPool vs AsyncPool execution modes.
#
# REQUIRES: PGBUS_DATABASE_URL environment variable
#
# Usage:
# PGBUS_DATABASE_URL=postgres://user@localhost/pgbus_test ruby benchmarks/connection_pool_bench.rb

require "benchmark/ips"
require "json"
require "securerandom"
require "concurrent"
require "pgbus"
require "pg"

DATABASE_URL = ENV.fetch("PGBUS_DATABASE_URL") do
  warn "PGBUS_DATABASE_URL not set. This benchmark requires a real PostgreSQL database."
  warn "Example: PGBUS_DATABASE_URL=postgres://user@localhost:5432/pgbus_test ruby benchmarks/connection_pool_bench.rb"
  exit 1
end

Pgbus.configure do |c|
  c.logger = Logger.new(IO::NULL)
  c.queue_prefix = "pgbus_connbench"
  c.default_queue = "default"
  c.stats_enabled = false
  c.database_url = DATABASE_URL
end

def monitor_connection(url)
  PG.connect(url)
end

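def count_active_connections(conn, prefix = "pgbus_connbench")
  # Bind the pattern as a parameter instead of interpolating it into the SQL,
  # and skip this monitor's own backend so the sampler never counts itself.
  result = conn.exec_params(
    "SELECT count(*) FROM pg_stat_activity " \
    "WHERE pid <> pg_backend_pid() " \
    "AND (application_name LIKE $1 OR query LIKE $1)",
    ["%#{prefix}%"]
  )
  result[0]["count"].to_i
rescue PG::Error => e
  warn "Connection count query failed: #{e.message}"
  0
end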

def update_peak(peak, current)
  # A single compare-and-set is enough here: only the monitor thread writes the peak
  old = peak.value
  peak.compare_and_set(old, current) if current > old
end

def measure_peak_connections(mode:, capacity:, tasks:, monitor_conn:)
  pool = if mode == :threads
           Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
         else
           Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
         end

  peak = Concurrent::AtomicFixnum.new(0)
  done = Concurrent::CountDownLatch.new(tasks)

  # Monitor thread samples pg_stat_activity during the benchmark
  stop_monitoring = Concurrent::AtomicBoolean.new(false)
  monitor = Thread.new do
    while stop_monitoring.false?
      current = count_active_connections(monitor_conn)
      update_peak(peak, current)
      sleep 0.01
    end
  end

  tasks.times do
    pool.post do
      # Simulate a real job: query the database
      conn = PG.connect(DATABASE_URL)
      # Tag the session so count_active_connections can match it by application_name
      conn.exec("SET application_name = 'pgbus_connbench'")
      conn.exec("SELECT pg_sleep(0.01)")
    rescue PG::Error => e
      warn "Benchmark task failed: #{e.message}"
    ensure
      # Always release the connection and the latch, even when the query fails
      conn&.close
      done.count_down
    end
  end

  done.wait(30)
  stop_monitoring.make_true
  monitor.join(2)

  pool.shutdown
  pool.wait_for_termination(10)

  peak.value
end

puts "=" * 70
puts "Connection Pool Benchmark"
puts "Database: #{DATABASE_URL.sub(%r{//[^@]+@}, "//***@")}"
puts "=" * 70

monitor_conn = monitor_connection(DATABASE_URL)

configs = [
  { mode: :threads, capacity: 5, tasks: 20 },
  { mode: :threads, capacity: 10, tasks: 40 },
  { mode: :threads, capacity: 25, tasks: 100 },
  { mode: :async, capacity: 50, tasks: 200 },
  { mode: :async, capacity: 100, tasks: 400 }
]

header = ["Mode", "Capacity", "Tasks", "Peak Connections"]
puts format("\n%-12s %-10s %-8s %-20s", *header)
puts "-" * 55

configs.each do |cfg|
  peak = measure_peak_connections(
    mode: cfg[:mode],
    capacity: cfg[:capacity],
    tasks: cfg[:tasks],
    monitor_conn: monitor_conn
  )
  puts format("%-12s %-10d %-8d %-20d", cfg[:mode], cfg[:capacity], cfg[:tasks], peak)
  sleep 1 # let connections drain between runs
end

monitor_conn.close

puts "\nExpected: async mode should show significantly fewer peak connections"
puts "than thread mode, regardless of fiber capacity."
puts "\nDone."
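
The measurement technique in this benchmark (poll a gauge, keep the maximum) can be isolated into a small helper. The sketch below uses only the standard library. `PeakTracker` is a hypothetical name, and a `Mutex` stands in for the `Concurrent::AtomicFixnum` used in the file so that several sampler threads could share one tracker safely.

```ruby
# Thread-safe running maximum.
class PeakTracker
  def initialize
    @mutex = Mutex.new
    @peak = 0
  end

  # Record a sample; the stored peak only ever grows.
  def observe(value)
    @mutex.synchronize { @peak = value if value > @peak }
  end

  def value
    @mutex.synchronize { @peak }
  end
end

tracker = PeakTracker.new
samples = (1..100).to_a.shuffle

# Four sampler threads report interleaved readings.
threads = samples.each_slice(25).map do |slice|
  Thread.new { slice.each { |s| tracker.observe(s) } }
end
threads.each(&:join)

puts tracker.value
```

With a sampling interval of 10 ms, any peak shorter than the interval can still be missed, so the reported number is a lower bound on the true peak.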
167 changes: 167 additions & 0 deletions benchmarks/execution_pool_bench.rb
@@ -0,0 +1,167 @@
# frozen_string_literal: true

require_relative "bench_helper"

# Execution Pool Benchmark
# Compares ThreadPool vs AsyncPool throughput, latency, and memory.
#
# Usage:
# ruby benchmarks/execution_pool_bench.rb

CAPACITIES = [10, 50].freeze

def build_thread_pool(capacity)
  Pgbus::ExecutionPools::ThreadPool.new(capacity: capacity)
end

def build_async_pool(capacity)
  Pgbus::ExecutionPools::AsyncPool.new(capacity: capacity)
end

# --- 1. No-op throughput (scheduling overhead) ---

puts "=" * 70
puts "1. No-op throughput — scheduling overhead (IPS)"
puts "=" * 70

CAPACITIES.each do |cap|
  Benchmark.ips do |x|
    x.config(warmup: 2, time: 5)

    x.report("ThreadPool(#{cap}) no-op") do
      pool = build_thread_pool(cap)
      done = Concurrent::CountDownLatch.new(cap)
      cap.times { pool.post { done.count_down } }
      done.wait(10)
      pool.shutdown
      pool.wait_for_termination(5)
    end

    x.report("AsyncPool(#{cap}) no-op") do
      pool = build_async_pool(cap)
      done = Concurrent::CountDownLatch.new(cap)
      cap.times { pool.post { done.count_down } }
      done.wait(10)
      pool.shutdown
      pool.wait_for_termination(5)
    end

    x.compare!
  end
  puts
end

# --- 2. I/O-bound throughput (fiber advantage zone) ---

puts "=" * 70
puts "2. I/O-bound throughput — sleep(0.01) simulating DB I/O"
puts "=" * 70

CAPACITIES.each do |cap|
  Benchmark.ips do |x|
    x.config(warmup: 1, time: 5)

    x.report("ThreadPool(#{cap}) I/O") do
      pool = build_thread_pool(cap)
      done = Concurrent::CountDownLatch.new(cap)
      cap.times do
        pool.post do
          sleep(0.01)
          done.count_down
        end
      end
      done.wait(10)
      pool.shutdown
      pool.wait_for_termination(5)
    end

    x.report("AsyncPool(#{cap}) I/O") do
      pool = build_async_pool(cap)
      done = Concurrent::CountDownLatch.new(cap)
      cap.times do
        pool.post do
          sleep(0.01)
          done.count_down
        end
      end
      done.wait(10)
      pool.shutdown
      pool.wait_for_termination(5)
    end

    x.compare!
  end
  puts
end

# --- 3. Memory overhead ---

puts "=" * 70
puts "3. Memory overhead — 1000 tasks"
puts "=" * 70

[10, 50].each do |cap|
  puts "\n--- Capacity: #{cap} ---"

  %i[threads async].each do |mode|
    report = MemoryProfiler.report do
      100.times do
        pool = if mode == :threads
                 build_thread_pool(cap)
               else
                 build_async_pool(cap)
               end
        done = Concurrent::CountDownLatch.new(cap)
        cap.times { pool.post { done.count_down } }
        done.wait(10)
        pool.shutdown
        pool.wait_for_termination(5)
      end
    end

    puts "\n#{mode.upcase} pool (#{cap} capacity × 100 iterations):"
    puts "  Total allocated: #{report.total_allocated_memsize} bytes"
    puts "  Total retained: #{report.total_retained_memsize} bytes"
    puts "  Allocated objects: #{report.total_allocated}"
    puts "  Retained objects: #{report.total_retained}"
  end
end

# --- 4. Latency percentiles ---

puts "\n#{"=" * 70}"
puts "4. Latency percentiles — single task completion time"
puts "=" * 70

SAMPLES = 500

%i[threads async].each do |mode|
  pool = mode == :threads ? build_thread_pool(5) : build_async_pool(5)
  latencies = []

  SAMPLES.times do
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    done = Concurrent::Event.new
    pool.post { done.set }
    done.wait(5)
    elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1_000_000 # microseconds
    latencies << elapsed
  end

  pool.shutdown
  pool.wait_for_termination(5)

  latencies.sort!
  p50 = latencies[(SAMPLES * 0.50).to_i]
  p95 = latencies[(SAMPLES * 0.95).to_i]
  p99 = latencies[(SAMPLES * 0.99).to_i]

  puts "\n#{mode.upcase} pool latency (#{SAMPLES} samples):"
  puts "  p50: #{p50.round(1)} µs"
  puts "  p95: #{p95.round(1)} µs"
  puts "  p99: #{p99.round(1)} µs"
  puts "  min: #{latencies.first.round(1)} µs"
  puts "  max: #{latencies.last.round(1)} µs"
end

puts "\nDone."
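
The percentile lookups in section 4 index directly into the sorted array. A small helper makes the same calculation reusable for other sample counts. This is a sketch: `percentile` is a hypothetical name, it uses the same `(size * fraction).to_i` index as the benchmark, and it additionally clamps the index so a fraction of 1.0 returns the maximum instead of `nil`.

```ruby
# Returns the sample at the given fraction (0.0..1.0) of the sorted data.
def percentile(samples, fraction)
  raise ArgumentError, "samples must not be empty" if samples.empty?

  sorted = samples.sort
  index = (sorted.size * fraction).to_i
  sorted[[index, sorted.size - 1].min] # clamp so fraction 1.0 stays in bounds
end

latencies = (1..500).to_a.shuffle
puts "p50: #{percentile(latencies, 0.50)}"
puts "max: #{percentile(latencies, 1.0)}"
```

Sorting inside the helper keeps callers from depending on a prior `sort!`, at the cost of one extra sort per call.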
7 changes: 7 additions & 0 deletions lib/pgbus/cli.rb
@@ -42,6 +42,7 @@ def apply_start_options(args)
options = parse_start_options(args)

Pgbus.configuration.workers = options[:queues] if options[:queues]
Pgbus.configuration.execution_mode = options[:execution_mode].to_sym if options[:execution_mode]
apply_capsule_filter(options[:capsule]) if options[:capsule]
apply_role_filter(options)
end
@@ -70,6 +71,10 @@ def parse_start_options(args)
opts.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") do
options[:dispatcher_only] = true
end

opts.on("--execution-mode MODE", "Execution mode: threads (default) or async") do |v|
options[:execution_mode] = v
end
end.parse!(args.dup)
options
end
@@ -165,6 +170,8 @@ def print_help
pattern — exactly one of these per deployment)
--dispatcher-only Run only the dispatcher (the maintenance pod
pattern)
--execution-mode Execution mode: threads (default) or async
(fiber-based, lower connection usage)
HELP
end
end
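
As written, the new flag converts whatever string it receives with `to_sym`. One possible hardening, sketched below with the standard library's `OptionParser`, is to pass the list of allowed values so that anything else is rejected at parse time. `EXECUTION_MODES` and `parse_execution_mode` are hypothetical names for illustration and are not part of this change.

```ruby
require "optparse"

# Assumed set of valid modes, taken from the help text above.
EXECUTION_MODES = %w[threads async].freeze

def parse_execution_mode(args)
  options = {}
  OptionParser.new do |opts|
    # Passing an Array restricts the argument to those values.
    opts.on("--execution-mode MODE", EXECUTION_MODES,
            "Execution mode: threads (default) or async") do |v|
      options[:execution_mode] = v.to_sym
    end
  end.parse!(args.dup)
  options
end

p parse_execution_mode(%w[--execution-mode async])

begin
  parse_execution_mode(%w[--execution-mode fibers])
rescue OptionParser::InvalidArgument => e
  warn e.message
end
```

Failing at parse time means a typo in a deployment script surfaces immediately, before any worker process is started.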