PostgreSQL-native job processing and event bus for Rails, built on PGMQ.
- [Circuit breaker and queue pause/resume](#circuit-breaker-and-queue-pauseresume)
- [Prefetch flow control](#prefetch-flow-control)
- [Worker recycling](#worker-recycling)
- [Retry backoff](#retry-backoff)
- [Routing and ordering](#routing-and-ordering)
- [Priority queues](#priority-queues)
- [Consumer priority](#consumer-priority)
- [Batches](#batches)
- [Transactional outbox](#transactional-outbox)
- [Archive compaction](#archive-compaction)
- [Observability](#observability)
- [Error reporting](#error-reporting)
- [Structured logging](#structured-logging)
- [Queue health monitoring](#queue-health-monitoring)
- [Real-time broadcasts](#real-time-broadcasts-turbo-streams-replacement)
- [Operations](#operations)
- [CLI](#cli)
- **Single active consumer** -- advisory-lock-based exclusive queue processing for strict ordering
- **Consumer priority** -- higher-priority workers get first dibs, lower-priority workers back off
- **Job uniqueness** -- prevent duplicate jobs with reaper-based crash recovery, no TTL-driven expiry
- **Retry backoff** -- exponential backoff with jitter for VT-based retries, per-job overrides
- **Error reporting** -- pluggable error reporters for APM integration (Appsignal, Sentry, etc.)
- **Structured logging** -- JSON log formatter with component extraction and thread-local context
- **Queue health** -- dead tuple monitoring, autovacuum tuning, Prometheus metrics

## Requirements

Expand Down Expand Up @@ -463,6 +472,34 @@ end

When a limit is hit, the worker drains its thread pool, exits, and the supervisor forks a fresh process. RSS memory is sampled from `/proc/self/statm` (Linux) or `ps -o rss` (macOS).
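On Linux, the sampling described above can be sketched like this (hypothetical helper, not the gem's internal method):

```ruby
require "etc"

# Sketch of the Linux branch of RSS sampling: field 2 of
# /proc/self/statm is the resident set in pages; multiply by the
# system page size to get bytes. Illustrative, not Pgbus internals.
def rss_mb_linux
  pages = File.read("/proc/self/statm").split[1].to_i
  pages * Etc.sysconf(Etc::SC_PAGESIZE) / (1024.0 * 1024.0)
end
```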

### Retry backoff

When a job fails, Pgbus extends the PGMQ visibility timeout with exponential backoff so retries are spread out instead of bunched at fixed intervals:

```ruby
Pgbus.configure do |config|
config.retry_backoff = 5 # base delay (seconds)
config.retry_backoff_max = 300 # cap at 5 minutes
config.retry_backoff_jitter = 0.15 # up to +15% randomization
end
```

The delay formula is `base * 2^(attempt-1)`, capped at `retry_backoff_max` and stretched by up to `+jitter` (a 1.0-1.15 multiplier with defaults). A job that fails 5 times with defaults waits ~5s, ~10s, ~20s, ~40s, and ~80s, then routes to the DLQ on the 6th read (messages move to the DLQ once `read_ct` exceeds `max_retries`, default 5).
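A minimal sketch of that curve (the helper below is illustrative, not a Pgbus API):

```ruby
# Illustrative backoff helper -- name and rng handling are assumptions,
# not Pgbus internals: exponential growth, capped, plus up to +15% jitter.
def retry_delay(attempt, base: 5, max: 300, jitter: 0.15, rng: Random.new)
  delay = [base * (2**(attempt - 1)), max].min
  delay * (1 + rng.rand * jitter) # jitter stretches each delay upward
end

(1..5).map { |n| retry_delay(n, jitter: 0.0) }
# => [5.0, 10.0, 20.0, 40.0, 80.0]
```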

Jobs can override the global settings per-class:

```ruby
class FragileApiJob < ApplicationJob
include Pgbus::RetryBackoff::JobMixin

pgbus_retry_backoff base: 10, max: 600, jitter: 0.2

def perform(...)
# ...
end
end
```

### Async execution mode (fibers)

Workers can optionally execute jobs as fibers instead of threads. This is ideal for I/O-bound workloads (HTTP calls, email delivery, LLM API calls) where jobs spend most of their time waiting on network I/O.
as configuration. The dispatcher runs archive compaction as part of its
maintenance loop, deleting archived messages older than `archive_retention`
in batches to avoid long-running transactions.

## Observability

Error reporting, structured logging, and queue health monitoring.

### Error reporting

By default, Pgbus logs caught exceptions and continues. To route them to your APM service (Appsignal, Sentry, Honeybadger, etc.), push callable reporters onto `config.error_reporters`:

```ruby
Pgbus.configure do |c|
c.error_reporters << ->(ex, ctx) {
Appsignal.set_error(ex) { |t| t.set_tags(ctx) }
}
end
```

Each reporter receives `(exception, context_hash)`. The context hash includes keys like `action`, `queue`, `job_class`, and `msg_id` depending on the call site. Reporters that accept a third argument also receive the Pgbus configuration object.

Reporters are wired into all critical rescue paths: job execution failures, worker fetch/process errors, dispatcher maintenance, supervisor fork failures, circuit breaker trips, outbox publish errors, and failed event recording. Non-critical paths (dashboard queries, stat recording) remain log-only.

`ErrorReporter.report` is guaranteed to never raise — if a reporter or the logger itself throws, the error is swallowed silently. This preserves fault-tolerance invariants at every rescue site.
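The never-raise contract can be sketched like this (toy stand-in; `SafeReporter` is illustrative, not the gem's `ErrorReporter`):

```ruby
# Illustrative dispatch loop: one misbehaving reporter must never break
# the rescue site that called report. Not Pgbus's actual source.
module SafeReporter
  def self.report(reporters, exception, context = {})
    reporters.each do |reporter|
      reporter.call(exception, context)
    rescue StandardError
      nil # swallow reporter failures; even logging here could raise
    end
    nil
  rescue StandardError
    nil # belt and braces: report never propagates anything
  end
end
```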

### Structured logging

Pgbus ships two log formatters inspired by Sidekiq's `Logger::Formatters`:

```ruby
Pgbus.configure do |c|
c.log_format = :json # or :text (default)
end
```

**Text format** (default):

```text
INFO 2025-01-15T10:30:00.000Z pid=1234 tid=abc queue=default: Starting job
```

**JSON format**:

```json
{"ts":"2025-01-15T10:30:00.000Z","pid":1234,"tid":"abc","lvl":"INFO","component":"Pgbus","msg":"Starting job","ctx":{"queue":"default"}}
```

The JSON formatter extracts `[Pgbus]` and `[Pgbus::Web]` prefixes from log messages into a separate `component` field so the `msg` field stays clean for log aggregators. Thread-local context can be added via `Pgbus::LogFormatter.with_context(queue: "default") { ... }` and appears under the `ctx` key.
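The thread-local context mechanism can be sketched as follows (minimal stand-in, not `Pgbus::LogFormatter`'s actual code):

```ruby
# Minimal thread-local log context: nested calls merge keys, and the
# previous context is restored on exit. Illustrative only.
module LogContext
  KEY = :pgbus_log_ctx

  def self.with_context(extra)
    previous = Thread.current[KEY]
    Thread.current[KEY] = (previous || {}).merge(extra)
    yield
  ensure
    Thread.current[KEY] = previous
  end

  def self.current
    Thread.current[KEY] || {}
  end
end
```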

You can also set a formatter directly on the logger:

```ruby
Pgbus.configure do |c|
c.logger.formatter = Pgbus::LogFormatter::JSON.new
end
```

### Queue health monitoring

The dashboard includes a **Queue Health** panel showing PostgreSQL vacuum stats per PGMQ table: dead tuple counts, live tuple counts, bloat ratio (dead / total), last vacuum age, and MVCC horizon age. The same stats appear on individual queue detail pages.

#### Autovacuum tuning

PGMQ queue tables have high insert/delete churn that overwhelms PostgreSQL's default autovacuum settings. Pgbus applies aggressive per-table tuning automatically:

- **New queues at runtime**: `Client#ensure_single_queue` applies tuning after `pgmq.create()`
- **Existing installations**: `rails generate pgbus:update` detects untuned tables
- **Fresh installs**: The install migration includes tuning for the default queue

To apply tuning manually or after `db:schema:load` (which loses `ALTER TABLE` settings):

```bash
rails generate pgbus:tune_autovacuum # Generate migration
rails generate pgbus:tune_autovacuum --database=pgbus # For separate database
```

The `pgbus:tune_autovacuum` rake task also hooks into `db:schema:load` automatically.
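The generated migration applies per-table storage parameters roughly in this shape (the table name and threshold values below are illustrative assumptions, not Pgbus's actual output):

```ruby
# Hypothetical shape of the tuning migration; thresholds are examples.
class TunePgmqAutovacuum < ActiveRecord::Migration[7.1]
  def up
    execute <<~SQL
      ALTER TABLE pgmq.q_default SET (
        autovacuum_vacuum_scale_factor  = 0.01, -- vacuum after ~1% dead tuples
        autovacuum_vacuum_threshold     = 1000,
        autovacuum_analyze_scale_factor = 0.01
      );
    SQL
  end

  def down
    execute <<~SQL
      ALTER TABLE pgmq.q_default RESET (
        autovacuum_vacuum_scale_factor,
        autovacuum_vacuum_threshold,
        autovacuum_analyze_scale_factor
      );
    SQL
  end
end
```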

#### Prometheus metrics

When `config.metrics_enabled = true` (default), the dashboard exposes Prometheus-compatible gauges:

| Metric | Description |
|--------|-------------|
| `pgbus_table_dead_tuples` | Dead tuple count per PGMQ table |
| `pgbus_table_live_tuples` | Live tuple count per PGMQ table |
| `pgbus_table_bloat_ratio` | Dead / (dead + live) per table |
| `pgbus_table_last_vacuum_age_seconds` | Seconds since last vacuum per table |
| `pgbus_oldest_transaction_age_seconds` | MVCC horizon pin risk |
| `pgbus_worker_pool_capacity` | Total worker thread slots |
| `pgbus_worker_pool_busy` | Currently busy worker threads |
| `pgbus_worker_pool_utilization` | Busy / capacity ratio |

## Real-time broadcasts (turbo-streams replacement)

Pgbus ships a drop-in replacement for turbo-rails' `turbo_stream_from` helper that fixes several well-known ActionCable correctness bugs by using PGMQ message IDs as a replay cursor. Same API as turbo-rails. No Redis. No ActionCable. No lost messages on reconnect.
One Puma worker (or Falcon reactor) hosts one `Pgbus::Web::Streamer::Instance`

Per-stream retention is handled by the main pgbus dispatcher process on the same interval as the dispatcher's `ARCHIVE_COMPACTION_INTERVAL` constant. Streams default to a 5-minute retention because SSE clients reconnect within seconds; chat-style applications override the retention to days via `streams_retention`.

### Stream name helpers

Apps using UUID primary keys with turbo-rails-style dom IDs can hit PGMQ's 47-character queue-name ceiling (`"gid://app/Ai::Chat/9c14e8b2-...:messages"` exceeds the limit before the `pgbus_` prefix is even added). Pgbus provides helpers to generate short, collision-safe stream names:

```ruby
# In your ApplicationRecord
class ApplicationRecord < ActiveRecord::Base
primary_abstract_class
include Pgbus::Streams::Streamable
end
```

This gives every model `short_id` (16-hex SHA-256 prefix of the GlobalID) and `to_stream_key`:

```ruby
chat = Ai::Chat.find("9c14e8b2-...")
chat.short_id # => "ai_chat_a3f8c1e9d2b47610"
chat.to_stream_key # => "ai_chat_a3f8c1e9d2b47610"

# Compose multi-part stream names
Pgbus.stream_key(chat, :messages) # => "ai_chat_a3f8c1e9d2b47610_messages"

# Use in views
<%= pgbus_stream_from Pgbus.stream_key(@chat, :messages) %>
```
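The `short_id` derivation above can be sketched as (the prefix joining and exact hash input are assumptions about the scheme, not the gem's code):

```ruby
require "digest"

# Illustrative: a 16-hex-character SHA-256 prefix of the record's
# GlobalID, joined to an underscored model prefix.
def short_id(model_prefix, global_id)
  "#{model_prefix}_#{Digest::SHA256.hexdigest(global_id)[0, 16]}"
end

short_id("ai_chat", "gid://app/Ai::Chat/9c14e8b2")
```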

The budget is computed from `config.queue_prefix` at call time so prefix overrides adjust automatically. If a stream name exceeds the budget, `Pgbus::Streams::StreamNameTooLong` is raised immediately with the offending name, computed budget, and a pointer to `Pgbus.stream_key` — before PGMQ is ever touched.

### Transactional broadcasts

**This is the feature no other Rails real-time stack can offer.** A broadcast issued inside an open ActiveRecord transaction is deferred until the transaction commits. If it rolls back, the broadcast silently drops — clients never see the change that the database never persisted.
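The commit-gated behavior can be modeled with a toy stand-in (illustrative only; the gem presumably hooks ActiveRecord's transaction callbacks rather than anything like this class):

```ruby
# Toy model of commit-deferred broadcasting: messages queue while a
# "transaction" is open, flush in order on commit, drop on rollback.
class DeferredBroadcaster
  def initialize(&sink)
    @sink = sink
    @pending = nil
  end

  def transaction
    @pending = []
    yield
    @pending.each { |msg| @sink.call(msg) } # commit: flush in order
  rescue StandardError
    nil # rollback: drop silently; subscribers never see uncommitted state
  ensure
    @pending = nil
  end

  def broadcast(msg)
    @pending ? @pending << msg : @sink.call(msg) # immediate outside a txn
  end
end
```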
Pgbus uses these tables (created via PGMQ and migrations):
| `pgbus_outbox_entries` | Transactional outbox entries pending publication |
| `pgbus_recurring_tasks` | Recurring job definitions |
| `pgbus_recurring_executions` | Recurring job execution history |
| `pgbus_presence_members` | Stream presence tracking (who is subscribed) |
| `pgbus_stream_stats` | Stream broadcast/connect/disconnect metrics (opt-in) |

### Switching from another backend

PostgreSQL + PGMQ
| `polling_interval` | `0.1` | Seconds between polls (LISTEN/NOTIFY is primary) |
| `visibility_timeout` | `30` | Time before unacked message becomes visible again. Accepts seconds or `ActiveSupport::Duration` (e.g. `10.minutes`) |
| `max_retries` | `5` | Failed reads before routing to dead letter queue |
| `retry_backoff` | `5` | Base delay in seconds for VT-based retry backoff (exponential: `base * 2^(attempt-1)`) |
| `retry_backoff_max` | `300` | Maximum retry delay in seconds (caps the exponential curve) |
| `retry_backoff_jitter` | `0.15` | Jitter factor (0-1); each delay is multiplied by up to `1 + jitter` to spread retries |
| `max_jobs_per_worker` | `nil` | Recycle worker after N jobs (nil = unlimited) |
| `max_memory_mb` | `nil` | Recycle worker when memory exceeds N MB |
| `max_worker_lifetime` | `nil` | Recycle worker after N seconds. Accepts seconds or Duration. |
Expand All @@ -1080,6 +1240,12 @@ PostgreSQL + PGMQ
| `web_live_updates` | `true` | Enable Turbo Frames auto-refresh on dashboard |
| `stats_enabled` | `true` | Record job execution stats for insights dashboard |
| `stats_retention` | `30.days` | How long to keep job stats. Accepts seconds, Duration, or `nil` to disable cleanup |
| `streams_stats_enabled` | `false` | Record stream broadcast/connect/disconnect stats (opt-in, can be high volume) |
| `streams_path` | `nil` | Custom URL path for the SSE endpoint (nil = auto-detected from engine mount) |
| `execution_mode` | `:threads` | Global execution mode (`:threads` or `:async`). Per-worker override via capsule config. |
| `error_reporters` | `[]` | Array of callables invoked on caught exceptions. Each receives `(exception, context_hash)`. |
| `log_format` | `:text` | Log formatter (`:text` or `:json`). Sets `logger.formatter` automatically. |
| `metrics_enabled` | `true` | Enable Prometheus-compatible metrics on the dashboard |

## Development
