Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/pgbus/active_job/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def parse_enqueue_latency_from_string(str)
end

def handle_failure(message, queue_name, error, payload: nil)
Pgbus.logger.error { "[Pgbus] Job failed: #{error.class}: #{error.message}" }
ctx = { action: "execute_job", queue: queue_name, job_class: payload&.dig("job_class"),
msg_id: message.msg_id.to_i, read_ct: message.read_ct.to_i }
ErrorReporter.report(error, ctx)
Pgbus.logger.debug { error.backtrace&.join("\n") }

# Record failure for dashboard visibility.
Expand Down
2 changes: 1 addition & 1 deletion lib/pgbus/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def trip!(queue_name, failure_count)
@failure_counts.delete(queue_name)
invalidate_cache(queue_name)
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Circuit breaker trip failed for #{queue_name}: #{e.message}" }
ErrorReporter.report(e, { action: "circuit_breaker_trip", queue: queue_name })
Comment thread
mhenrixon marked this conversation as resolved.
end

def check_paused(queue_name)
Expand Down
22 changes: 22 additions & 0 deletions lib/pgbus/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class Configuration

# Logging
attr_accessor :logger
attr_reader :log_format # rubocop:disable Style/AccessorGrouping

# Error reporting — array of callable objects invoked on caught exceptions.
# Each receives (exception, context_hash) or (exception, context_hash, config).
attr_accessor :error_reporters

# LISTEN/NOTIFY. Only the on/off switch is user-facing — the throttle
# interval is a Postgres-side tuning knob that lives as a constant on
Expand Down Expand Up @@ -140,6 +145,8 @@ def initialize
@allowed_global_id_models = nil # nil = allow all (for backwards compat)

@logger = (defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger) || Logger.new($stdout)
@log_format = :text
@error_reporters = []

@listen_notify = true

Expand Down Expand Up @@ -224,6 +231,21 @@ def execution_mode_for(worker_config)
ExecutionPools.normalize_mode(mode)
end

VALID_LOG_FORMATS = %i[text json].freeze

def log_format=(format)
format = format.to_sym
unless VALID_LOG_FORMATS.include?(format)
raise ArgumentError, "Invalid log_format: #{format}. Must be one of: #{VALID_LOG_FORMATS.join(", ")}"
end

@log_format = format
@logger.formatter = case format
when :json then LogFormatter::JSON.new
when :text then LogFormatter::Text.new
end
end

VALID_PGMQ_SCHEMA_MODES = %i[auto extension embedded].freeze

def pgmq_schema_mode=(mode)
Expand Down
48 changes: 48 additions & 0 deletions lib/pgbus/error_reporter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

module Pgbus
# Central error reporting module. Iterates all configured error reporters
# and logs the error. Inspired by Sidekiq's error_handlers pattern.
#
# Usage:
# Pgbus::ErrorReporter.report(exception, { queue: "default" })
#
# Configuration:
# Pgbus.configure do |c|
# c.error_reporters << ->(ex, ctx) { Appsignal.set_error(ex) { |t| t.set_tags(ctx) } }
# end
module ErrorReporter
module_function

def report(exception, context = {}, config: Pgbus.configuration)
log_error(exception, context, config: config)

config.error_reporters.each do |handler|
call_handler(handler, exception, context, config)
rescue Exception => e # rubocop:disable Lint/RescueException
config.logger.error { "[Pgbus] Error reporter raised: #{e.class}: #{e.message}" }
end
rescue Exception # rubocop:disable Lint/RescueException
# ErrorReporter must never raise — callers sit inside rescue blocks
# where an unexpected raise would break fault-tolerance invariants.
nil
end

def call_handler(handler, exception, context, config)
target = handler.is_a?(Proc) ? handler : handler.method(:call)
if target.arity == 3 || (target.arity.negative? && target.parameters.size >= 3)
handler.call(exception, context, config)
else
handler.call(exception, context)
end
end

def log_error(exception, context, config:)
config.logger.error do
msg = "[Pgbus] #{exception.class}: #{exception.message}"
msg += " (#{context.inspect})" unless context.empty?
msg
end
end
end
end
9 changes: 1 addition & 8 deletions lib/pgbus/failed_event_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ def record!(queue_name:, msg_id:, payload:, headers:, error:, retry_count:)
]
)
rescue StandardError => e
# ERROR-level: silent loss of failure-tracking data defeats the
# purpose of the dashboard's "Failed Jobs" section. If recording
# fails, surface it loudly so the broken state can be diagnosed
# rather than silently masked.
Pgbus.logger.error do
"[Pgbus] Failed to record failed event for queue=#{queue_name} msg_id=#{msg_id}: " \
"#{e.class}: #{e.message}"
end
ErrorReporter.report(e, { action: "record_failed_event", queue: queue_name, msg_id: msg_id })
end

