A high-performance Rust library for PostgreSQL logical and physical replication protocol parsing and streaming. This library provides a robust, type-safe interface for consuming PostgreSQL Write-Ahead Log (WAL) streams.
- Full Logical Replication Support: Implements PostgreSQL logical replication protocol versions 1-4
- Physical Replication Support: Stream raw WAL data for standby servers and PITR
- Streaming Transactions: Support for streaming large transactions (protocol v2+)
- Two-Phase Commit: Prepared transaction support (protocol v3+)
- Parallel Streaming: Multi-stream parallel replication (protocol v4+)
- Zero-Copy Operations: Efficient buffer management using the bytes crate
- Thread-Safe LSN Tracking: Atomic LSN feedback for producer-consumer patterns
- Connection Management: Built-in connection handling with exponential backoff retry logic
- Type-Safe API: Strongly typed message parsing with comprehensive error handling
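To make the "exponential backoff retry logic" item above concrete, here is a minimal sketch of how such a delay schedule can be computed. The struct and its field names are hypothetical and are not taken from pg_walstream's RetryConfig; the sketch only illustrates the general technique of multiplying the delay per attempt and capping it.

```rust
use std::time::Duration;

/// Hypothetical retry settings for illustration only; these field names
/// are assumptions and do not come from pg_walstream's RetryConfig.
#[derive(Debug, Clone, Copy)]
pub struct BackoffSketch {
    pub initial_delay: Duration,
    pub max_delay: Duration,
    pub multiplier: u32,
    pub max_attempts: u32,
}

impl BackoffSketch {
    /// Delay to wait before retry number `attempt` (0-based), or `None`
    /// once the attempt budget is exhausted.
    pub fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_attempts {
            return None;
        }
        // multiplier^attempt, saturating so large attempt numbers cannot overflow.
        let factor = self.multiplier.saturating_pow(attempt);
        let delay = self.initial_delay.saturating_mul(factor);
        // Never wait longer than the configured ceiling.
        Some(delay.min(self.max_delay))
    }
}
```

A caller would typically sleep for `delay_for(attempt)` after each failed connection attempt and give up when it returns `None`. Production implementations often add random jitter on top of this schedule, which is left out here.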
Add this to your Cargo.toml:
[dependencies]
pg_walstream = "0.1.0"
Make sure you have libpq development libraries installed:
Ubuntu/Debian:
sudo apt-get install libpq-dev \
    clang \
    libclang-dev
CentOS/RHEL/Fedora:
sudo yum install postgresql-devel
# or
sudo dnf install postgresql-devel
The Stream API provides an ergonomic, iterator-like interface:
use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the replication stream
    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),        // Replication slot name
        "my_publication".to_string(), // Publication name
        2,                            // Protocol version
        true,                         // Enable streaming
        Duration::from_secs(10),      // Feedback interval
        Duration::from_secs(30),      // Connection timeout
        Duration::from_secs(60),      // Health check interval
        RetryConfig::default(),       // Retry configuration
    );

    // Create and initialize the stream
    let mut stream = LogicalReplicationStream::new(
        "postgresql://postgres:password@localhost:5432/mydb?replication=database",
        config,
    ).await?;
    stream.start(None).await?;

    // Create cancellation token for graceful shutdown
    let cancel_token = CancellationToken::new();

    // Convert to async Stream - provides iterator-like interface
    let mut event_stream = stream.into_stream(cancel_token);

    // Process events as they arrive
    loop {
        match event_stream.next().await {
            Ok(event) => {
                println!("Received event: {:?}", event);
                // Update LSN feedback using the convenient method
                event_stream.update_applied_lsn(event.lsn.value());
            }
            Err(e) if matches!(e, pg_walstream::ReplicationError::Cancelled(_)) => {
                println!("Stream cancelled, shutting down gracefully");
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }
    Ok(())
}
For more control, you can use the traditional polling approach:
use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),        // Replication slot name
        "my_publication".to_string(), // Publication name
        2,                            // Protocol version
        true,                         // Enable streaming
        Duration::from_secs(10),      // Feedback interval
        Duration::from_secs(30),      // Connection timeout
        Duration::from_secs(60),      // Health check interval
        RetryConfig::default(),       // Retry configuration
    );

    let mut stream = LogicalReplicationStream::new(
        "postgresql://postgres:password@localhost:5432/mydb?replication=database",
        config,
    ).await?;
    stream.start(None).await?;

    let cancel_token = CancellationToken::new();

    // Traditional polling loop with automatic retry
    loop {
        match stream.next_event_with_retry(&cancel_token).await {
            Ok(event) => {
                println!("Received event: {:?}", event);
                // Report the processed position back through the shared feedback handle
                stream.shared_lsn_feedback.update_applied_lsn(event.lsn.value());
            }
            Err(e) if matches!(e, pg_walstream::ReplicationError::Cancelled(_)) => {
                println!("Cancelled, shutting down gracefully");
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }
    Ok(())
}
Thread-safe LSN tracking for feedback to PostgreSQL:
use pg_walstream::SharedLsnFeedback;
use std::sync::Arc;
let feedback = SharedLsnFeedback::new_shared();
// Producer thread: read LSN from feedback
let (flushed_lsn, applied_lsn) = feedback.get_feedback_lsn();
// Consumer thread: update LSN after processing
feedback.update_applied_lsn(commit_lsn);
Before using this library, you need to configure PostgreSQL for replication:
Edit postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Restart PostgreSQL after making these changes.
-- Create a publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- Or publish all tables
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- Create a user with replication privileges
CREATE USER replication_user WITH REPLICATION PASSWORD 'secure_password';
-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
GRANT USAGE ON SCHEMA public TO replication_user;
The library provides two methods for creating replication slots:
- temporary (bool): Create a temporary slot that is not saved to disk and is dropped on error or session end. Default: false
- two_phase (bool): Enable two-phase commit support for logical slots. This allows the slot to receive prepared transaction events. Requires PostgreSQL 15+. Default: None
- reserve_wal (bool): Reserve WAL immediately for physical slots. Prevents WAL files from being removed before the slot is active. Default: None
- snapshot (Option<String>): Control snapshot behavior for logical slots. Default: None
  - "export": Export the snapshot for use by other sessions
  - "use": Use an existing snapshot
  - "nothing": Don't export or use a snapshot
- failover (bool): Enable the slot for failover synchronization. When enabled, the slot is synchronized to standby servers for high availability. Requires PostgreSQL 17+. Default: None
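As an illustration of how the options above map onto the replication protocol, the following sketch builds a CREATE_REPLICATION_SLOT command for a logical slot using the parenthesized option syntax of PostgreSQL 15 and later. The struct and function names are hypothetical, and this is not necessarily how pg_walstream constructs the command internally. The physical-only reserve_wal option is omitted for brevity.

```rust
/// Hypothetical option set mirroring the logical-slot options described above.
#[derive(Debug, Default, Clone)]
pub struct SlotOptionsSketch {
    pub temporary: bool,
    pub two_phase: Option<bool>,
    pub snapshot: Option<String>,
    pub failover: Option<bool>,
}

/// Builds a CREATE_REPLICATION_SLOT command string for a logical slot.
/// `plugin` is assumed to be a trusted output plugin name such as "pgoutput".
pub fn create_logical_slot_command(
    slot: &str,
    plugin: &str,
    opts: &SlotOptionsSketch,
) -> Result<String, String> {
    // Slot names may contain only lowercase letters, digits, and underscores.
    let name_ok = !slot.is_empty()
        && slot
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');
    if !name_ok {
        return Err(format!("invalid slot name: {slot:?}"));
    }

    // Collect only the options that were explicitly set.
    let mut parts: Vec<String> = Vec::new();
    if let Some(v) = opts.two_phase {
        parts.push(format!("TWO_PHASE {v}"));
    }
    if let Some(mode) = &opts.snapshot {
        match mode.as_str() {
            "export" | "use" | "nothing" => parts.push(format!("SNAPSHOT '{mode}'")),
            other => return Err(format!("invalid snapshot mode: {other:?}")),
        }
    }
    if let Some(v) = opts.failover {
        parts.push(format!("FAILOVER {v}"));
    }

    let mut cmd = format!("CREATE_REPLICATION_SLOT {slot}");
    if opts.temporary {
        cmd.push_str(" TEMPORARY");
    }
    cmd.push_str(&format!(" LOGICAL {plugin}"));
    if !parts.is_empty() {
        cmd.push_str(&format!(" ({})", parts.join(", ")));
    }
    Ok(cmd)
}
```

Leaving an option as `None` omits it from the command, so the server default applies. That matches the "Default: None" wording in the list above.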
The library supports all PostgreSQL logical replication message types:
- BEGIN: Transaction start
- COMMIT: Transaction commit
- ORIGIN: Replication origin
- RELATION: Table schema definition
- TYPE: Data type definition
- INSERT: Row insertion
- UPDATE: Row update
- DELETE: Row deletion
- TRUNCATE: Table truncation
- MESSAGE: Generic message
- STREAM_START: Streaming transaction start
- STREAM_STOP: Streaming transaction segment end
- STREAM_COMMIT: Streaming transaction commit
- STREAM_ABORT: Streaming transaction abort
- BEGIN_PREPARE: Prepared transaction start
- PREPARE: Transaction prepare
- COMMIT_PREPARED: Commit prepared transaction
- ROLLBACK_PREPARED: Rollback prepared transaction
- STREAM_PREPARE: Stream prepare message
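Each of the message types listed above is identified on the wire by a single leading tag byte. The sketch below maps those tag bytes, as documented in the PostgreSQL logical replication message formats, to an enum. The enum and function names are hypothetical and are not pg_walstream's own types.

```rust
/// Hypothetical enum for illustration; pg_walstream's own message types may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    Begin,
    Commit,
    Origin,
    Relation,
    Type,
    Insert,
    Update,
    Delete,
    Truncate,
    Message,
    StreamStart,
    StreamStop,
    StreamCommit,
    StreamAbort,
    BeginPrepare,
    Prepare,
    CommitPrepared,
    RollbackPrepared,
    StreamPrepare,
}

/// Maps the first byte of a pgoutput message to its kind.
/// Returns `None` for tag bytes this sketch does not know.
pub fn classify(tag: u8) -> Option<MessageKind> {
    let kind = match tag {
        b'B' => MessageKind::Begin,
        b'C' => MessageKind::Commit,
        b'O' => MessageKind::Origin,
        b'R' => MessageKind::Relation,
        b'Y' => MessageKind::Type,
        b'I' => MessageKind::Insert,
        b'U' => MessageKind::Update,
        b'D' => MessageKind::Delete,
        b'T' => MessageKind::Truncate,
        b'M' => MessageKind::Message,
        b'S' => MessageKind::StreamStart,
        b'E' => MessageKind::StreamStop,
        b'c' => MessageKind::StreamCommit,
        b'A' => MessageKind::StreamAbort,
        b'b' => MessageKind::BeginPrepare,
        b'P' => MessageKind::Prepare,
        b'K' => MessageKind::CommitPrepared,
        b'r' => MessageKind::RollbackPrepared,
        b'p' => MessageKind::StreamPrepare,
        _ => return None,
    };
    Some(kind)
}
```

Note that the tags are case-sensitive: `C` is COMMIT while `c` is STREAM_COMMIT, and `P` is PREPARE while `p` is STREAM_PREPARE.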
The project includes 95 comprehensive unit tests covering:
- Protocol message parsing
- Buffer operations
- LSN tracking and thread safety
- Error handling
- Retry logic
- Type conversions
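The LSN tracking and type conversions mentioned above depend on moving between a 64-bit LSN and PostgreSQL's conventional `X/Y` hexadecimal text form. Here is a minimal sketch of that conversion. The function names are hypothetical and are not pg_walstream's API.

```rust
/// Parses an LSN in the `X/Y` hexadecimal text form (for example "16/B374D848")
/// into a 64-bit value: the high 32 bits before the slash, the low 32 bits after.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    // Each half must be 1 to 8 hex digits, so it fits in 32 bits.
    let half = |s: &str| -> Option<u64> {
        let valid = !s.is_empty() && s.len() <= 8 && s.bytes().all(|b| b.is_ascii_hexdigit());
        if valid {
            u64::from_str_radix(s, 16).ok()
        } else {
            None
        }
    };
    Some((half(hi)? << 32) | half(lo)?)
}

/// Formats a 64-bit LSN back into the `X/Y` text form, without zero padding.
pub fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```

For inputs written without leading zeros, the two functions round-trip: formatting the value parsed from "16/B374D848" yields the same string.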
┌─────────────────────────────────────────┐
│ Application Layer │
│ (Your CDC / Replication Logic) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ LogicalReplicationStream │
│ - Connection management │
│ - Event processing │
│ - LSN feedback │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ LogicalReplicationParser │
│ - Protocol parsing │
│ - Message deserialization │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ BufferReader / BufferWriter │
│ - Zero-copy operations │
│ - Binary protocol handling │
└─────────────────────────────────────────┘
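To show what the two lower layers of the diagram do, the sketch below pairs a small big-endian cursor over a borrowed byte slice with a parser for the BEGIN message. The field layout (tag `B`, 64-bit final LSN, 64-bit commit timestamp, 32-bit transaction ID) follows the PostgreSQL message format documentation. The type and function names are hypothetical and do not reflect the actual BufferReader or LogicalReplicationParser API.

```rust
/// Hypothetical cursor over a borrowed byte slice; it never copies the input buffer.
pub struct ReaderSketch<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> ReaderSketch<'a> {
    pub fn new(buf: &'a [u8]) -> Self {
        ReaderSketch { buf, pos: 0 }
    }

    /// Borrows the next `n` bytes, or returns `None` if the buffer is too short.
    fn take(&mut self, n: usize) -> Option<&'a [u8]> {
        let buf: &'a [u8] = self.buf;
        let end = self.pos.checked_add(n)?;
        let slice = buf.get(self.pos..end)?;
        self.pos = end;
        Some(slice)
    }

    pub fn read_u8(&mut self) -> Option<u8> {
        self.take(1).map(|s| s[0])
    }

    /// The replication protocol encodes integers in network (big-endian) byte order.
    pub fn read_u32(&mut self) -> Option<u32> {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(self.take(4)?);
        Some(u32::from_be_bytes(raw))
    }

    pub fn read_u64(&mut self) -> Option<u64> {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(self.take(8)?);
        Some(u64::from_be_bytes(raw))
    }

    pub fn read_i64(&mut self) -> Option<i64> {
        self.read_u64().map(|v| v as i64)
    }
}

/// Decoded BEGIN message; the timestamp is microseconds since 2000-01-01.
#[derive(Debug, PartialEq, Eq)]
pub struct BeginSketch {
    pub final_lsn: u64,
    pub commit_timestamp: i64,
    pub xid: u32,
}

/// Parses a BEGIN message, returning `None` on a wrong tag or truncated input.
pub fn parse_begin(payload: &[u8]) -> Option<BeginSketch> {
    let mut r = ReaderSketch::new(payload);
    if r.read_u8()? != b'B' {
        return None;
    }
    Some(BeginSketch {
        final_lsn: r.read_u64()?,
        commit_timestamp: r.read_i64()?,
        xid: r.read_u32()?,
    })
}
```

Returning `Option` keeps the sketch short. A real parser would more likely return a typed error that says which field was truncated.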
- Zero-Copy: Uses bytes::Bytes for efficient buffer management
- Atomic Operations: Thread-safe LSN tracking with minimal overhead
- Connection Pooling: Reusable connection with automatic retry
- Streaming Support: Handle large transactions without memory issues
- Efficient Blocking: Async I/O with tokio::select eliminates busy-waiting
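The "Atomic Operations" item above can be illustrated with a lock-free tracker built on standard-library atomics. This is a sketch of the general technique under the assumption that LSNs are plain `u64` values. The type and method names are hypothetical and are not the actual SharedLsnFeedback implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical lock-free LSN tracker shared between a consumer that applies
/// changes and a producer that reports progress back to PostgreSQL.
#[derive(Debug, Default)]
pub struct LsnFeedbackSketch {
    flushed: AtomicU64,
    applied: AtomicU64,
}

impl LsnFeedbackSketch {
    pub fn new_shared() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Records a flushed LSN. `fetch_max` keeps the value monotonic, so a
    /// late update with an older LSN can never move the position backwards.
    pub fn update_flushed(&self, lsn: u64) {
        self.flushed.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Records an applied LSN with the same monotonic guarantee.
    pub fn update_applied(&self, lsn: u64) {
        self.applied.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Returns `(flushed, applied)` as currently observed.
    pub fn snapshot(&self) -> (u64, u64) {
        (
            self.flushed.load(Ordering::Acquire),
            self.applied.load(Ordering::Acquire),
        )
    }
}
```

Because each position is a single atomic word, neither thread ever takes a lock. The trade-off is that `snapshot` reads the two values independently, so the pair is not guaranteed to come from the same instant.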
- Requires PostgreSQL 14 or later; protocol v3 (two-phase commit) needs PostgreSQL 15+ and protocol v4 (parallel streaming) needs PostgreSQL 16+
- Logical replication slot must be created before streaming
- Binary protocol only (no text-based protocol support)
- Requires replication permission for the database user
- PostgreSQL Logical Replication Documentation
- Logical Replication Message Formats
- Replication Protocol
Contributions are welcome! Please feel free to submit a Pull Request.
Daniel Shih (dog830228@gmail.com)