PostgreSQL WAL Streaming Examples

This directory contains examples demonstrating how to use the pg_walstream library to stream PostgreSQL Write-Ahead Log (WAL) changes.

Each example is an independent binary project with its own Cargo.toml, so example-specific dependencies (e.g. futures, flate2, tokio-stream) do not pollute pg_walstream's [dev-dependencies].

Running an Example

cd examples/<project-name>
cargo run

Examples

1. basic-streaming

Demonstrates the high-level Stream API wrapped with futures::stream::unfold for Stream trait compatibility.

Features:

  • Async iterator-like interface with futures::Stream compatibility
  • Automatic event processing with retry logic
  • Comprehensive event type handling (Insert, Update, Delete, etc.)
  • Stream combinators support (filter, take_while, etc.)
  • Graceful shutdown with Ctrl+C
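The unfold pattern mentioned above threads a piece of state through a closure that yields one item per call. The sketch below shows the same shape synchronously with std::iter::from_fn, so it runs without extra crates; the real example is asynchronous and uses futures::stream::unfold. The `Event` enum, `MockSource`, and its `next_event` method are hypothetical stand-ins, not the library's types.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified event type; the library's real event type is richer.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Insert { table: String },
    Update { table: String },
    Delete { table: String },
    Shutdown,
}

/// Hypothetical stand-in for a replication connection: a queue of canned events.
struct MockSource {
    pending: VecDeque<Event>,
}

impl MockSource {
    /// Returns the next queued event, or None once the queue is empty.
    fn next_event(&mut self) -> Option<Event> {
        self.pending.pop_front()
    }
}

/// Turns the source into an iterator by moving it into a closure,
/// the same state-threading shape that `unfold` uses for streams.
fn into_events(mut source: MockSource) -> impl Iterator<Item = Event> {
    std::iter::from_fn(move || source.next_event())
}

/// Collects the tables touched by inserts until a shutdown marker is seen.
fn inserted_tables(source: MockSource) -> Vec<String> {
    into_events(source)
        .take_while(|event| *event != Event::Shutdown)
        .filter_map(|event| match event {
            Event::Insert { table } => Some(table),
            _ => None,
        })
        .collect()
}
```

Once the source is wrapped this way, combinators such as filter and take_while compose without any changes to the source itself, which is the point of the Stream-compatible wrapper.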

Run:

cd examples/basic-streaming
cargo run

2. rate-limited-streaming

Demonstrates rate limiting and flow control using futures::Stream combinators.

Features:

  • Rate limiting to prevent overwhelming downstream systems
  • Configurable events per second limit
  • Real-time rate statistics and monitoring
  • Backpressure handling with automatic throttling
  • Practical example of Stream trait usage
  • Protection for downstream APIs with rate limits

Run:

cd examples/rate-limited-streaming
cargo run

# Custom rate limit
MAX_EVENTS_PER_SECOND=50 cargo run

Use Cases:

  • Protecting downstream APIs from being overwhelmed
  • Complying with third-party service rate limits
  • Spreading load over time for cost optimization
  • Controlled batch processing
  • Testing with realistic production loads
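As a rough illustration of the throttling idea, the sketch below spaces events at a fixed interval derived from an events-per-second limit. It is a minimal std-only sketch, not the example's actual implementation: `parse_limit`, `RateLimiter`, and `delay_for` are hypothetical names, and the fallback default is an assumption.

```rust
use std::time::{Duration, Instant};

/// Parses an events-per-second limit, falling back to `default` when the
/// value is missing, malformed, or zero.
fn parse_limit(raw: Option<&str>, default: u32) -> u32 {
    raw.and_then(|s| s.trim().parse::<u32>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(default)
}

/// Hypothetical fixed-interval limiter: permits at most one event per
/// `1 / max_per_second` seconds.
struct RateLimiter {
    interval: Duration,
    next_allowed: Option<Instant>,
}

impl RateLimiter {
    fn new(max_per_second: u32) -> Self {
        RateLimiter {
            interval: Duration::from_secs(1) / max_per_second.max(1),
            next_allowed: None,
        }
    }

    /// Returns how long the caller should wait before emitting an event
    /// observed at `now`, and reserves the slot for that event.
    fn delay_for(&mut self, now: Instant) -> Duration {
        let slot = match self.next_allowed {
            Some(t) if t > now => t,
            _ => now,
        };
        self.next_allowed = Some(slot + self.interval);
        slot - now
    }
}
```

A caller would read the limit with something like `parse_limit(std::env::var("MAX_EVENTS_PER_SECOND").ok().as_deref(), 100)` and sleep for the returned delay before forwarding each event. Taking `now` as a parameter keeps the limiter deterministic and easy to test.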

3. polling

Shows the lower-level polling API for manual event retrieval.

Features:

  • Manual event polling with next_event()
  • More control over the polling loop
  • Suitable for custom integration scenarios
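The shape of a manual polling loop can be sketched as follows. This is a synchronous illustration only: the `EventSource` trait, the `Polled` and `PollError` types, and `ScriptedSource` are hypothetical, and the library's real next_event() signature may differ.

```rust
/// Hypothetical event type for illustration only.
#[derive(Debug, PartialEq)]
enum Polled {
    Change(String),
    KeepAlive,
}

/// Hypothetical error type for illustration only.
#[derive(Debug, PartialEq)]
struct PollError(String);

/// Hypothetical synchronous stand-in for a source exposing `next_event()`.
trait EventSource {
    fn next_event(&mut self) -> Result<Option<Polled>, PollError>;
}

/// Test double that replays a fixed script of poll results.
struct ScriptedSource {
    script: std::vec::IntoIter<Result<Option<Polled>, PollError>>,
}

impl EventSource for ScriptedSource {
    fn next_event(&mut self) -> Result<Option<Polled>, PollError> {
        // An exhausted script behaves like a closed stream.
        self.script.next().unwrap_or(Ok(None))
    }
}

/// Polls until the source is exhausted, an error occurs, or `max_events`
/// changes were collected. Keep-alives are skipped.
fn poll_changes<S: EventSource>(
    source: &mut S,
    max_events: usize,
) -> Result<Vec<String>, PollError> {
    let mut changes = Vec::new();
    while changes.len() < max_events {
        match source.next_event()? {
            Some(Polled::Change(description)) => changes.push(description),
            Some(Polled::KeepAlive) => continue,
            None => break,
        }
    }
    Ok(changes)
}
```

Owning the loop makes it straightforward to add a batch limit, a timeout, or custom error handling at the exact point where each event is pulled.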

Run:

cd examples/polling
cargo run

4. safe-transaction-consumer

Advanced example demonstrating safe transaction processing with ordered commits.

Features:

  • Transaction buffering until commit
  • Ordered transaction application
  • LSN feedback only after successful application
  • Backpressure handling when too many transactions are buffered
  • Graceful shutdown with proper cleanup
  • Real-time statistics reporting
  • Transaction boundary enforcement

Run:

cd examples/safe-transaction-consumer
cargo run

Use Cases:

  • Building production-ready replication consumers
  • Ensuring data consistency across systems
  • Implementing exactly-once processing semantics
  • Safe CDC (Change Data Capture) pipelines
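The core idea, buffering changes until commit and advancing the confirmed LSN only after the transaction has been applied, can be sketched as below. The `Message` enum and `TransactionBuffer` are hypothetical simplifications for illustration and do not mirror the library's types or the example's actual code.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified replication messages keyed by transaction id.
#[derive(Debug, Clone)]
enum Message {
    Begin { xid: u32 },
    Change { xid: u32, description: String },
    Commit { xid: u32, end_lsn: u64 },
}

/// Buffers changes per transaction and releases them only at commit.
struct TransactionBuffer {
    open: HashMap<u32, Vec<String>>,
    max_buffered: usize,
    confirmed_lsn: u64,
}

impl TransactionBuffer {
    fn new(max_buffered: usize) -> Self {
        TransactionBuffer {
            open: HashMap::new(),
            max_buffered,
            confirmed_lsn: 0,
        }
    }

    /// True when the caller should pause reading until commits drain the buffer.
    fn should_apply_backpressure(&self) -> bool {
        self.open.len() >= self.max_buffered
    }

    /// Handles one message. `apply` receives the whole transaction at commit.
    /// If `apply` fails, the transaction stays buffered and the confirmed
    /// LSN does not move, so the commit can be retried.
    fn handle<F>(&mut self, message: Message, mut apply: F) -> Result<(), String>
    where
        F: FnMut(&[String]) -> Result<(), String>,
    {
        match message {
            Message::Begin { xid } => {
                self.open.insert(xid, Vec::new());
            }
            Message::Change { xid, description } => {
                self.open.entry(xid).or_default().push(description);
            }
            Message::Commit { xid, end_lsn } => {
                if let Some(changes) = self.open.get(&xid) {
                    apply(changes.as_slice())?;
                }
                self.open.remove(&xid);
                self.confirmed_lsn = end_lsn;
            }
        }
        Ok(())
    }
}
```

Because `confirmed_lsn` is what would be reported back to PostgreSQL, a crash between apply and feedback leads to redelivery rather than data loss; the downstream apply step therefore needs to be idempotent for the pipeline to behave as exactly-once.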

5. pg-basebackup

Complete implementation of pg_basebackup functionality for creating physical database backups.

Features:

  • Full physical backup of PostgreSQL cluster
  • Streaming backup data via replication protocol
  • Tar archive extraction and file writing
  • Progress reporting and verification
  • Backup manifest generation with checksums
  • Support for all BASE_BACKUP options (compression, WAL inclusion, etc.)
  • Production-ready error handling

Run:

cd examples/pg-basebackup
cargo run

# Or with custom backup directory:
BACKUP_DIR="/tmp/pg_basebackup" cargo run

What It Does:

  1. Connects to PostgreSQL in replication mode
  2. Initiates BASE_BACKUP with optimal settings
  3. Streams backup data as tar archives
  4. Extracts and writes files to the ./backup/ directory (or to BACKUP_DIR, if set)
  5. Generates backup manifest for verification
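Step 4 depends on reading tar headers from the stream. The sketch below parses only the entry name and size from a 512-byte header block, using the standard tar field offsets. It is a simplified illustration, not the example's actual extractor: `TarEntry`, `parse_header`, and `padded_len` are hypothetical names, and base-256 sizes for very large files are not handled.

```rust
/// Size of one tar block; headers and file data are padded to this size.
const BLOCK_SIZE: usize = 512;

/// Minimal view of a tar header: entry name and payload size in bytes.
#[derive(Debug, PartialEq)]
struct TarEntry {
    name: String,
    size: u64,
}

/// Reads a NUL-terminated (or full-width) text field and trims padding spaces.
fn field_str(field: &[u8]) -> &str {
    let end = field.iter().position(|&b| b == 0).unwrap_or(field.len());
    std::str::from_utf8(&field[..end]).unwrap_or("").trim()
}

/// Parses the name (bytes 0..100) and the octal size (bytes 124..136).
/// Returns None for an all-zero end-of-archive block or a malformed size.
fn parse_header(block: &[u8; BLOCK_SIZE]) -> Option<TarEntry> {
    if block.iter().all(|&b| b == 0) {
        return None;
    }
    let name = field_str(&block[0..100]).to_string();
    let size = u64::from_str_radix(field_str(&block[124..136]), 8).ok()?;
    Some(TarEntry { name, size })
}

/// Number of bytes the payload occupies in the archive, including padding.
fn padded_len(size: u64) -> u64 {
    let block = BLOCK_SIZE as u64;
    (size + block - 1) / block * block
}
```

An extractor would alternate between `parse_header` and reading `padded_len(entry.size)` bytes, writing only the first `entry.size` bytes of each payload to disk.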

Use Cases:

  • Setting up physical replication standby servers
  • Point-in-time recovery (PITR) setup
  • Disaster recovery preparation
  • Database migrations and cloning
  • Automated backup solutions

6. arbitrary-fuzzing

Demonstrates generating arbitrary pg_walstream types for fuzzing without modifying the library.

Run:

cd examples/arbitrary-fuzzing
cargo run

7. typed-deserialization

Demonstrates deserializing WAL event data directly into user-defined Rust structs using RowData::deserialize_into().

Features:

  • Automatic text-to-type coercion (PostgreSQL sends all values as text)
  • NULL handling with Option<T> fields
  • #[serde(rename)] for column-to-field name mapping
  • #[serde(default)] for missing/evolving columns
  • Enum deserialization from text columns
  • ChangeEvent convenience methods (deserialize_insert, deserialize_update, etc.)
  • Error handling for type mismatches and NULL violations

Run:

cd examples/typed-deserialization
cargo run

No database required — this example uses in-memory RowData and ChangeEvent objects to demonstrate the deserialization API.
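To make the text-to-type coercion and NULL handling concrete, here is a minimal std-only sketch of the idea. It does not use serde or the library's RowData::deserialize_into(); `CoerceError`, `coerce_nullable`, and `coerce_required` are hypothetical helpers written for illustration.

```rust
use std::str::FromStr;

/// Errors a text-to-type coercion can report.
#[derive(Debug, PartialEq)]
enum CoerceError {
    UnexpectedNull { column: String },
    Invalid { column: String, value: String },
}

/// Coerces a nullable text value into `Option<T>`; NULL maps to `None`.
fn coerce_nullable<T: FromStr>(
    column: &str,
    raw: Option<&str>,
) -> Result<Option<T>, CoerceError> {
    match raw {
        None => Ok(None),
        Some(text) => text
            .parse::<T>()
            .map(Some)
            .map_err(|_| CoerceError::Invalid {
                column: column.to_string(),
                value: text.to_string(),
            }),
    }
}

/// Coerces a text value into `T`, rejecting NULL with an explicit error.
fn coerce_required<T: FromStr>(column: &str, raw: Option<&str>) -> Result<T, CoerceError> {
    coerce_nullable(column, raw)?.ok_or_else(|| CoerceError::UnexpectedNull {
        column: column.to_string(),
    })
}
```

For example, `coerce_required::<i64>("id", Some("42"))` yields `Ok(42)`, while passing `None` for the same column yields an `UnexpectedNull` error, which corresponds to a non-Option field receiving a NULL column.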

Prerequisites

For Logical Replication Examples (basic-streaming, rate-limited-streaming, polling, safe-transaction-consumer)

1. PostgreSQL Configuration

Ensure your PostgreSQL server is configured for logical replication. Edit postgresql.conf:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

Restart PostgreSQL after making changes.

2. Create a Publication

CREATE PUBLICATION my_publication FOR ALL TABLES;

3. Set Environment Variable

export DATABASE_URL="postgresql://postgres:password@localhost:5432/postgres?replication=database"

Important: The connection string must include the replication=database parameter.
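A quick sanity check on DATABASE_URL before connecting can catch a missing parameter early. The helper names below are hypothetical and the parsing is deliberately simple (it splits the query string by hand and does not handle percent-encoding).

```rust
/// Returns the value of the `replication` query parameter, if present.
fn replication_param(url: &str) -> Option<&str> {
    let (_, query) = url.split_once('?')?;
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .find(|(key, _)| *key == "replication")
        .map(|(_, value)| value)
}

/// True when the URL requests a database-bound replication connection.
fn is_logical_replication_url(url: &str) -> bool {
    replication_param(url) == Some("database")
}
```

An example binary could call `is_logical_replication_url` on the value of DATABASE_URL at startup and exit with a clear message when it returns false.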

For Physical Replication Example (pg-basebackup)

1. PostgreSQL Configuration

Base backups and physical replication require the following settings in postgresql.conf:

wal_level = replica          # or 'logical' (logical includes replica)
max_wal_senders = 10
max_replication_slots = 10

2. Authentication Setup

Edit pg_hba.conf to allow replication connections:

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     postgres        127.0.0.1/32            trust
# or, for password authentication, use this line instead of the one above:
host    replication     postgres        127.0.0.1/32            md5

3. Connection String

export DATABASE_URL="postgresql://postgres:password@localhost:5432/postgres?replication=database"

Note: Physical replication requires the replication=database parameter and a role with the REPLICATION attribute (or superuser).

Testing

Start an example from its project directory:

cd examples/basic-streaming
cargo run

In another terminal, make database changes:

CREATE TABLE test_users (id SERIAL PRIMARY KEY, name TEXT, email TEXT);
INSERT INTO test_users (name, email) VALUES ('Alice', 'alice@example.com');
UPDATE test_users SET email = 'alice.new@example.com' WHERE name = 'Alice';
DELETE FROM test_users WHERE name = 'Alice';

You should see the changes appear in the example output.