def clear!(queue_name:, msg_id:)
Expand Down
96 changes: 96 additions & 0 deletions lib/pgbus/log_formatter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require "json"
require "logger"
require "time"

module Pgbus
# Log formatters for Pgbus, inspired by Sidekiq::Logger::Formatters.
#
# Usage:
# Pgbus.configure do |c|
# c.logger.formatter = Pgbus::LogFormatter::JSON.new
# end
#
# Or via the convenience config option:
# Pgbus.configure do |c|
# c.log_format = :json
# end
module LogFormatter
module_function

def tid
Thread.current[:pgbus_tid] ||= (Thread.current.object_id ^ ::Process.pid).to_s(36)
end

# Thread-local context for structured logging. Works like
# Sidekiq::Context — any key/value pairs set via with_context
# appear in the JSON output under the "ctx" key.
def with_context(hash)
orig = current_context.dup
current_context.merge!(hash)
yield
ensure
Thread.current[:pgbus_log_context] = orig
end

def current_context
Thread.current[:pgbus_log_context] ||= {}
end

# Human-readable text formatter with Pgbus context.
# Output: "INFO 2024-01-15T10:30:00.000Z pid=1234 tid=abc queue=default: message\n"
class Text < ::Logger::Formatter
def call(severity, time, _progname, message)
"#{severity} #{time.utc.iso8601(3)} pid=#{::Process.pid} tid=#{LogFormatter.tid}#{format_context}: #{message}\n"
end

private

def format_context
ctx = LogFormatter.current_context
return "" if ctx.empty?

" #{ctx.map { |k, v| "#{k}=#{v}" }.join(" ")}"
end
end

# JSON formatter for structured logging. Each log line is a single
# JSON object followed by a newline. Extracts the [Pgbus::Component]
# prefix from messages into a separate "component" field.
#
# Output fields:
# ts — ISO 8601 timestamp with milliseconds
# pid — process ID
# tid — thread ID (short hex)
# lvl — severity (DEBUG/INFO/WARN/ERROR/FATAL)
# msg — the log message (with component prefix stripped)
# component — extracted from [Pgbus] or [Pgbus::Foo] prefix (optional)
# ctx — thread-local context hash (optional, only when non-empty)
class JSON < ::Logger::Formatter
COMPONENT_PREFIX = /\A\[([^\]]+)\]\s*/

def call(severity, time, _progname, message)
msg = message.to_s
hash = {
ts: time.utc.iso8601(3),
pid: ::Process.pid,
tid: LogFormatter.tid,
lvl: severity
}

if (match = msg.match(COMPONENT_PREFIX))
hash[:component] = match[1]
msg = msg.sub(COMPONENT_PREFIX, "")
end

hash[:msg] = msg

ctx = LogFormatter.current_context
hash[:ctx] = ctx unless ctx.empty?

"#{::JSON.generate(hash)}\n"
end
end
end
end
8 changes: 4 additions & 4 deletions lib/pgbus/outbox/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def poll_and_publish
Pgbus.logger.debug { "[Pgbus] Outbox published #{published} entries" } if published.positive?
published
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Outbox poll error: #{e.message}" }
ErrorReporter.report(e, { action: "outbox_poll" })
0
end

Expand All @@ -92,7 +92,7 @@ def publish_single(entry)
entry.update!(published_at: Time.current)
true
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Failed to publish outbox entry #{entry.id}: #{e.message}" }
ErrorReporter.report(e, { action: "outbox_publish_topic", entry_id: entry.id })
false
end

Expand All @@ -112,7 +112,7 @@ def publish_queue_batch(entries)
group.each { |e| e.update!(published_at: now) }
succeeded += group.size
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Failed to batch-publish #{group.size} outbox entries: #{e.message}" }
ErrorReporter.report(e, { action: "outbox_batch_publish", queue: queue, batch_size: group.size })
# Fall back to individual publishing for this group
group.each { |entry| succeeded += 1 if publish_single_queue(entry) }
end
Expand All @@ -133,7 +133,7 @@ def publish_single_queue(entry)
entry.update!(published_at: Time.current)
true
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Failed to publish outbox entry #{entry.id}: #{e.message}" }
ErrorReporter.report(e, { action: "outbox_publish_queue", entry_id: entry.id })
false
end

Expand Down
2 changes: 1 addition & 1 deletion lib/pgbus/process/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def run_if_due(now, ivar, interval)
yield
instance_variable_set(ivar, now)
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Dispatcher maintenance error: #{e.message}" }
ErrorReporter.report(e, { action: "dispatcher_maintenance", task: ivar.to_s.delete_prefix("@last_").delete_suffix("_at") })
end

