Documentation ¶
Overview ¶
Package pubsub provides a production-ready Pub/Sub library and standalone service for Go with reliable message delivery, retry logic, and Dead Letter Queue (DLQ) support.
It works both as a library embedded in your application and as a standalone microservice with a REST API.
Features ¶
- Reliable Message Delivery with guaranteed delivery and exponential backoff retry
- Exponential Backoff: 30s → 1m → 2m → 4m → 8m → 16m → 30m (max)
- Dead Letter Queue (DLQ) automatically handles failed messages after 5 attempts
- DLQ Statistics for tracking failure reasons and resolution metrics
- Domain-Driven Design with rich domain models containing business logic
- Repository Pattern for clean data access abstraction
- Options Pattern for modern Go API design (2025 best practices)
- Pluggable architecture: bring your own Logger, Notification system
- Multi-Database Support: MySQL, PostgreSQL, SQLite via Relica adapters
- Zero Dependencies: uses Relica query builder (no ORM bloat)
- Embedded Migrations for easy database setup
- Docker Ready: production Dockerfile with multi-stage builds
- Cloud Native: 12-factor app, ENV config, health checks
- Battle-tested in FreiCON Railway Management System
Quick Start ¶
Option 1: As Embedded Library ¶
First, apply the database migrations:
import (
	"database/sql"
	"log"
	"github.com/coregx/pubsub"
	"github.com/coregx/pubsub/adapters/relica"
	"github.com/coregx/pubsub/migrations"
	_ "github.com/go-sql-driver/mysql"
)
// Connect to database
db, err := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub?parseTime=true")
if err != nil {
	log.Fatal(err)
}
// Apply embedded migrations
if err := migrations.ApplyAll(db); err != nil {
	log.Fatal(err)
}
Use production-ready Relica adapters:
// Create all repositories at once
repos := relica.NewRepositories(db, "mysql")
// Create services with Options Pattern
publisher, err := pubsub.NewPublisher(
	pubsub.WithPublisherRepositories(
		repos.Message, repos.Queue, repos.Subscription, repos.Topic,
	),
	pubsub.WithPublisherLogger(logger),
)
if err != nil {
	log.Fatal(err)
}
// Create worker
worker, err := pubsub.NewQueueWorker(
	pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
	pubsub.WithDelivery(transmitterProvider, gateway),
	pubsub.WithLogger(logger),
)
if err != nil {
	log.Fatal(err)
}
// Run worker (processes queue every 30 seconds).
// Run blocks until ctx is canceled, so start it in a goroutine.
ctx := context.Background()
go worker.Run(ctx, 30*time.Second)
Publish a message:
result, err := publisher.Publish(ctx, pubsub.PublishRequest{
TopicCode: "user.signup",
Identifier: "user-123",
Data: `{"userId": 123, "email": "user@example.com"}`,
})
Option 2: As Standalone Service ¶
Run the standalone PubSub server with Docker:
cd cmd/pubsub-server
docker-compose up -d
Access the REST API at http://localhost:8080:
# Publish message
curl -X POST http://localhost:8080/api/v1/publish \
-H "Content-Type: application/json" \
-d '{"topicCode":"user.signup","identifier":"user-123","data":{"userId":123}}'
# Health check
curl http://localhost:8080/api/v1/health
See cmd/pubsub-server/README.md for the full API documentation.
Architecture ¶
The library follows Clean Architecture and Domain-Driven Design principles:
┌─────────────────────────────────────┐
│ Application Layer │
│ (Publisher, SubscriptionManager, │
│ QueueWorker, REST API) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Domain Layer │
│ (Rich models with business logic) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Relica Adapters │
│ (Production-ready implementations) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Database (MySQL/PostgreSQL/ │
│ SQLite) │
└─────────────────────────────────────┘
Key principles:
- Domain models contain business logic (Queue.MarkFailed, Queue.ShouldRetry, etc.)
- Repository Pattern abstracts database operations
- Dependency Inversion via interfaces (Logger, Notification, Repositories)
- Options Pattern for service configuration (2025 best practices)
- Relica adapters provide zero-dependency database access
Message Flow ¶
PUBLISH
Publisher → Topic → Create Message → Find Active Subscriptions → Create Queue Items (one per subscription)
WORKER (Background)
QueueWorker → Find Pending/Retryable Items (batch) → Deliver to Subscribers (via webhooks/gateway)
  → On Success: Mark as SENT
  → On Failure: Retry with exponential backoff
  → After 5 failures: Move to DLQ
DLQ (Dead Letter Queue)
Failed items → Manual review → Resolve or Delete
Retry Strategy ¶
Failed message deliveries are automatically retried with exponential backoff:
Attempt 1: Immediate
Attempt 2: +30 seconds
Attempt 3: +1 minute
Attempt 4: +2 minutes
Attempt 5: +4 minutes
Attempt 6: +8 minutes (moves to DLQ after this)
After 5 failed attempts, messages are automatically moved to the Dead Letter Queue (DLQ) for manual inspection and resolution.
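The doubling-with-cap schedule listed under Features (30s → 1m → 2m → 4m → 8m → 16m → 30m) can be sketched as follows. This is a minimal illustration, not the package's retry.Strategy: the names backoffDelay, baseDelay, and maxDelay are hypothetical, and only the delay values come from the documentation above.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = 30 * time.Second // first retry delay in the documented schedule
	maxDelay  = 30 * time.Minute // documented upper bound
)

// backoffDelay returns the wait before retry number n (n >= 1):
// the base delay doubled n-1 times, capped at maxDelay.
// Hypothetical helper, not part of the pubsub API.
func backoffDelay(n int) time.Duration {
	if n < 1 {
		return 0
	}
	d := baseDelay
	for i := 1; i < n; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for n := 1; n <= 7; n++ {
		fmt.Printf("retry %d: wait %v\n", n, backoffDelay(n))
	}
}
```

Capping inside the loop keeps the delay bounded and avoids overflow for large retry counts.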
Database Schema ¶
The library requires 7 database tables (created via embedded migrations):
pubsub_topic        - Topics for pub/sub messaging
pubsub_publisher    - Publisher configurations
pubsub_subscriber   - Subscriber configurations with webhook URLs
pubsub_subscription - Subscription mappings (subscriber + topic)
pubsub_message      - Published messages
pubsub_queue        - Delivery queue with retry state
pubsub_dlq          - Dead Letter Queue for failed messages
Supports MySQL, PostgreSQL, and SQLite via Relica adapters. Table prefix can be customized (default: "pubsub_").
Examples ¶
See the examples/ directory for complete working examples including:
- Basic usage with database/sql
- Custom logger integration
- Repository implementations
For detailed documentation, see README.md and pkg.go.dev.
Index ¶
- Constants
- Variables
- func IsNoData(err error) bool
- type DLQRepository
- type Error
- type Filter
- type Logger
- type LoggingNotificationService
- func (n *LoggingNotificationService) NotifyDLQItemAdded(_ context.Context, dlq model.DeadLetterQueue) error
- func (n *LoggingNotificationService) NotifyDeliveryFailure(_ context.Context, queue *model.Queue, err error) error
- func (n *LoggingNotificationService) NotifySubscriptionCreated(_ context.Context, subscription model.Subscription) error
- func (n *LoggingNotificationService) NotifySubscriptionDeactivated(_ context.Context, subscription model.Subscription) error
- type MessageDeliveryGateway
- type MessageRepository
- type NoOpNotificationService
- func (n *NoOpNotificationService) NotifyDLQItemAdded(_ context.Context, _ model.DeadLetterQueue) error
- func (n *NoOpNotificationService) NotifyDeliveryFailure(_ context.Context, _ *model.Queue, _ error) error
- func (n *NoOpNotificationService) NotifySubscriptionCreated(_ context.Context, _ model.Subscription) error
- func (n *NoOpNotificationService) NotifySubscriptionDeactivated(_ context.Context, _ model.Subscription) error
- type NoopLogger
- type NotificationService
- type Option
- func WithBatchSize(size int) Option
- func WithDelivery(transmitterProvider TransmitterProvider, gateway MessageDeliveryGateway) Option
- func WithLogger(logger Logger) Option
- func WithNotifications(service NotificationService) Option
- func WithRepositories(queueRepo QueueRepository, messageRepo MessageRepository, ...) Option
- func WithRetryStrategy(strategy retry.Strategy) Option
- type PublishRequest
- type PublishResult
- type Publisher
- type PublisherOption
- type PublisherRepository
- type QueueRepository
- type QueueWorker
- func (w *QueueWorker) CleanupExpiredItems(ctx context.Context) (int, error)
- func (w *QueueWorker) GetDLQStats(ctx context.Context) (model.DLQStats, error)
- func (w *QueueWorker) GetRetrySchedule() string
- func (w *QueueWorker) ProcessPendingItems(ctx context.Context) (int, error)
- func (w *QueueWorker) ProcessRetryableItems(ctx context.Context) (int, error)
- func (w *QueueWorker) Run(ctx context.Context, interval time.Duration)
- type SubscribeRequest
- type SubscriberRepository
- type SubscriptionManager
- func (sm *SubscriptionManager) GetSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
- func (sm *SubscriptionManager) ListSubscriptions(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)
- func (sm *SubscriptionManager) ReactivateSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
- func (sm *SubscriptionManager) Subscribe(ctx context.Context, req SubscribeRequest) (*model.Subscription, error)
- func (sm *SubscriptionManager) Unsubscribe(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
- type SubscriptionManagerOption
- type SubscriptionRepository
- type TopicRepository
- type TransmitterProvider
Constants ¶
const (
	// ErrCodeNoData indicates no data was found.
	ErrCodeNoData = "NO_DATA"
	// ErrCodeValidation indicates validation failed.
	ErrCodeValidation = "VALIDATION_ERROR"
	// ErrCodeConfiguration indicates invalid configuration.
	ErrCodeConfiguration = "CONFIGURATION_ERROR"
	// ErrCodeDatabase indicates database operation failed.
	ErrCodeDatabase = "DATABASE_ERROR"
	// ErrCodeDelivery indicates message delivery failed.
	ErrCodeDelivery = "DELIVERY_ERROR"
)
Error codes for pubsub operations.
Variables ¶
var (
	// ErrNoData is returned when a query returns no results.
	// This is not necessarily an error condition in all cases.
	ErrNoData = &Error{
		Code:    ErrCodeNoData,
		Message: "no data found",
	}
	// ErrInvalidConfiguration is returned when worker configuration is invalid.
	ErrInvalidConfiguration = &Error{
		Code:    ErrCodeConfiguration,
		Message: "invalid worker configuration",
	}
)
Common errors.
var MigrationFiles embed.FS
MigrationFiles contains all SQL migration files embedded in the binary. Users can access these files programmatically to apply migrations using their preferred migration tool (goose, golang-migrate, atlas, etc.)
Example with goose:
import (
"github.com/pressly/goose/v3"
pubsub "github.com/coregx/pubsub"
)
goose.SetBaseFS(pubsub.MigrationFiles)
if err := goose.Up(db, "migrations"); err != nil {
log.Fatal(err)
}
Example with golang-migrate:
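import (
	"log"
	"github.com/golang-migrate/migrate/v4"
	_ "github.com/golang-migrate/migrate/v4/database/mysql"
	"github.com/golang-migrate/migrate/v4/source/iofs"
	pubsub "github.com/coregx/pubsub"
)
source, err := iofs.New(pubsub.MigrationFiles, "migrations")
if err != nil {
	log.Fatal(err)
}
m, err := migrate.NewWithSourceInstance("iofs", source, "mysql://user:pass@tcp(host:port)/db")
if err != nil {
	log.Fatal(err)
}
if err := m.Up(); err != nil && err != migrate.ErrNoChange { // ErrNoChange: schema already up to date
	log.Fatal(err)
}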
Functions ¶
Types ¶
type DLQRepository ¶
type DLQRepository interface {
// Load retrieves a DLQ item by ID.
// Returns ErrNoData if not found.
Load(ctx context.Context, id int64) (model.DeadLetterQueue, error)
// Save creates a new DLQ item (if Id=0) or updates an existing one.
// Returns the saved DLQ item with populated Id.
Save(ctx context.Context, m model.DeadLetterQueue) (model.DeadLetterQueue, error)
// Delete permanently removes a DLQ item from storage.
// Should only be used after successful resolution or manual cleanup.
Delete(ctx context.Context, m model.DeadLetterQueue) error
// FindBySubscription retrieves DLQ items for a specific subscription.
// Results are ordered by created_at DESC (newest first).
FindBySubscription(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error)
// FindUnresolved retrieves unresolved DLQ items.
// Results are ordered by created_at ASC (oldest first).
FindUnresolved(ctx context.Context, limit int) ([]model.DeadLetterQueue, error)
// FindOlderThan retrieves DLQ items older than the specified threshold.
// Useful for identifying stuck items requiring attention.
FindOlderThan(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error)
// FindByMessageID retrieves a DLQ item for a specific message.
// Returns ErrNoData if not found.
FindByMessageID(ctx context.Context, messageID int64) (model.DeadLetterQueue, error)
// GetStats retrieves DLQ statistics including total count, unresolved count,
// resolution rate, and average age.
GetStats(ctx context.Context) (model.DLQStats, error)
// CountUnresolved returns the count of unresolved DLQ items.
// Useful for dashboard widgets and monitoring.
CountUnresolved(ctx context.Context) (int, error)
}
DLQRepository defines the persistence interface for the Dead Letter Queue. The DLQ stores messages that failed delivery after all retry attempts.
type Error ¶
type Error struct {
// Code is a machine-readable error code
Code string
// Message is a human-readable error message
Message string
// Err is the underlying error (if any)
Err error
}
Error represents a pubsub library error with categorization.
func NewErrorWithCause ¶
NewErrorWithCause creates a new Error wrapping an underlying error.
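An error type with a Code and a wrapped cause is usually inspected through the standard errors package. The sketch below copies the documented struct fields and adds Error and Unwrap methods as an assumption, since the package's own methods are not shown in this section. The helper hasCode is hypothetical and is not the package's IsNoData.

```go
package main

import (
	"errors"
	"fmt"
)

// Error mirrors the documented struct fields. The methods below are
// assumptions made for this sketch.
type Error struct {
	Code    string
	Message string
	Err     error
}

func (e *Error) Error() string {
	if e.Err != nil {
		return fmt.Sprintf("%s: %s: %v", e.Code, e.Message, e.Err)
	}
	return fmt.Sprintf("%s: %s", e.Code, e.Message)
}

// Unwrap exposes the cause so errors.Is and errors.As can walk the chain.
func (e *Error) Unwrap() error { return e.Err }

// hasCode reports whether any *Error in the chain carries the given code.
// Hypothetical helper for illustration.
func hasCode(err error, code string) bool {
	var pe *Error
	for errors.As(err, &pe) {
		if pe.Code == code {
			return true
		}
		err = pe.Err // keep searching below this *Error
	}
	return false
}

func main() {
	cause := errors.New("connection refused")
	err := fmt.Errorf("load topic: %w", &Error{
		Code:    "DATABASE_ERROR",
		Message: "query failed",
		Err:     cause,
	})
	fmt.Println("database error:", hasCode(err, "DATABASE_ERROR"))
	fmt.Println("no data:", hasCode(err, "NO_DATA"))
	fmt.Println("cause reachable:", errors.Is(err, cause))
}
```

Matching on Code rather than on pointer identity keeps the check working when the error has been wrapped or recreated along the way.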
type Filter ¶
type Filter struct {
SubscriberID int // Filter by subscriber ID (0 = no filter)
CbuID int // Filter by CBU ID (0 = no filter)
TopicID string // Filter by topic ID (empty = no filter)
IsActive bool // Filter by active status
}
Filter represents query filtering options for subscriptions. Used by SubscriptionRepository.List to filter results.
type Logger ¶
type Logger interface {
// Debugf logs debug-level messages with printf-style formatting.
Debugf(format string, args ...interface{})
// Infof logs info-level messages with printf-style formatting.
Infof(format string, args ...interface{})
// Warnf logs warning-level messages with printf-style formatting.
Warnf(format string, args ...interface{})
// Errorf logs error-level messages with printf-style formatting.
Errorf(format string, args ...interface{})
// Info logs info-level messages without formatting.
Info(message string)
}
Logger defines the logging interface required by the PubSub library. Implement this interface to integrate your logging system (zap, logrus, etc.).
Example implementation:
type ZapLogger struct {
logger *zap.Logger
}
func (l *ZapLogger) Infof(format string, args ...interface{}) {
l.logger.Sugar().Infof(format, args...)
}
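The example above shows only Infof, but a type must provide all five methods to satisfy Logger. A minimal complete adapter over the standard library's log package might look like this. The interface is copied from the documentation above; StdLogger, its level prefixes, and the render helper are hypothetical names chosen for this sketch.

```go
package main

import (
	"bytes"
	"log"
	"os"
)

// Logger is copied from the interface documented above.
type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
	Info(message string)
}

// StdLogger adapts a standard library *log.Logger to Logger by
// prefixing each line with its level. Hypothetical type.
type StdLogger struct{ l *log.Logger }

func (s *StdLogger) Debugf(format string, args ...interface{}) { s.l.Printf("DEBUG "+format, args...) }
func (s *StdLogger) Infof(format string, args ...interface{})  { s.l.Printf("INFO "+format, args...) }
func (s *StdLogger) Warnf(format string, args ...interface{})  { s.l.Printf("WARN "+format, args...) }
func (s *StdLogger) Errorf(format string, args ...interface{}) { s.l.Printf("ERROR "+format, args...) }
func (s *StdLogger) Info(message string)                       { s.l.Print("INFO " + message) }

// Compile-time check that StdLogger implements every Logger method.
var _ Logger = (*StdLogger)(nil)

// render logs one warning into a buffer and returns the written line.
// It exists only to make the adapter's output easy to inspect.
func render(format string, args ...interface{}) string {
	var buf bytes.Buffer
	lg := &StdLogger{l: log.New(&buf, "", 0)}
	lg.Warnf(format, args...)
	return buf.String()
}

func main() {
	lg := &StdLogger{l: log.New(os.Stdout, "pubsub ", log.LstdFlags)}
	lg.Infof("worker started, batch size %d", 100)
	lg.Info("ready")
}
```

The blank-identifier assignment makes a missing method a compile error instead of a runtime surprise.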
type LoggingNotificationService ¶
type LoggingNotificationService struct {
// contains filtered or unexported fields
}
LoggingNotificationService is a simple implementation that logs notifications.
func NewLoggingNotificationService ¶
func NewLoggingNotificationService(logger Logger) *LoggingNotificationService
NewLoggingNotificationService creates a new LoggingNotificationService.
func (*LoggingNotificationService) NotifyDLQItemAdded ¶
func (n *LoggingNotificationService) NotifyDLQItemAdded(_ context.Context, dlq model.DeadLetterQueue) error
NotifyDLQItemAdded logs DLQ item addition.
func (*LoggingNotificationService) NotifyDeliveryFailure ¶
func (n *LoggingNotificationService) NotifyDeliveryFailure(_ context.Context, queue *model.Queue, err error) error
NotifyDeliveryFailure logs delivery failure.
func (*LoggingNotificationService) NotifySubscriptionCreated ¶
func (n *LoggingNotificationService) NotifySubscriptionCreated(_ context.Context, subscription model.Subscription) error
NotifySubscriptionCreated logs subscription creation.
func (*LoggingNotificationService) NotifySubscriptionDeactivated ¶
func (n *LoggingNotificationService) NotifySubscriptionDeactivated(_ context.Context, subscription model.Subscription) error
NotifySubscriptionDeactivated logs subscription deactivation.
type MessageDeliveryGateway ¶
type MessageDeliveryGateway interface {
// DeliverMessage sends a message to the subscriber's webhook endpoint.
// Returns error if delivery fails (network error, non-2xx response, timeout).
DeliverMessage(ctx context.Context, callbackURL string, message *model.DataMessage) error
}
MessageDeliveryGateway defines the interface for delivering messages to subscriber webhooks. This interface avoids circular dependency with the transmitter package while enabling flexible delivery implementations (HTTP webhooks, gRPC, message queues, etc.).
Implementations should handle HTTP transport, retries at the transport level, and return errors for failed deliveries to trigger the retry mechanism.
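As a rough sketch of an HTTP webhook implementation, the gateway below posts the message as JSON and treats any non-2xx status as a failed delivery. DataMessage here is a hypothetical stand-in for model.DataMessage, whose fields are not shown in this section. HTTPGateway, checkStatus, and deliverTo are likewise illustrative names.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// DataMessage is a hypothetical stand-in for model.DataMessage.
type DataMessage struct {
	Identifier string `json:"identifier"`
	Data       string `json:"data"`
}

// HTTPGateway posts messages as JSON to the subscriber's callback URL.
type HTTPGateway struct{ Client *http.Client }

// checkStatus treats any non-2xx status code as a failed delivery.
func checkStatus(code int) error {
	if code < 200 || code > 299 {
		return fmt.Errorf("delivery failed: unexpected status %d", code)
	}
	return nil
}

// DeliverMessage has the same shape as the documented interface method,
// except that it takes the local DataMessage stand-in.
func (g *HTTPGateway) DeliverMessage(ctx context.Context, callbackURL string, message *DataMessage) error {
	body, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("encode message: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, callbackURL, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.Client.Do(req) // network errors and timeouts surface here
	if err != nil {
		return fmt.Errorf("send request: %w", err)
	}
	defer resp.Body.Close()
	return checkStatus(resp.StatusCode)
}

// deliverTo starts a throwaway local server that answers with the given
// status and returns the result of one delivery to it.
func deliverTo(status int) error {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(status)
	}))
	defer srv.Close()
	g := &HTTPGateway{Client: &http.Client{Timeout: 5 * time.Second}}
	msg := &DataMessage{Identifier: "user-123", Data: `{"userId":123}`}
	return g.DeliverMessage(context.Background(), srv.URL, msg)
}

func main() {
	fmt.Println("200:", deliverTo(http.StatusOK))
	fmt.Println("503:", deliverTo(http.StatusServiceUnavailable))
}
```

Returning an error for every non-2xx response is what lets the worker's retry mechanism take over, as the interface comment requires.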
type MessageRepository ¶
type MessageRepository interface {
// Load retrieves a message by ID.
// Returns ErrNoData if not found.
Load(ctx context.Context, id int64) (model.Message, error)
// Save creates a new message (if Id=0) or updates an existing one.
// Returns the saved message with populated Id.
Save(ctx context.Context, m model.Message) (model.Message, error)
// Delete permanently removes a message from storage.
// Should only be used for cleanup, not during normal operation.
Delete(ctx context.Context, m model.Message) error
// FindOutdatedMessages finds messages older than the specified number of days.
// Used for cleanup/archival operations.
FindOutdatedMessages(ctx context.Context, days int) ([]model.Message, error)
}
MessageRepository defines the persistence interface for published messages. Messages are immutable once created and represent the actual message payloads.
type NoOpNotificationService ¶
type NoOpNotificationService struct{}
NoOpNotificationService is a no-op implementation of NotificationService. Use this when notifications are not needed.
func (*NoOpNotificationService) NotifyDLQItemAdded ¶
func (n *NoOpNotificationService) NotifyDLQItemAdded(_ context.Context, _ model.DeadLetterQueue) error
NotifyDLQItemAdded does nothing.
func (*NoOpNotificationService) NotifyDeliveryFailure ¶
func (n *NoOpNotificationService) NotifyDeliveryFailure(_ context.Context, _ *model.Queue, _ error) error
NotifyDeliveryFailure does nothing.
func (*NoOpNotificationService) NotifySubscriptionCreated ¶
func (n *NoOpNotificationService) NotifySubscriptionCreated(_ context.Context, _ model.Subscription) error
NotifySubscriptionCreated does nothing.
func (*NoOpNotificationService) NotifySubscriptionDeactivated ¶
func (n *NoOpNotificationService) NotifySubscriptionDeactivated(_ context.Context, _ model.Subscription) error
NotifySubscriptionDeactivated does nothing.
type NoopLogger ¶
type NoopLogger struct{}
NoopLogger is a no-operation logger implementation useful for testing or when logging is not desired. All methods are no-ops.
func (*NoopLogger) Debugf ¶
func (l *NoopLogger) Debugf(_ string, _ ...interface{})
Debugf implements Logger.Debugf as a no-op.
func (*NoopLogger) Errorf ¶
func (l *NoopLogger) Errorf(_ string, _ ...interface{})
Errorf implements Logger.Errorf as a no-op.
func (*NoopLogger) Info ¶
func (l *NoopLogger) Info(_ string)
Info implements Logger.Info as a no-op.
func (*NoopLogger) Infof ¶
func (l *NoopLogger) Infof(_ string, _ ...interface{})
Infof implements Logger.Infof as a no-op.
func (*NoopLogger) Warnf ¶
func (l *NoopLogger) Warnf(_ string, _ ...interface{})
Warnf implements Logger.Warnf as a no-op.
type NotificationService ¶
type NotificationService interface {
// NotifyDLQItemAdded is called when a message is moved to the Dead Letter Queue.
// This indicates a message failed after all retry attempts.
NotifyDLQItemAdded(ctx context.Context, dlq model.DeadLetterQueue) error
// NotifyDeliveryFailure is called when a message delivery fails.
// This is informational and happens before moving to DLQ.
NotifyDeliveryFailure(ctx context.Context, queue *model.Queue, err error) error
// NotifySubscriptionCreated is called when a new subscription is created.
NotifySubscriptionCreated(ctx context.Context, subscription model.Subscription) error
// NotifySubscriptionDeactivated is called when a subscription is deactivated.
NotifySubscriptionDeactivated(ctx context.Context, subscription model.Subscription) error
}
NotificationService defines an optional interface for sending notifications about pub/sub system events (failures, DLQ items, etc.).
Implementations might send emails, Slack messages, SMS, or log to monitoring systems.
type Option ¶
type Option func(*QueueWorker) error
Option is a function that configures a QueueWorker. Used with the Options Pattern (2025 Go best practice) for flexible service construction.
Example:
worker, err := pubsub.NewQueueWorker(
pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
pubsub.WithBatchSize(200), // optional
)
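For readers unfamiliar with error-returning functional options, the pattern can be sketched in isolation. Everything below is a hypothetical miniature (worker, option, withLogger, withBatchSize, newWorker), not the package's implementation. Only the default batch size of 100 and the "must be > 0" rule are taken from this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical stand-in for QueueWorker.
type worker struct {
	logger    string // placeholder for a required dependency
	batchSize int
}

// option has the same shape as the documented Option type: it mutates
// the target and may reject invalid input.
type option func(*worker) error

func withLogger(name string) option {
	return func(w *worker) error {
		if name == "" {
			return errors.New("logger must not be empty")
		}
		w.logger = name
		return nil
	}
}

func withBatchSize(size int) option {
	return func(w *worker) error {
		if size <= 0 {
			return fmt.Errorf("batch size must be > 0, got %d", size)
		}
		w.batchSize = size
		return nil
	}
}

// newWorker applies defaults, then each option in order, then checks
// that required dependencies were supplied.
func newWorker(opts ...option) (*worker, error) {
	w := &worker{batchSize: 100} // documented default
	for _, opt := range opts {
		if err := opt(w); err != nil {
			return nil, err
		}
	}
	if w.logger == "" {
		return nil, errors.New("missing required option: logger")
	}
	return w, nil
}

func main() {
	w, err := newWorker(withLogger("std"), withBatchSize(200))
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("batch size:", w.batchSize)

	_, err = newWorker(withLogger("std"), withBatchSize(0))
	fmt.Println("invalid option:", err)
}
```

Because each option returns an error, invalid configuration is reported from the constructor rather than discovered later at run time.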
func WithBatchSize ¶
WithBatchSize sets the number of queue items to process per batch. This is an optional configuration - default is 100 items per batch.
Must be > 0. Larger batches improve throughput but use more memory. Smaller batches reduce latency and memory usage.
Recommended values:
- Low volume: 50-100
- Medium volume: 100-500
- High volume: 500-1000
func WithDelivery ¶
func WithDelivery(
	transmitterProvider TransmitterProvider,
	gateway MessageDeliveryGateway,
) Option
WithDelivery sets the message delivery dependencies for the queue worker. Both provider and gateway are required and must not be nil.
This is a required option for NewQueueWorker.
Parameters:
- transmitterProvider: Resolves subscriber webhook URLs
- gateway: Handles actual HTTP/gRPC message delivery
func WithLogger ¶
WithLogger sets the logger instance for the queue worker. Logger is required and must not be nil.
This is a required option for NewQueueWorker.
Use NoopLogger for silent operation or implement Logger interface to integrate with your logging system (zap, logrus, etc.).
func WithNotifications ¶
func WithNotifications(service NotificationService) Option
WithNotifications sets an optional notification service for the queue worker. This is an optional configuration - if not provided, NoOpNotificationService will be used (no notifications).
The notification service receives callbacks for:
- Delivery failures (every failed attempt)
- DLQ item additions (when message exhausts retries)
Use this to integrate with alerting systems (email, Slack, PagerDuty, etc.).
func WithRepositories ¶
func WithRepositories(
	queueRepo QueueRepository,
	messageRepo MessageRepository,
	subscriptionRepo SubscriptionRepository,
	dlqRepo DLQRepository,
) Option
WithRepositories sets the required repository dependencies for the queue worker. All four repositories are required and must not be nil.
This is a required option for NewQueueWorker.
Parameters:
- queueRepo: Queue item persistence
- messageRepo: Message persistence
- subscriptionRepo: Subscription persistence
- dlqRepo: Dead Letter Queue persistence
func WithRetryStrategy ¶
WithRetryStrategy sets a custom retry strategy for the queue worker. This is an optional configuration - if not provided, retry.DefaultStrategy() will be used.
The default strategy implements exponential backoff: 30s → 1m → 2m → 4m → 8m → 16m → 30m (max).
Use this option to customize:
- Retry delays (backoff schedule)
- Maximum retry attempts before DLQ
- DLQ threshold
type PublishRequest ¶
type PublishRequest struct {
TopicCode string // Topic code to publish to
Identifier string // Message identifier (event type)
Data string // Message payload
}
PublishRequest represents a request to publish a message.
type PublishResult ¶
type PublishResult struct {
MessageID int64 // Created message ID
QueueItemsCreated int // Number of queue items created
SubscriptionsIDs []int64 // Subscription IDs that received the message
}
PublishResult represents the result of a publish operation.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher handles publishing messages to topics and creating queue items for active subscriptions.
func NewPublisher ¶
func NewPublisher(opts ...PublisherOption) (*Publisher, error)
NewPublisher creates a new Publisher with the provided options.
Required options:
- WithPublisherRepositories: message, queue, subscription, and topic repositories
- WithPublisherLogger: logger instance
Example:
publisher, err := pubsub.NewPublisher(
pubsub.WithPublisherRepositories(msgRepo, queueRepo, subRepo, topicRepo),
pubsub.WithPublisherLogger(logger),
)
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, req PublishRequest) (*PublishResult, error)
Publish publishes a message to a topic and creates queue items for all active subscriptions.
The process:
- Validate topic exists
- Create message record
- Find all active subscriptions for the topic
- Create queue items for each subscription
Returns PublishResult with message ID and queue item count, or error if publish fails.
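The four steps above amount to a fan-out: one message record, then one queue item per active subscription on the topic. The in-memory sketch below illustrates that flow under simplifying assumptions. The types subscription, publishResult, and memoryStore are hypothetical stand-ins, and the real Publisher persists everything through its repositories.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription and publishResult are simplified, hypothetical stand-ins
// for the package's model.Subscription and PublishResult types.
type subscription struct {
	ID        int64
	TopicCode string
	Active    bool
}

type publishResult struct {
	MessageID         int64
	QueueItemsCreated int
	SubscriptionIDs   []int64
}

// memoryStore keeps all records in memory for illustration only.
type memoryStore struct {
	topics   map[string]bool
	subs     []subscription
	messages map[int64]string
	queue    []int64 // subscription ID of each queue item, in creation order
}

func (s *memoryStore) publish(topicCode, data string) (*publishResult, error) {
	// Step 1: validate that the topic exists.
	if !s.topics[topicCode] {
		return nil, errors.New("unknown topic: " + topicCode)
	}
	// Step 2: create the message record.
	id := int64(len(s.messages) + 1)
	s.messages[id] = data
	res := &publishResult{MessageID: id}
	// Steps 3 and 4: find active subscriptions for the topic and
	// create one queue item for each.
	for _, sub := range s.subs {
		if sub.TopicCode == topicCode && sub.Active {
			s.queue = append(s.queue, sub.ID)
			res.SubscriptionIDs = append(res.SubscriptionIDs, sub.ID)
			res.QueueItemsCreated++
		}
	}
	return res, nil
}

// demoStore returns a store with one topic and a mix of subscriptions.
func demoStore() *memoryStore {
	return &memoryStore{
		topics:   map[string]bool{"user.signup": true},
		messages: map[int64]string{},
		subs: []subscription{
			{ID: 1, TopicCode: "user.signup", Active: true},
			{ID: 2, TopicCode: "user.signup", Active: false},
			{ID: 3, TopicCode: "user.signup", Active: true},
			{ID: 4, TopicCode: "order.paid", Active: true},
		},
	}
}

func main() {
	res, err := demoStore().publish("user.signup", `{"userId": 123}`)
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("message %d queued for subscriptions %v\n", res.MessageID, res.SubscriptionIDs)
}
```

Inactive subscriptions and subscriptions on other topics receive no queue item, so a publish to a topic with no active subscribers still succeeds with zero queue items.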
func (*Publisher) PublishBatch ¶
func (p *Publisher) PublishBatch(ctx context.Context, requests []PublishRequest) ([]*PublishResult, error)
PublishBatch publishes multiple messages in a batch. This is more efficient than calling Publish multiple times.
type PublisherOption ¶
PublisherOption configures a Publisher.
func WithPublisherLogger ¶
func WithPublisherLogger(logger Logger) PublisherOption
WithPublisherLogger sets the logger instance.
func WithPublisherRepositories ¶
func WithPublisherRepositories(
	messageRepo MessageRepository,
	queueRepo QueueRepository,
	subscriptionRepo SubscriptionRepository,
	topicRepo TopicRepository,
) PublisherOption
WithPublisherRepositories sets the required repository dependencies.
type PublisherRepository ¶
type PublisherRepository interface {
// Load retrieves a publisher by ID.
// Returns ErrNoData if not found.
Load(ctx context.Context, id int64) (model.Publisher, error)
// Save creates a new publisher (if Id=0) or updates an existing one.
// Returns the saved publisher with populated Id.
Save(ctx context.Context, m model.Publisher) (model.Publisher, error)
// GetByPublisherCode retrieves a publisher by its unique code.
// Returns ErrNoData if not found.
GetByPublisherCode(ctx context.Context, publisherCode string) (model.Publisher, error)
}
PublisherRepository defines the persistence interface for publisher configurations. Publishers represent message sources in the system.
type QueueRepository ¶
type QueueRepository interface {
// Load retrieves a queue item by ID.
// Returns ErrNoData if not found.
Load(ctx context.Context, id int64) (model.Queue, error)
// Save creates a new queue item (if Id=0) or updates an existing one.
// Returns the saved queue item with populated Id.
Save(ctx context.Context, m *model.Queue) (*model.Queue, error)
// Delete permanently removes a queue item from storage.
Delete(ctx context.Context, m *model.Queue) error
// FindByMessageID finds a queue item for a specific message and subscription.
// Returns ErrNoData if not found.
FindByMessageID(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error)
// FindBySubscriptionID retrieves all queue items for a subscription.
// Returns empty slice if none found.
FindBySubscriptionID(ctx context.Context, subscriptionID int64) ([]model.Queue, error)
// FindPendingItems finds queue items ready for first-time delivery.
// Items must have status=PENDING and next_retry_at <= now.
// Results are ordered by created_at ASC (FIFO).
FindPendingItems(ctx context.Context, limit int) ([]model.Queue, error)
// FindRetryableItems finds queue items ready for retry.
// Items must have status=FAILED and next_retry_at <= now.
// Results are ordered by created_at ASC (oldest failures first).
FindRetryableItems(ctx context.Context, limit int) ([]model.Queue, error)
// FindExpiredItems finds queue items that have expired.
// Items must have expires_at <= now and status != SENT.
// Results are ordered by expires_at ASC (oldest first).
FindExpiredItems(ctx context.Context, limit int) ([]model.Queue, error)
// UpdateNextRetry updates the retry schedule for a queue item.
// Used by retry middleware to schedule next delivery attempt.
UpdateNextRetry(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error
}
QueueRepository defines the persistence interface for queue items. Queue items represent pending or retrying message deliveries.
Implementations must be safe for concurrent use and should use database transactions where appropriate.
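The selection rule documented for FindPendingItems (status=PENDING, next_retry_at <= now, oldest created_at first, limited to a batch) can be illustrated with an in-memory filter. This is only a sketch of the contract for someone writing a custom repository. queueItem, findPending, and demoIDs are hypothetical, and a real implementation would express the same rule as a SQL query.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// queueItem is a hypothetical, reduced stand-in for model.Queue.
type queueItem struct {
	ID          int64
	Status      string
	CreatedAt   time.Time
	NextRetryAt time.Time
}

// findPending returns at most limit items that are PENDING and due at
// the given time, ordered by creation time ascending (FIFO).
func findPending(items []queueItem, now time.Time, limit int) []queueItem {
	var due []queueItem
	for _, it := range items {
		// !After(now) is the same as next_retry_at <= now.
		if it.Status == "PENDING" && !it.NextRetryAt.After(now) {
			due = append(due, it)
		}
	}
	sort.SliceStable(due, func(i, j int) bool {
		return due[i].CreatedAt.Before(due[j].CreatedAt)
	})
	if limit > 0 && len(due) > limit {
		due = due[:limit]
	}
	return due
}

// demoIDs runs findPending over a fixed data set and returns the IDs
// of the selected items.
func demoIDs(limit int) []int64 {
	now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	items := []queueItem{
		{ID: 1, Status: "PENDING", CreatedAt: now.Add(-1 * time.Minute), NextRetryAt: now.Add(-1 * time.Minute)},
		{ID: 2, Status: "FAILED", CreatedAt: now.Add(-5 * time.Minute), NextRetryAt: now.Add(-1 * time.Minute)},
		{ID: 3, Status: "PENDING", CreatedAt: now.Add(-3 * time.Minute), NextRetryAt: now},
		{ID: 4, Status: "PENDING", CreatedAt: now.Add(-4 * time.Minute), NextRetryAt: now.Add(time.Minute)},
	}
	var ids []int64
	for _, it := range findPending(items, now, limit) {
		ids = append(ids, it.ID)
	}
	return ids
}

func main() {
	fmt.Println("due pending items:", demoIDs(10))
}
```

In the fixed data set, item 2 is skipped because it is FAILED and item 4 because its next_retry_at is still in the future.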
type QueueWorker ¶
type QueueWorker struct {
// contains filtered or unexported fields
}
QueueWorker processes the message delivery queue with automatic retry logic. It handles pending messages, failed retries, and Dead Letter Queue management.
The worker runs continuously in the background, processing batches at regular intervals. It implements exponential backoff retry strategy and moves permanently failed messages to the Dead Letter Queue (DLQ) for manual inspection.
Key responsibilities:
- Process pending queue items (first delivery attempt)
- Retry failed deliveries with exponential backoff
- Move exhausted retries to DLQ
- Clean up expired queue items
- Send notifications for delivery failures and DLQ additions
Thread safety: Safe for concurrent use. Each batch is processed sequentially.
func NewQueueWorker ¶
func NewQueueWorker(opts ...Option) (*QueueWorker, error)
NewQueueWorker creates a new queue worker with the provided options.
Required options:
- WithRepositories: queue, message, subscription, and DLQ repositories
- WithDelivery: transmitter provider and message delivery gateway
- WithLogger: logger instance
Optional options:
- WithRetryStrategy: custom retry strategy (default: retry.DefaultStrategy())
- WithBatchSize: batch processing size (default: 100)
Example:
worker, err := pubsub.NewQueueWorker(
pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
pubsub.WithBatchSize(200), // optional
)
if err != nil {
log.Fatal(err)
}
func (*QueueWorker) CleanupExpiredItems ¶
func (w *QueueWorker) CleanupExpiredItems(ctx context.Context) (int, error)
CleanupExpiredItems removes expired queue items from the queue. Items are considered expired when expires_at <= now and status != SENT.
This prevents the queue from growing indefinitely with stale messages. Returns the number of deleted items and any critical error.
func (*QueueWorker) GetDLQStats ¶
GetDLQStats retrieves Dead Letter Queue statistics for monitoring. Returns aggregated stats including total count, unresolved count, resolution rate, and average age.
Useful for dashboards, monitoring systems, and operational visibility.
func (*QueueWorker) GetRetrySchedule ¶
func (w *QueueWorker) GetRetrySchedule() string
GetRetrySchedule returns a human-readable description of the retry schedule. Useful for displaying retry configuration to users or in logs.
Example output: "30s → 1m → 2m → 4m → 8m → 16m → 30m".
func (*QueueWorker) ProcessPendingItems ¶
func (w *QueueWorker) ProcessPendingItems(ctx context.Context) (int, error)
ProcessPendingItems processes pending queue items ready for first delivery attempt. It finds all items with status=PENDING and next_retry_at <= now, ordered by created_at ASC (FIFO).
Returns the number of successfully processed items and any critical error. Individual item failures are logged but don't stop batch processing.
func (*QueueWorker) ProcessRetryableItems ¶
func (w *QueueWorker) ProcessRetryableItems(ctx context.Context) (int, error)
ProcessRetryableItems processes failed items ready for retry attempts. It finds all items with status=FAILED and next_retry_at <= now, ordered by created_at ASC.
Returns the number of successfully processed items and any critical error. Individual item failures are logged but don't stop batch processing.
func (*QueueWorker) Run ¶
func (w *QueueWorker) Run(ctx context.Context, interval time.Duration)
Run starts the queue worker event loop that processes messages continuously. It runs until the context is canceled, processing batches at the specified interval.
Each batch processes:
- Pending items (first delivery attempt)
- Retryable items (retry after backoff delay)
- Expired items (cleanup)
This method blocks and should typically be run in a goroutine.
Example:
ctx := context.Background()
go worker.Run(ctx, 30*time.Second) // Process every 30 seconds
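A loop that runs until its context is canceled and does work at a fixed interval is commonly built from time.Ticker and select. The sketch below shows that general shape together with a clean shutdown. It assumes nothing about how Run is implemented internally, and runLoop and demoRun are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop calls process once per interval until ctx is canceled.
// It blocks, like the documented Run method.
func runLoop(ctx context.Context, interval time.Duration, process func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // stop when the caller cancels
		case <-ticker.C:
			process(ctx)
		}
	}
}

// demoRun starts the loop in a goroutine, lets it run briefly, cancels
// it, waits for it to exit, and returns the number of batches processed.
func demoRun() int {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	batches := 0
	go func() {
		defer close(done)
		runLoop(ctx, 10*time.Millisecond, func(context.Context) { batches++ })
	}()
	time.Sleep(100 * time.Millisecond)
	cancel()
	<-done // batches is safe to read once the goroutine has exited
	return batches
}

func main() {
	fmt.Println("batches processed:", demoRun())
}
```

In an application, the same cancel-then-wait sequence is typically triggered by a shutdown signal, so the worker can finish its current batch before the process exits.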
type SubscribeRequest ¶
type SubscribeRequest struct {
SubscriberID int64 // ID of the subscriber (required, must exist)
TopicCode string // Topic code to subscribe to (required, must exist)
Identifier string // Event identifier filter (required, e.g., "user-123")
CallbackURL string // Webhook URL for message delivery (optional, can be set on subscriber)
}
SubscribeRequest represents a request to create a new subscription. All fields except CallbackURL are required.
type SubscriberRepository ¶
type SubscriberRepository interface {
	// Load retrieves a subscriber by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Subscriber, error)

	// Save creates a new subscriber (if Id=0) or updates an existing one.
	// Returns the saved subscriber with populated Id.
	Save(ctx context.Context, m model.Subscriber) (model.Subscriber, error)

	// FindByClientID retrieves a subscriber by client ID.
	// Returns ErrNoData if not found.
	FindByClientID(ctx context.Context, clientID int64) (model.Subscriber, error)

	// FindByName retrieves a subscriber by name.
	// Returns ErrNoData if not found.
	FindByName(ctx context.Context, name string) (model.Subscriber, error)
}
SubscriberRepository defines the persistence interface for subscriber configurations. Subscribers represent message consumers with webhook URLs for delivery.
type SubscriptionManager ¶
type SubscriptionManager struct {
	// contains filtered or unexported fields
}
SubscriptionManager handles subscription lifecycle management for the pub/sub system. It provides high-level operations for creating, managing, and querying subscriptions that connect subscribers to topics.
Key operations:
- Subscribe: Create new subscriptions with validation
- Unsubscribe: Deactivate existing subscriptions
- ListSubscriptions: Query subscriptions by subscriber and identifier
- ReactivateSubscription: Re-enable previously deactivated subscriptions
Thread safety: Safe for concurrent use.
func NewSubscriptionManager ¶
func NewSubscriptionManager(opts ...SubscriptionManagerOption) (*SubscriptionManager, error)
NewSubscriptionManager creates a new SubscriptionManager with the provided options.
Required options:
- WithSubscriptionManagerRepositories: subscription, subscriber, and topic repositories
- WithSubscriptionManagerLogger: logger instance
Example:
manager, err := pubsub.NewSubscriptionManager(
	pubsub.WithSubscriptionManagerRepositories(subRepo, subscriberRepo, topicRepo),
	pubsub.WithSubscriptionManagerLogger(logger),
)
func (*SubscriptionManager) GetSubscription ¶
func (sm *SubscriptionManager) GetSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
GetSubscription retrieves a single subscription by ID. Returns the subscription or error if not found.
func (*SubscriptionManager) ListSubscriptions ¶
func (sm *SubscriptionManager) ListSubscriptions(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)
ListSubscriptions returns all active subscriptions for a subscriber. Optionally filters by event identifier if provided.
Parameters:
- subscriberID: Required, must be > 0
- identifier: Optional event identifier filter (empty string = no filter)
Returns an empty slice if no subscriptions are found (not an error).
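The filter semantics can be illustrated with an in-memory sketch. `subscription` and `listActive` are hypothetical stand-ins for `model.Subscription` and the manager's query, and the error message is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription is a hypothetical stand-in for model.Subscription.
type subscription struct {
	ID           int64
	SubscriberID int64
	Identifier   string
	Active       bool
}

// listActive mirrors the documented contract: subscriberID must be positive,
// an empty identifier disables the identifier filter, and no match yields an
// empty (non-nil) slice rather than an error.
func listActive(all []subscription, subscriberID int64, identifier string) ([]subscription, error) {
	if subscriberID <= 0 {
		return nil, errors.New("subscriberID must be > 0")
	}
	out := []subscription{}
	for _, s := range all {
		if !s.Active || s.SubscriberID != subscriberID {
			continue
		}
		if identifier != "" && s.Identifier != identifier {
			continue
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	all := []subscription{
		{ID: 1, SubscriberID: 7, Identifier: "user-123", Active: true},
		{ID: 2, SubscriberID: 7, Identifier: "user-456", Active: true},
		{ID: 3, SubscriberID: 7, Identifier: "user-123", Active: false},
	}
	unfiltered, _ := listActive(all, 7, "")
	filtered, _ := listActive(all, 7, "user-123")
	none, err := listActive(all, 8, "")
	fmt.Println(len(unfiltered), len(filtered), len(none), none != nil, err) // 2 1 0 true <nil>
}
```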
func (*SubscriptionManager) ReactivateSubscription ¶
func (sm *SubscriptionManager) ReactivateSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
ReactivateSubscription reactivates a previously deactivated subscription. If the subscription is already active, returns without error.
This allows resuming message delivery to a subscriber that was temporarily unsubscribed.
func (*SubscriptionManager) Subscribe ¶
func (sm *SubscriptionManager) Subscribe(ctx context.Context, req SubscribeRequest) (*model.Subscription, error)
Subscribe creates a new subscription connecting a subscriber to a topic. It validates that both the subscriber and topic exist before creating the subscription. If an active subscription already exists, returns the existing subscription.
Validation:
- SubscriberID must be > 0 and exist in database
- TopicCode must not be empty and exist in database
- Identifier must not be empty
Returns the created (or existing) subscription, or an error if validation fails.
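The field-level part of these rules can be sketched as a standalone check. `validateSubscribeRequest` is a hypothetical helper with assumed error messages, and the topic code "user.events" is made up; the database existence checks are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// SubscribeRequest mirrors the struct documented for this package.
type SubscribeRequest struct {
	SubscriberID int64
	TopicCode    string
	Identifier   string
	CallbackURL  string
}

// validateSubscribeRequest checks only the field-level rules; verifying that
// the subscriber and topic exist requires repository lookups and is out of
// scope for this sketch.
func validateSubscribeRequest(req SubscribeRequest) error {
	switch {
	case req.SubscriberID <= 0:
		return errors.New("subscriber ID must be > 0")
	case req.TopicCode == "":
		return errors.New("topic code must not be empty")
	case req.Identifier == "":
		return errors.New("identifier must not be empty")
	}
	return nil
}

func main() {
	ok := SubscribeRequest{SubscriberID: 1, TopicCode: "user.events", Identifier: "user-123"}
	bad := SubscribeRequest{SubscriberID: 1, TopicCode: "user.events"}
	fmt.Println(validateSubscribeRequest(ok))  // <nil>
	fmt.Println(validateSubscribeRequest(bad)) // identifier must not be empty
}
```

CallbackURL is not checked because the request documents it as optional.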
func (*SubscriptionManager) Unsubscribe ¶
func (sm *SubscriptionManager) Unsubscribe(ctx context.Context, subscriptionID int64) (*model.Subscription, error)
Unsubscribe deactivates an existing subscription. This is a soft delete: the subscription record remains in the database but becomes inactive. If the subscription is already inactive, returns the subscription without error.
The subscription can be reactivated later using ReactivateSubscription.
Returns the deactivated subscription or error if operation fails.
type SubscriptionManagerOption ¶
type SubscriptionManagerOption func(*SubscriptionManager) error
SubscriptionManagerOption is a function that configures a SubscriptionManager. Used with the Options Pattern for flexible service construction.
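An option type that returns an error lets each option reject bad input, and lets the constructor verify that required options were supplied. The sketch below shows the general pattern with hypothetical names (`manager`, `withLogger`, `withRepo`); it does not reproduce the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Logger, manager, and the options below are hypothetical stand-ins used
// only to demonstrate the pattern.
type Logger interface{ Info(msg string) }

type stdoutLogger struct{}

func (stdoutLogger) Info(msg string) { fmt.Println(msg) }

type manager struct {
	logger Logger
	repo   string // placeholder for repository dependencies
}

// option configures a manager and may reject invalid input.
type option func(*manager) error

func withLogger(l Logger) option {
	return func(m *manager) error {
		if l == nil {
			return errors.New("logger must not be nil")
		}
		m.logger = l
		return nil
	}
}

func withRepo(name string) option {
	return func(m *manager) error {
		if name == "" {
			return errors.New("repository must not be empty")
		}
		m.repo = name
		return nil
	}
}

// newManager applies options in order, then checks that every required
// dependency was provided.
func newManager(opts ...option) (*manager, error) {
	m := &manager{}
	for _, opt := range opts {
		if err := opt(m); err != nil {
			return nil, err
		}
	}
	if m.logger == nil {
		return nil, errors.New("logger option is required")
	}
	if m.repo == "" {
		return nil, errors.New("repository option is required")
	}
	return m, nil
}

func main() {
	_, err := newManager(withRepo("subscriptions"))
	fmt.Println(err) // logger option is required

	m, err := newManager(withRepo("subscriptions"), withLogger(stdoutLogger{}))
	fmt.Println(m != nil, err) // true <nil>
}
```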
func WithSubscriptionManagerLogger ¶
func WithSubscriptionManagerLogger(logger Logger) SubscriptionManagerOption
WithSubscriptionManagerLogger sets the logger instance for the subscription manager. Logger is required and must not be nil.
This is a required option for NewSubscriptionManager.
func WithSubscriptionManagerRepositories ¶
func WithSubscriptionManagerRepositories(
	subscriptionRepo SubscriptionRepository,
	subscriberRepo SubscriberRepository,
	topicRepo TopicRepository,
) SubscriptionManagerOption
WithSubscriptionManagerRepositories sets the required repository dependencies for the subscription manager. All repositories are required and must not be nil.
This is a required option for NewSubscriptionManager.
type SubscriptionRepository ¶
type SubscriptionRepository interface {
	// Load retrieves a subscription by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Subscription, error)

	// Save creates a new subscription (if Id=0) or updates an existing one.
	// Returns the saved subscription with populated Id.
	Save(ctx context.Context, m model.Subscription) (model.Subscription, error)

	// FindActive finds active subscriptions matching the criteria.
	// If subscriberID=0, searches all subscribers.
	// If identifier is empty, searches all identifiers.
	FindActive(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)

	// List retrieves subscriptions matching the filter criteria.
	// Returns empty slice if none found.
	List(ctx context.Context, filter Filter) ([]model.Subscription, error)

	// FindAllActive retrieves all active subscriptions with full details.
	// Returns SubscriptionFull with joined subscriber and topic information.
	FindAllActive(ctx context.Context) ([]model.SubscriptionFull, error)
}
SubscriptionRepository defines the persistence interface for subscription mappings. Subscriptions connect subscribers to topics, enabling message delivery.
type TopicRepository ¶
type TopicRepository interface {
	// Load retrieves a topic by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Topic, error)

	// Save creates a new topic (if Id=0) or updates an existing one.
	// Returns the saved topic with populated Id.
	Save(ctx context.Context, m model.Topic) (model.Topic, error)

	// GetByTopicCode retrieves a topic by its unique code.
	// Returns ErrNoData if not found.
	GetByTopicCode(ctx context.Context, topicCode string) (model.Topic, error)
}
TopicRepository defines the persistence interface for topic configurations. Topics represent message categories for pub/sub routing.
type TransmitterProvider ¶
type TransmitterProvider interface {
	// GetCallbackUrl retrieves the webhook URL for a subscriber.
	// Returns ErrNoData if subscriber not found.
	GetCallbackUrl(ctx context.Context, subscriberID int64) (string, error)
}
TransmitterProvider resolves subscriber callback URLs without introducing a circular dependency. This interface decouples the worker from the transmitter/subscriber details.
Implementations typically fetch webhook URLs from subscriber configuration.
Directories ¶
| Path | Synopsis |
|---|---|
| adapters/relica | Package relica provides repository implementations using Relica query builder. |
| cmd/pubsub-server (command) | Package main provides the PubSub server executable with HTTP API and background worker. |
| cmd/pubsub-server/internal/api | Package api provides HTTP handlers for the PubSub server REST API. |
| cmd/pubsub-server/internal/config | Package config provides configuration management for the PubSub standalone server. |
| examples/basic (command) | |
| internal | |
| | Package model contains all domain models and data structures for the PubSub system. |
| | Package retry provides exponential backoff retry strategies for message delivery. |