def cleanup_processed_events
Expand Down
12 changes: 6 additions & 6 deletions lib/pgbus/process/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def fork_worker(worker_config)
@forks[pid] = { type: :worker, config: worker_config }
Pgbus.logger.info { "[Pgbus] Forked worker pid=#{pid} queues=#{queues.join(",")} mode=#{exec_mode}" }
rescue Errno::EAGAIN, Errno::ENOMEM => e
Pgbus.logger.error { "[Pgbus] Fork failed for worker: #{e.message}" }
ErrorReporter.report(e, { action: "fork_worker", queues: queues })
end

def fork_dispatcher
Expand All @@ -103,7 +103,7 @@ def fork_dispatcher
@forks[pid] = { type: :dispatcher }
Pgbus.logger.info { "[Pgbus] Forked dispatcher pid=#{pid}" }
rescue Errno::EAGAIN, Errno::ENOMEM => e
Pgbus.logger.error { "[Pgbus] Fork failed for dispatcher: #{e.message}" }
ErrorReporter.report(e, { action: "fork_dispatcher" })
end

def boot_scheduler
Expand Down Expand Up @@ -132,7 +132,7 @@ def fork_scheduler
@forks[pid] = { type: :scheduler }
Pgbus.logger.info { "[Pgbus] Forked scheduler pid=#{pid}" }
rescue Errno::EAGAIN, Errno::ENOMEM => e
Pgbus.logger.error { "[Pgbus] Fork failed for scheduler: #{e.message}" }
ErrorReporter.report(e, { action: "fork_scheduler" })
end

def recurring_tasks_configured?
Expand Down Expand Up @@ -186,7 +186,7 @@ def fork_consumer(consumer_config)
@forks[pid] = { type: :consumer, config: consumer_config }
Pgbus.logger.info { "[Pgbus] Forked consumer pid=#{pid} topics=#{topics.join(",")}" }
rescue Errno::EAGAIN, Errno::ENOMEM => e
Pgbus.logger.error { "[Pgbus] Fork failed for consumer: #{e.message}" }
ErrorReporter.report(e, { action: "fork_consumer", topics: topics })
end

def boot_outbox_poller
Expand All @@ -212,7 +212,7 @@ def fork_outbox_poller
@forks[pid] = { type: :outbox_poller }
Pgbus.logger.info { "[Pgbus] Forked outbox poller pid=#{pid}" }
rescue Errno::EAGAIN, Errno::ENOMEM => e
Pgbus.logger.error { "[Pgbus] Fork failed for outbox poller: #{e.message}" }
ErrorReporter.report(e, { action: "fork_outbox_poller" })
end

def monitor_loop
Expand Down Expand Up @@ -282,7 +282,7 @@ def setup_child_process
def bootstrap_queues
Pgbus.client.ensure_all_queues
rescue StandardError => e
Pgbus.logger.error { "[Pgbus] Failed to bootstrap queues: #{e.message}" }
ErrorReporter.report(e, { action: "bootstrap_queues" })
end

def load_rails_app
Expand Down
4 changes: 2 additions & 2 deletions lib/pgbus/process/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def fetch_messages(qty)
if undefined_queue_table_error?(e)
evict_missing_queues(e)
else
Pgbus.logger.error { "[Pgbus] Error fetching messages: #{e.message}" }
ErrorReporter.report(e, { action: "fetch_messages", queues: active_queues })
end
[]
end
Expand Down Expand Up @@ -223,7 +223,7 @@ def process_message(message, queue_name, source_queue: nil)
@jobs_failed.increment
@rate_counter.increment(:failed)
@circuit_breaker.record_failure(queue_name)
Pgbus.logger.error { "[Pgbus] Unhandled error processing message: #{e.message}" }
ErrorReporter.report(e, { action: "process_message", queue: queue_name })
ensure
@in_flight.decrement
end
Expand Down
27 changes: 27 additions & 0 deletions spec/pgbus/configuration_error_reporters_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe Pgbus::Configuration do
subject(:config) { described_class.new }

describe "#error_reporters" do
it "defaults to an empty array" do
expect(config.error_reporters).to eq([])
end

it "accepts callable objects" do
reporter = ->(ex, ctx) { [ex, ctx] }
config.error_reporters << reporter

expect(config.error_reporters).to contain_exactly(reporter)
end

it "can be replaced entirely" do
reporter = ->(ex, ctx) { [ex, ctx] }
config.error_reporters = [reporter]

expect(config.error_reporters).to contain_exactly(reporter)
end
end
end
Loading