pubsub

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2025 License: MIT Imports: 8 Imported by: 0

README

PubSub

Production-ready Pub/Sub library and standalone service for Go

Works both as a library for embedding in your application AND as a standalone microservice with REST API.

CI Go Reference Go Report Card License Release

✨ Features

Core Features
  • 📨 Reliable Message Delivery - Guaranteed delivery with exponential backoff retry
  • 🔄 Exponential Backoff - 30s → 1m → 2m → 4m → 8m → 16m → 30m (max)
  • 💀 Dead Letter Queue (DLQ) - Automatic handling of failed messages after 5 attempts
  • 📊 DLQ Statistics - Track failure reasons and resolution metrics
  • 🎯 Domain-Driven Design - Rich domain models with business logic
  • 🗄️ Repository Pattern - Clean data access abstraction
Architecture
  • 🔌 Pluggable - Bring your own Logger, Notification system
  • ⚙️ Options Pattern - Modern Go API (2025 best practices)
  • 🏗️ Clean Architecture - Services, Repositories, Models separation
  • ✅ Battle-Tested - Production-proven in FreiCON Railway Management System
Database Support
  • 🐬 MySQL - Full support with Relica adapters
  • 🐘 PostgreSQL - Full support with Relica adapters
  • 🪶 SQLite - Full support with Relica adapters
  • ⚡ Zero Dependencies - Relica query builder (no ORM bloat)
Deployment Options
  • 📚 As Library - Embed in your Go application
  • 🐳 As Service - Standalone PubSub server with REST API
  • ☸️ Docker Ready - Production Dockerfile + docker-compose
  • 🌐 Cloud Native - 12-factor app, ENV config, health checks

📦 Installation

As Library
go get github.com/coregx/pubsub@latest
As Standalone Service
# Using Docker (recommended)
cd cmd/pubsub-server
docker-compose up -d

# Or build from source
go build ./cmd/pubsub-server

🚦 Quick Start

Option 1: Standalone Service (Fastest!)
# Windows
cd cmd/pubsub-server
start.bat

# Linux/Mac
cd cmd/pubsub-server
docker-compose up -d

Access API at http://localhost:8080

See Server Documentation for API endpoints.

Option 2: Embedded Library
package main

import (
    "context"
    "database/sql"
    "time"

    "github.com/coregx/pubsub"
    "github.com/coregx/pubsub/adapters/relica"
    _ "github.com/go-sql-driver/mysql"
)

func main() {
    // Connect to database
    db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub?parseTime=true")

    // Create repositories (production-ready Relica adapters!)
    repos := relica.NewRepositories(db, "mysql")

    // Create services
    publisher, _ := pubsub.NewPublisher(
        pubsub.WithPublisherRepositories(
            repos.Message, repos.Queue, repos.Subscription, repos.Topic,
        ),
        pubsub.WithPublisherLogger(logger),
    )

    // Publish message
    result, _ := publisher.Publish(context.Background(), pubsub.PublishRequest{
        TopicCode:  "user.signup",
        Identifier: "user-123",
        Data:       `{"userId": 123, "email": "user@example.com"}`,
    })

    // Create worker for background processing
    worker, _ := pubsub.NewQueueWorker(
        pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
        pubsub.WithDelivery(transmitterProvider, gateway),
        pubsub.WithLogger(logger),
    )

    // Run worker (processes queue every 30 seconds)
    worker.Run(context.Background(), 30*time.Second)
}

🗄️ Database Setup

import "github.com/coregx/pubsub/migrations"

// Apply all migrations
if err := migrations.ApplyAll(db); err != nil {
    log.Fatal(err)
}
Manual Migrations
# MySQL
mysql -u user -p database < migrations/mysql/001_core_tables.sql
mysql -u user -p database < migrations/mysql/002_retry_fields.sql
mysql -u user -p database < migrations/mysql/003_dead_letter_queue.sql

# PostgreSQL
psql -U user -d database -f migrations/postgres/001_core_tables.sql
...

# SQLite
sqlite3 pubsub.db < migrations/sqlite/001_core_tables.sql
...

🏗️ Architecture

┌─────────────────────────────────────┐
│         Your Application            │
│  (or REST API for standalone)       │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│          Services Layer             │
│  - Publisher                        │
│  - SubscriptionManager              │
│  - QueueWorker                      │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│       Relica Adapters               │
│  (Production-Ready Implementations) │
│  - Zero dependencies                │
│  - MySQL / PostgreSQL / SQLite      │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│          Database                   │
└─────────────────────────────────────┘

📡 REST API (Standalone Service)

When running as standalone service, PubSub-Go exposes these endpoints:

Publish Message
POST /api/v1/publish
Content-Type: application/json

{
  "topicCode": "user.signup",
  "identifier": "optional-dedup-key",
  "data": {
    "userId": 123,
    "email": "user@example.com"
  }
}
Subscribe to Topic
POST /api/v1/subscribe
{
  "subscriberId": 1,
  "topicCode": "user.signup",
  "identifier": "webhook-receiver-1"
}
List Subscriptions
GET /api/v1/subscriptions?subscriberId=1
Unsubscribe
DELETE /api/v1/subscriptions/123
Health Check
GET /api/v1/health

See API Documentation for full details.

🔧 Configuration

Library Configuration (Go)
// Options Pattern (2025 best practice)
worker, err := pubsub.NewQueueWorker(
    pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
    pubsub.WithDelivery(transmitterProvider, gateway),
    pubsub.WithLogger(logger),
    pubsub.WithBatchSize(100),              // optional
    pubsub.WithRetryStrategy(customStrategy), // optional
    pubsub.WithNotifications(notifService),  // optional
)
Service Configuration (ENV)
# Server
SERVER_HOST=0.0.0.0
SERVER_PORT=8080

# Database
DB_DRIVER=mysql
DB_HOST=localhost
DB_PORT=3306
DB_USER=pubsub
DB_PASSWORD=your_password
DB_NAME=pubsub
DB_PREFIX=pubsub_

# Worker
PUBSUB_BATCH_SIZE=100
PUBSUB_WORKER_INTERVAL=30
PUBSUB_ENABLE_NOTIFICATIONS=true

See .env.example for all options.

📊 How It Works

Message Flow
1. PUBLISH
   Publisher → Topic → Create Message
                    → Find Active Subscriptions
                    → Create Queue Items (one per subscription)

2. WORKER (Background)
   QueueWorker → Find Pending/Retryable Items (batch)
              → Deliver to Subscribers (via webhooks/gateway)
              → On Success: Mark as SENT
              → On Failure: Retry with exponential backoff
              → After 5 failures: Move to DLQ

3. DLQ (Dead Letter Queue)
   Failed items → Manual review
               → Resolve or Delete
Retry Schedule
Attempt 1: Immediate
Attempt 2: +30 seconds
Attempt 3: +1 minute
Attempt 4: +2 minutes
Attempt 5: +4 minutes
Attempt 6: +8 minutes (moves to DLQ after this)

🧪 Testing

# Run all tests
go test ./...

# With coverage
go test ./... -cover

# Model tests (95.9% coverage)
go test ./model/... -cover

# Integration tests (requires database)
go test ./adapters/relica/... -cover

🐳 Docker Deployment

Quick Start
cd cmd/pubsub-server
docker-compose up -d
Production Build
# Build image
docker build -t pubsub-server:0.1.0 -f cmd/pubsub-server/Dockerfile .

# Run with environment
docker run -d \
  -p 8080:8080 \
  -e DB_DRIVER=mysql \
  -e DB_HOST=mysql \
  -e DB_PASSWORD=secret \
  pubsub-server:0.1.0

📚 Examples

🗺️ Roadmap

v0.1.0 (Current - Alpha) ✅
  • Core PubSub functionality
  • Relica adapters (MySQL/PostgreSQL/SQLite)
  • Publisher + SubscriptionManager services
  • Standalone REST API server
  • Docker support
  • Health checks
v0.2.0 (Next)
  • Delivery providers (HTTP webhooks, gRPC)
  • Message encryption
  • Rate limiting
  • Metrics (Prometheus)
  • Admin UI
v1.0.0 (Stable)
  • OpenAPI/Swagger docs
  • Authentication/Authorization
  • Multi-tenancy
  • Message replay
  • Full test coverage (>90%)

🤝 Contributing

This is an alpha release. Contributions welcome!

  1. Fork the repository
  2. Create feature branch (git checkout -b feature/amazing)
  3. Commit changes (git commit -m 'feat: add amazing feature')
  4. Push to branch (git push origin feature/amazing)
  5. Open Pull Request

📄 License

MIT License - see LICENSE file for details.

🙏 Acknowledgments

  • Relica - Type-safe query builder (github.com/coregx/relica)
  • FreiCON - Original production testing ground
  • CoreGX Ecosystem - Part of CoreGX microservices suite

📞 Support


⚠️ Pre-Release Status

This is a pre-release version (v0.1.0 development). The library is production-ready and battle-tested in FreiCON Railway Management System with 95.9% test coverage and zero linter issues. APIs may evolve before v1.0.0 LTS release.

📦 Dependencies

This library uses Relica for type-safe database operations. All dependencies are properly published and available through Go modules.


Made with ❤️ by CoreGX Team

Documentation

Overview

Package pubsub provides a production-ready Pub/Sub library and standalone service for Go with reliable message delivery, retry logic, and Dead Letter Queue (DLQ) support.

Works both as a library for embedding in your application AND as a standalone microservice with REST API.

Features

  • Reliable Message Delivery with guaranteed delivery and exponential backoff retry
  • Exponential Backoff: 30s → 1m → 2m → 4m → 8m → 16m → 30m (max)
  • Dead Letter Queue (DLQ) automatically handles failed messages after 5 attempts
  • DLQ Statistics for tracking failure reasons and resolution metrics
  • Domain-Driven Design with rich domain models containing business logic
  • Repository Pattern for clean data access abstraction
  • Options Pattern for modern Go API design (2025 best practices)
  • Pluggable architecture: bring your own Logger, Notification system
  • Multi-Database Support: MySQL, PostgreSQL, SQLite via Relica adapters
  • Zero Dependencies: uses Relica query builder (no ORM bloat)
  • Embedded Migrations for easy database setup
  • Docker Ready: production Dockerfile with multi-stage builds
  • Cloud Native: 12-factor app, ENV config, health checks
  • Battle-tested in FreiCON Railway Management System

Quick Start

Option 1: As Embedded Library

First, apply the database migrations:

import (
    "database/sql"
    "github.com/coregx/pubsub"
    "github.com/coregx/pubsub/adapters/relica"
    "github.com/coregx/pubsub/migrations"
    _ "github.com/go-sql-driver/mysql"
)

// Connect to database
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub?parseTime=true")

// Apply embedded migrations
if err := migrations.ApplyAll(db); err != nil {
    log.Fatal(err)
}

Use production-ready Relica adapters:

// Create all repositories at once
repos := relica.NewRepositories(db, "mysql")

// Create services with Options Pattern
publisher, _ := pubsub.NewPublisher(
    pubsub.WithPublisherRepositories(
        repos.Message, repos.Queue, repos.Subscription, repos.Topic,
    ),
    pubsub.WithPublisherLogger(logger),
)

// Create worker
worker, _ := pubsub.NewQueueWorker(
    pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
    pubsub.WithDelivery(transmitterProvider, gateway),
    pubsub.WithLogger(logger),
)

// Run worker (processes queue every 30 seconds)
ctx := context.Background()
worker.Run(ctx, 30*time.Second)

Publish a message:

result, err := publisher.Publish(ctx, pubsub.PublishRequest{
    TopicCode:  "user.signup",
    Identifier: "user-123",
    Data:       `{"userId": 123, "email": "user@example.com"}`,
})

Option 2: As Standalone Service

Run the standalone PubSub server with Docker:

cd cmd/pubsub-server
docker-compose up -d

Access REST API at http://localhost:8080:

# Publish message
curl -X POST http://localhost:8080/api/v1/publish \
  -H "Content-Type: application/json" \
  -d '{"topicCode":"user.signup","identifier":"user-123","data":{"userId":123}}'

# Health check
curl http://localhost:8080/api/v1/health

See cmd/pubsub-server/README.md for full API documentation

Architecture

The library follows Clean Architecture and Domain-Driven Design principles:

┌─────────────────────────────────────┐
│         Application Layer           │
│  (Publisher, SubscriptionManager,   │
│   QueueWorker, REST API)            │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│         Domain Layer                │
│  (Rich models with business logic)  │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│       Relica Adapters               │
│  (Production-ready implementations) │
└─────────────┬───────────────────────┘
              │
┌─────────────▼───────────────────────┐
│    Database (MySQL/PostgreSQL/      │
│             SQLite)                 │
└─────────────────────────────────────┘

Key principles:

  • Domain models contain business logic (Queue.MarkFailed, Queue.ShouldRetry, etc.)
  • Repository Pattern abstracts database operations
  • Dependency Inversion via interfaces (Logger, Notification, Repositories)
  • Options Pattern for service configuration (2025 best practices)
  • Relica adapters provide zero-dependency database access

Message Flow

  1. PUBLISH Publisher → Topic → Create Message → Find Active Subscriptions → Create Queue Items (one per subscription)

  2. WORKER (Background) QueueWorker → Find Pending/Retryable Items (batch) → Deliver to Subscribers (via webhooks/gateway) → On Success: Mark as SENT → On Failure: Retry with exponential backoff → After 5 failures: Move to DLQ

  3. DLQ (Dead Letter Queue) Failed items → Manual review → Resolve or Delete

Retry Strategy

Failed message deliveries are automatically retried with exponential backoff:

Attempt 1: Immediate
Attempt 2: +30 seconds
Attempt 3: +1 minute
Attempt 4: +2 minutes
Attempt 5: +4 minutes
Attempt 6: +8 minutes (moves to DLQ after this)

After 5 failed attempts, messages are automatically moved to the Dead Letter Queue (DLQ) for manual inspection and resolution.

Database Schema

The library requires 7 database tables (created via embedded migrations):

pubsub_topic          - Topics for pub/sub messaging
pubsub_publisher      - Publisher configurations
pubsub_subscriber     - Subscriber configurations with webhook URLs
pubsub_subscription   - Subscription mappings (subscriber + topic)
pubsub_message        - Published messages
pubsub_queue          - Delivery queue with retry state
pubsub_dlq            - Dead Letter Queue for failed messages

Supports MySQL, PostgreSQL, and SQLite via Relica adapters. Table prefix can be customized (default: "pubsub_").

Examples

See the examples/ directory for complete working examples including:

  • Basic usage with database/sql
  • Custom logger integration
  • Repository implementations

For detailed documentation, see README.md and pkg.go.dev.

Index

Constants

View Source
const (
	// ErrCodeNoData indicates no data was found.
	ErrCodeNoData = "NO_DATA"

	// ErrCodeValidation indicates validation failed.
	ErrCodeValidation = "VALIDATION_ERROR"

	// ErrCodeConfiguration indicates invalid configuration.
	ErrCodeConfiguration = "CONFIGURATION_ERROR"

	// ErrCodeDatabase indicates database operation failed.
	ErrCodeDatabase = "DATABASE_ERROR"

	// ErrCodeDelivery indicates message delivery failed.
	ErrCodeDelivery = "DELIVERY_ERROR"
)

Error codes for pubsub operations.

Variables

View Source
var (
	// ErrNoData is returned when a query returns no results.
	// This is not necessarily an error condition in all cases.
	ErrNoData = &Error{
		Code:    ErrCodeNoData,
		Message: "no data found",
	}

	// ErrInvalidConfiguration is returned when worker configuration is invalid.
	ErrInvalidConfiguration = &Error{
		Code:    ErrCodeConfiguration,
		Message: "invalid worker configuration",
	}
)

Common errors.

View Source
var MigrationFiles embed.FS

MigrationFiles contains all SQL migration files embedded in the binary. Users can access these files programmatically to apply migrations using their preferred migration tool (goose, golang-migrate, atlas, etc.)

Example with goose:

import (
    "github.com/pressly/goose/v3"
    pubsub "github.com/coregx/pubsub"
)

goose.SetBaseFS(pubsub.MigrationFiles)
if err := goose.Up(db, "migrations"); err != nil {
    log.Fatal(err)
}

Example with golang-migrate:

import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/mysql"
    "github.com/golang-migrate/migrate/v4/source/iofs"
    pubsub "github.com/coregx/pubsub"
)

source, err := iofs.New(pubsub.MigrationFiles, "migrations")
m, err := migrate.NewWithSourceInstance("iofs", source, "mysql://user:pass@tcp(host:port)/db")
m.Up()

Functions

func IsNoData

func IsNoData(err error) bool

IsNoData checks if an error is ErrNoData.

Types

type DLQRepository

type DLQRepository interface {
	// Load retrieves a DLQ item by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.DeadLetterQueue, error)

	// Save creates a new DLQ item (if Id=0) or updates an existing one.
	// Returns the saved DLQ item with populated Id.
	Save(ctx context.Context, m model.DeadLetterQueue) (model.DeadLetterQueue, error)

	// Delete permanently removes a DLQ item from storage.
	// Should only be used after successful resolution or manual cleanup.
	Delete(ctx context.Context, m model.DeadLetterQueue) error

	// FindBySubscription retrieves DLQ items for a specific subscription.
	// Results are ordered by created_at DESC (newest first).
	FindBySubscription(ctx context.Context, subscriptionID int64, limit int) ([]model.DeadLetterQueue, error)

	// FindUnresolved retrieves unresolved DLQ items.
	// Results are ordered by created_at ASC (oldest first).
	FindUnresolved(ctx context.Context, limit int) ([]model.DeadLetterQueue, error)

	// FindOlderThan retrieves DLQ items older than the specified threshold.
	// Useful for identifying stuck items requiring attention.
	FindOlderThan(ctx context.Context, threshold time.Duration, limit int) ([]model.DeadLetterQueue, error)

	// FindByMessageID retrieves a DLQ item for a specific message.
	// Returns ErrNoData if not found.
	FindByMessageID(ctx context.Context, messageID int64) (model.DeadLetterQueue, error)

	// GetStats retrieves DLQ statistics including total count, unresolved count,
	// resolution rate, and average age.
	GetStats(ctx context.Context) (model.DLQStats, error)

	// CountUnresolved returns the count of unresolved DLQ items.
	// Useful for dashboard widgets and monitoring.
	CountUnresolved(ctx context.Context) (int, error)
}

DLQRepository defines the persistence interface for the Dead Letter Queue. The DLQ stores messages that failed delivery after all retry attempts.

type Error

type Error struct {
	// Code is a machine-readable error code
	Code string

	// Message is a human-readable error message
	Message string

	// Err is the underlying error (if any)
	Err error
}

Error represents a pubsub library error with categorization.

func NewError

func NewError(code, message string) *Error

NewError creates a new Error with the given code and message.

func NewErrorWithCause

func NewErrorWithCause(code, message string, cause error) *Error

NewErrorWithCause creates a new Error wrapping an underlying error.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying error.

type Filter

type Filter struct {
	SubscriberID int    // Filter by subscriber ID (0 = no filter)
	CbuID        int    // Filter by CBU ID (0 = no filter)
	TopicID      string // Filter by topic ID (empty = no filter)
	IsActive     bool   // Filter by active status
}

Filter represents query filtering options for subscriptions. Used by SubscriptionRepository.List to filter results.

type Logger

type Logger interface {
	// Debugf logs debug-level messages with printf-style formatting.
	Debugf(format string, args ...interface{})

	// Infof logs info-level messages with printf-style formatting.
	Infof(format string, args ...interface{})

	// Warnf logs warning-level messages with printf-style formatting.
	Warnf(format string, args ...interface{})

	// Errorf logs error-level messages with printf-style formatting.
	Errorf(format string, args ...interface{})

	// Info logs info-level messages without formatting.
	Info(message string)
}

Logger defines the logging interface required by the PubSub library. Implement this interface to integrate your logging system (zap, logrus, etc.).

Example implementation:

type ZapLogger struct {
    logger *zap.Logger
}

func (l *ZapLogger) Infof(format string, args ...interface{}) {
    l.logger.Sugar().Infof(format, args...)
}

type LoggingNotificationService

type LoggingNotificationService struct {
	// contains filtered or unexported fields
}

LoggingNotificationService is a simple implementation that logs notifications.

func NewLoggingNotificationService

func NewLoggingNotificationService(logger Logger) *LoggingNotificationService

NewLoggingNotificationService creates a new LoggingNotificationService.

func (*LoggingNotificationService) NotifyDLQItemAdded

func (n *LoggingNotificationService) NotifyDLQItemAdded(_ context.Context, dlq model.DeadLetterQueue) error

NotifyDLQItemAdded logs DLQ item addition.

func (*LoggingNotificationService) NotifyDeliveryFailure

func (n *LoggingNotificationService) NotifyDeliveryFailure(_ context.Context, queue *model.Queue, err error) error

NotifyDeliveryFailure logs delivery failure.

func (*LoggingNotificationService) NotifySubscriptionCreated

func (n *LoggingNotificationService) NotifySubscriptionCreated(_ context.Context, subscription model.Subscription) error

NotifySubscriptionCreated logs subscription creation.

func (*LoggingNotificationService) NotifySubscriptionDeactivated

func (n *LoggingNotificationService) NotifySubscriptionDeactivated(_ context.Context, subscription model.Subscription) error

NotifySubscriptionDeactivated logs subscription deactivation.

type MessageDeliveryGateway

type MessageDeliveryGateway interface {
	// DeliverMessage sends a message to the subscriber's webhook endpoint.
	// Returns error if delivery fails (network error, non-2xx response, timeout).
	DeliverMessage(ctx context.Context, callbackURL string, message *model.DataMessage) error
}

MessageDeliveryGateway defines the interface for delivering messages to subscriber webhooks. This interface avoids circular dependency with the transmitter package while enabling flexible delivery implementations (HTTP webhooks, gRPC, message queues, etc.).

Implementations should handle HTTP transport, retries at the transport level, and return errors for failed deliveries to trigger the retry mechanism.

type MessageRepository

type MessageRepository interface {
	// Load retrieves a message by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Message, error)

	// Save creates a new message (if Id=0) or updates an existing one.
	// Returns the saved message with populated Id.
	Save(ctx context.Context, m model.Message) (model.Message, error)

	// Delete permanently removes a message from storage.
	// Should only be used for cleanup, not during normal operation.
	Delete(ctx context.Context, m model.Message) error

	// FindOutdatedMessages finds messages older than the specified number of days.
	// Used for cleanup/archival operations.
	FindOutdatedMessages(ctx context.Context, days int) ([]model.Message, error)
}

MessageRepository defines the persistence interface for published messages. Messages are immutable once created and represent the actual message payloads.

type NoOpNotificationService

type NoOpNotificationService struct{}

NoOpNotificationService is a no-op implementation of NotificationService. Use this when notifications are not needed.

func (*NoOpNotificationService) NotifyDLQItemAdded

func (n *NoOpNotificationService) NotifyDLQItemAdded(_ context.Context, _ model.DeadLetterQueue) error

NotifyDLQItemAdded does nothing.

func (*NoOpNotificationService) NotifyDeliveryFailure

func (n *NoOpNotificationService) NotifyDeliveryFailure(_ context.Context, _ *model.Queue, _ error) error

NotifyDeliveryFailure does nothing.

func (*NoOpNotificationService) NotifySubscriptionCreated

func (n *NoOpNotificationService) NotifySubscriptionCreated(_ context.Context, _ model.Subscription) error

NotifySubscriptionCreated does nothing.

func (*NoOpNotificationService) NotifySubscriptionDeactivated

func (n *NoOpNotificationService) NotifySubscriptionDeactivated(_ context.Context, _ model.Subscription) error

NotifySubscriptionDeactivated does nothing.

type NoopLogger

type NoopLogger struct{}

NoopLogger is a no-operation logger implementation useful for testing or when logging is not desired. All methods are no-ops.

func (*NoopLogger) Debugf

func (l *NoopLogger) Debugf(_ string, _ ...interface{})

Debugf implements Logger.Debugf as a no-op.

func (*NoopLogger) Errorf

func (l *NoopLogger) Errorf(_ string, _ ...interface{})

Errorf implements Logger.Errorf as a no-op.

func (*NoopLogger) Info

func (l *NoopLogger) Info(_ string)

Info implements Logger.Info as a no-op.

func (*NoopLogger) Infof

func (l *NoopLogger) Infof(_ string, _ ...interface{})

Infof implements Logger.Infof as a no-op.

func (*NoopLogger) Warnf

func (l *NoopLogger) Warnf(_ string, _ ...interface{})

Warnf implements Logger.Warnf as a no-op.

type NotificationService

type NotificationService interface {
	// NotifyDLQItemAdded is called when a message is moved to the Dead Letter Queue.
	// This indicates a message failed after all retry attempts.
	NotifyDLQItemAdded(ctx context.Context, dlq model.DeadLetterQueue) error

	// NotifyDeliveryFailure is called when a message delivery fails.
	// This is informational and happens before moving to DLQ.
	NotifyDeliveryFailure(ctx context.Context, queue *model.Queue, err error) error

	// NotifySubscriptionCreated is called when a new subscription is created.
	NotifySubscriptionCreated(ctx context.Context, subscription model.Subscription) error

	// NotifySubscriptionDeactivated is called when a subscription is deactivated.
	NotifySubscriptionDeactivated(ctx context.Context, subscription model.Subscription) error
}

NotificationService defines an optional interface for sending notifications about pub/sub system events (failures, DLQ items, etc.).

Implementations might send emails, Slack messages, SMS, or log to monitoring systems.

type Option

type Option func(*QueueWorker) error

Option is a function that configures a QueueWorker. Used with the Options Pattern (2025 Go best practice) for flexible service construction.

Example:

worker, err := pubsub.NewQueueWorker(
    pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
    pubsub.WithDelivery(transmitterProvider, gateway),
    pubsub.WithLogger(logger),
    pubsub.WithBatchSize(200), // optional
)

func WithBatchSize

func WithBatchSize(size int) Option

WithBatchSize sets the number of queue items to process per batch. This is an optional configuration - default is 100 items per batch.

Must be > 0. Larger batches improve throughput but use more memory. Smaller batches reduce latency and memory usage.

Recommended values:

  • Low volume: 50-100
  • Medium volume: 100-500
  • High volume: 500-1000

func WithDelivery

func WithDelivery(
	transmitterProvider TransmitterProvider,
	gateway MessageDeliveryGateway,
) Option

WithDelivery sets the message delivery dependencies for the queue worker. Both provider and gateway are required and must not be nil.

This is a required option for NewQueueWorker.

Parameters:

  • transmitterProvider: Resolves subscriber webhook URLs
  • gateway: Handles actual HTTP/gRPC message delivery

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger instance for the queue worker. Logger is required and must not be nil.

This is a required option for NewQueueWorker.

Use NoopLogger for silent operation or implement Logger interface to integrate with your logging system (zap, logrus, etc.).

func WithNotifications

func WithNotifications(service NotificationService) Option

WithNotifications sets an optional notification service for the queue worker. This is an optional configuration - if not provided, NoOpNotificationService will be used (no notifications).

The notification service receives callbacks for:

  • Delivery failures (every failed attempt)
  • DLQ item additions (when message exhausts retries)

Use this to integrate with alerting systems (email, Slack, PagerDuty, etc.).

func WithRepositories

func WithRepositories(
	queueRepo QueueRepository,
	messageRepo MessageRepository,
	subscriptionRepo SubscriptionRepository,
	dlqRepo DLQRepository,
) Option

WithRepositories sets the required repository dependencies for the queue worker. All four repositories are required and must not be nil.

This is a required option for NewQueueWorker.

Parameters:

  • queueRepo: Queue item persistence
  • messageRepo: Message persistence
  • subscriptionRepo: Subscription persistence
  • dlqRepo: Dead Letter Queue persistence

func WithRetryStrategy

func WithRetryStrategy(strategy retry.Strategy) Option

WithRetryStrategy sets a custom retry strategy for the queue worker. This is an optional configuration - if not provided, retry.DefaultStrategy() will be used.

The default strategy implements exponential backoff: 30s → 1m → 2m → 4m → 8m → 16m → 30m (max).

Use this option to customize:

  • Retry delays (backoff schedule)
  • Maximum retry attempts before DLQ
  • DLQ threshold

type PublishRequest

type PublishRequest struct {
	TopicCode  string // Topic code to publish to
	Identifier string // Message identifier (event type)
	Data       string // Message payload
}

PublishRequest represents a request to publish a message.

type PublishResult

type PublishResult struct {
	MessageID         int64   // Created message ID
	QueueItemsCreated int     // Number of queue items created
	SubscriptionsIDs  []int64 // Subscription IDs that received the message
}

PublishResult represents the result of a publish operation.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher handles publishing messages to topics and creating queue items for active subscriptions.

func NewPublisher

func NewPublisher(opts ...PublisherOption) (*Publisher, error)

NewPublisher creates a new Publisher with the provided options.

Required options:

  • WithPublisherRepositories: message, queue, subscription, and topic repositories
  • WithPublisherLogger: logger instance

Example:

publisher, err := pubsub.NewPublisher(
    pubsub.WithPublisherRepositories(msgRepo, queueRepo, subRepo, topicRepo),
    pubsub.WithPublisherLogger(logger),
)

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, req PublishRequest) (*PublishResult, error)

Publish publishes a message to a topic and creates queue items for all active subscriptions.

The process:

  1. Validate topic exists
  2. Create message record
  3. Find all active subscriptions for the topic
  4. Create queue items for each subscription

Returns PublishResult with message ID and queue item count, or error if publish fails.

func (*Publisher) PublishBatch

func (p *Publisher) PublishBatch(ctx context.Context, requests []PublishRequest) ([]*PublishResult, error)

PublishBatch publishes multiple messages in a batch. This is more efficient than calling Publish multiple times.

type PublisherOption

type PublisherOption func(*Publisher) error

PublisherOption configures a Publisher.

func WithPublisherLogger

func WithPublisherLogger(logger Logger) PublisherOption

WithPublisherLogger sets the logger instance.

func WithPublisherRepositories

func WithPublisherRepositories(
	messageRepo MessageRepository,
	queueRepo QueueRepository,
	subscriptionRepo SubscriptionRepository,
	topicRepo TopicRepository,
) PublisherOption

WithPublisherRepositories sets the required repository dependencies.

type PublisherRepository

type PublisherRepository interface {
	// Load retrieves a publisher by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Publisher, error)

	// Save creates a new publisher (if Id=0) or updates an existing one.
	// Returns the saved publisher with populated Id.
	Save(ctx context.Context, m model.Publisher) (model.Publisher, error)

	// GetByPublisherCode retrieves a publisher by its unique code.
	// Returns ErrNoData if not found.
	GetByPublisherCode(ctx context.Context, publisherCode string) (model.Publisher, error)
}

PublisherRepository defines the persistence interface for publisher configurations. Publishers represent message sources in the system.

type QueueRepository

type QueueRepository interface {
	// Load retrieves a queue item by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Queue, error)

	// Save creates a new queue item (if Id=0) or updates an existing one.
	// Returns the saved queue item with populated Id.
	Save(ctx context.Context, m *model.Queue) (*model.Queue, error)

	// Delete permanently removes a queue item from storage.
	Delete(ctx context.Context, m *model.Queue) error

	// FindByMessageID finds a queue item for a specific message and subscription.
	// Returns ErrNoData if not found.
	FindByMessageID(ctx context.Context, subscriptionID, messageID int64) (model.Queue, error)

	// FindBySubscriptionID retrieves all queue items for a subscription.
	// Returns empty slice if none found.
	FindBySubscriptionID(ctx context.Context, subscriptionID int64) ([]model.Queue, error)

	// FindPendingItems finds queue items ready for first-time delivery.
	// Items must have status=PENDING and next_retry_at <= now.
	// Results are ordered by created_at ASC (FIFO).
	FindPendingItems(ctx context.Context, limit int) ([]model.Queue, error)

	// FindRetryableItems finds queue items ready for retry.
	// Items must have status=FAILED and next_retry_at <= now.
	// Results are ordered by created_at ASC (oldest failures first).
	FindRetryableItems(ctx context.Context, limit int) ([]model.Queue, error)

	// FindExpiredItems finds queue items that have expired.
	// Items must have expires_at <= now and status != SENT.
	// Results are ordered by expires_at ASC (oldest first).
	FindExpiredItems(ctx context.Context, limit int) ([]model.Queue, error)

	// UpdateNextRetry updates the retry schedule for a queue item.
	// Used by retry middleware to schedule next delivery attempt.
	UpdateNextRetry(ctx context.Context, id int64, nextRetryAt time.Time, attemptCount int) error
}

QueueRepository defines the persistence interface for queue items. Queue items represent pending or retrying message deliveries.

Implementations must be safe for concurrent use and should use database transactions where appropriate.

type QueueWorker

type QueueWorker struct {
	// contains filtered or unexported fields
}

QueueWorker processes the message delivery queue with automatic retry logic. It handles pending messages, failed retries, and Dead Letter Queue management.

The worker runs continuously in the background, processing batches at regular intervals. It implements exponential backoff retry strategy and moves permanently failed messages to the Dead Letter Queue (DLQ) for manual inspection.

Key responsibilities:

  • Process pending queue items (first delivery attempt)
  • Retry failed deliveries with exponential backoff
  • Move exhausted retries to DLQ
  • Clean up expired queue items
  • Send notifications for delivery failures and DLQ additions

Thread safety: Safe for concurrent use. Each batch is processed sequentially.

func NewQueueWorker

func NewQueueWorker(opts ...Option) (*QueueWorker, error)

NewQueueWorker creates a new queue worker with the provided options.

Required options:

  • WithRepositories: queue, message, subscription, and DLQ repositories
  • WithDelivery: transmitter provider and message delivery gateway
  • WithLogger: logger instance

Optional options:

  • WithRetryStrategy: custom retry strategy (default: retry.DefaultStrategy())
  • WithBatchSize: batch processing size (default: 100)

Example:

worker, err := pubsub.NewQueueWorker(
    pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
    pubsub.WithDelivery(transmitterProvider, gateway),
    pubsub.WithLogger(logger),
    pubsub.WithBatchSize(200), // optional
)
if err != nil {
    log.Fatal(err)
}

func (*QueueWorker) CleanupExpiredItems

func (w *QueueWorker) CleanupExpiredItems(ctx context.Context) (int, error)

CleanupExpiredItems removes expired queue items from the queue. Items are considered expired when expires_at <= now and status != SENT.

This prevents the queue from growing indefinitely with stale messages. Returns the number of deleted items and any critical error.

func (*QueueWorker) GetDLQStats

func (w *QueueWorker) GetDLQStats(ctx context.Context) (model.DLQStats, error)

GetDLQStats retrieves Dead Letter Queue statistics for monitoring. Returns aggregated stats including total count, unresolved count, resolution rate, and average age.

Useful for dashboards, monitoring systems, and operational visibility.

func (*QueueWorker) GetRetrySchedule

func (w *QueueWorker) GetRetrySchedule() string

GetRetrySchedule returns a human-readable description of the retry schedule. Useful for displaying retry configuration to users or in logs.

Example output: "30s → 1m → 2m → 4m → 8m → 16m → 30m".

func (*QueueWorker) ProcessPendingItems

func (w *QueueWorker) ProcessPendingItems(ctx context.Context) (int, error)

ProcessPendingItems processes pending queue items ready for first delivery attempt. It finds all items with status=PENDING and next_retry_at <= now, ordered by created_at ASC (FIFO).

Returns the number of successfully processed items and any critical error. Individual item failures are logged but don't stop batch processing.

func (*QueueWorker) ProcessRetryableItems

func (w *QueueWorker) ProcessRetryableItems(ctx context.Context) (int, error)

ProcessRetryableItems processes failed items ready for retry attempts. It finds all items with status=FAILED and next_retry_at <= now, ordered by created_at ASC.

Returns the number of successfully processed items and any critical error. Individual item failures are logged but don't stop batch processing.

func (*QueueWorker) Run

func (w *QueueWorker) Run(ctx context.Context, interval time.Duration)

Run starts the queue worker event loop that processes messages continuously. It runs until the context is canceled, processing batches at the specified interval.

Each batch processes:

  • Pending items (first delivery attempt)
  • Retryable items (retry after backoff delay)
  • Expired items (cleanup)

This method blocks and should typically be run in a goroutine.

Example:

ctx := context.Background()
go worker.Run(ctx, 30*time.Second) // Process every 30 seconds

type SubscribeRequest

type SubscribeRequest struct {
	SubscriberID int64  // ID of the subscriber (required, must exist)
	TopicCode    string // Topic code to subscribe to (required, must exist)
	Identifier   string // Event identifier filter (required, e.g., "user-123")
	CallbackURL  string // Webhook URL for message delivery (optional, can be set on subscriber)
}

SubscribeRequest represents a request to create a new subscription. All fields except CallbackURL are required.

type SubscriberRepository

type SubscriberRepository interface {
	// Load retrieves a subscriber by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Subscriber, error)

	// Save creates a new subscriber (if Id=0) or updates an existing one.
	// Returns the saved subscriber with populated Id.
	Save(ctx context.Context, m model.Subscriber) (model.Subscriber, error)

	// FindByClientID retrieves a subscriber by client ID.
	// Returns ErrNoData if not found.
	FindByClientID(ctx context.Context, clientID int64) (model.Subscriber, error)

	// FindByName retrieves a subscriber by name.
	// Returns ErrNoData if not found.
	FindByName(ctx context.Context, name string) (model.Subscriber, error)
}

SubscriberRepository defines the persistence interface for subscriber configurations. Subscribers represent message consumers with webhook URLs for delivery.

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager handles subscription lifecycle management for the pub/sub system. It provides high-level operations for creating, managing, and querying subscriptions that connect subscribers to topics.

Key operations:

  • Subscribe: Create new subscriptions with validation
  • Unsubscribe: Deactivate existing subscriptions
  • ListSubscriptions: Query subscriptions by subscriber and identifier
  • ReactivateSubscription: Re-enable previously deactivated subscriptions

Thread safety: Safe for concurrent use.

func NewSubscriptionManager

func NewSubscriptionManager(opts ...SubscriptionManagerOption) (*SubscriptionManager, error)

NewSubscriptionManager creates a new SubscriptionManager with the provided options.

Required options:

  • WithSubscriptionManagerRepositories: subscription, subscriber, and topic repositories
  • WithSubscriptionManagerLogger: logger instance

Example:

manager, err := pubsub.NewSubscriptionManager(
    pubsub.WithSubscriptionManagerRepositories(subRepo, subscriberRepo, topicRepo),
    pubsub.WithSubscriptionManagerLogger(logger),
)

func (*SubscriptionManager) GetSubscription

func (sm *SubscriptionManager) GetSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)

GetSubscription retrieves a single subscription by ID. Returns the subscription or error if not found.

func (*SubscriptionManager) ListSubscriptions

func (sm *SubscriptionManager) ListSubscriptions(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)

ListSubscriptions returns all active subscriptions for a subscriber. Optionally filters by event identifier if provided.

Parameters:

  • subscriberID: Required, must be > 0
  • identifier: Optional filter for event type (empty string = no filter)

Returns empty slice if no subscriptions found (not an error).

func (*SubscriptionManager) ReactivateSubscription

func (sm *SubscriptionManager) ReactivateSubscription(ctx context.Context, subscriptionID int64) (*model.Subscription, error)

ReactivateSubscription reactivates a previously deactivated subscription. If the subscription is already active, returns without error.

This allows resuming message delivery to a subscriber that was temporarily unsubscribed.

func (*SubscriptionManager) Subscribe

Subscribe creates a new subscription connecting a subscriber to a topic. It validates that both the subscriber and topic exist before creating the subscription. If an active subscription already exists, returns the existing subscription.

Validation:

  • SubscriberID must be > 0 and exist in database
  • TopicCode must not be empty and exist in database
  • Identifier must not be empty

Returns the created (or existing) subscription, or an error if validation fails.

func (*SubscriptionManager) Unsubscribe

func (sm *SubscriptionManager) Unsubscribe(ctx context.Context, subscriptionID int64) (*model.Subscription, error)

Unsubscribe deactivates an existing subscription. This is a soft delete - the subscription record remains in the database but becomes inactive. If the subscription is already inactive, returns the subscription without error.

The subscription can be reactivated later using ReactivateSubscription.

Returns the deactivated subscription or error if operation fails.

type SubscriptionManagerOption

type SubscriptionManagerOption func(*SubscriptionManager) error

SubscriptionManagerOption is a function that configures a SubscriptionManager. Used with the Options Pattern for flexible service construction.

func WithSubscriptionManagerLogger

func WithSubscriptionManagerLogger(logger Logger) SubscriptionManagerOption

WithSubscriptionManagerLogger sets the logger instance for the subscription manager. Logger is required and must not be nil.

This is a required option for NewSubscriptionManager.

func WithSubscriptionManagerRepositories

func WithSubscriptionManagerRepositories(
	subscriptionRepo SubscriptionRepository,
	subscriberRepo SubscriberRepository,
	topicRepo TopicRepository,
) SubscriptionManagerOption

WithSubscriptionManagerRepositories sets the required repository dependencies for the subscription manager. All repositories are required and must not be nil.

This is a required option for NewSubscriptionManager.

type SubscriptionRepository

type SubscriptionRepository interface {
	// Load retrieves a subscription by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Subscription, error)

	// Save creates a new subscription (if Id=0) or updates an existing one.
	// Returns the saved subscription with populated Id.
	Save(ctx context.Context, m model.Subscription) (model.Subscription, error)

	// FindActive finds active subscriptions matching the criteria.
	// If subscriberID=0, searches all subscribers.
	// If identifier is empty, searches all identifiers.
	FindActive(ctx context.Context, subscriberID int64, identifier string) ([]model.Subscription, error)

	// List retrieves subscriptions matching the filter criteria.
	// Returns empty slice if none found.
	List(ctx context.Context, filter Filter) ([]model.Subscription, error)

	// FindAllActive retrieves all active subscriptions with full details.
	// Returns SubscriptionFull with joined subscriber and topic information.
	FindAllActive(ctx context.Context) ([]model.SubscriptionFull, error)
}

SubscriptionRepository defines the persistence interface for subscription mappings. Subscriptions connect subscribers to topics, enabling message delivery.

type TopicRepository

type TopicRepository interface {
	// Load retrieves a topic by ID.
	// Returns ErrNoData if not found.
	Load(ctx context.Context, id int64) (model.Topic, error)

	// Save creates a new topic (if Id=0) or updates an existing one.
	// Returns the saved topic with populated Id.
	Save(ctx context.Context, m model.Topic) (model.Topic, error)

	// GetByTopicCode retrieves a topic by its unique code.
	// Returns ErrNoData if not found.
	GetByTopicCode(ctx context.Context, topicCode string) (model.Topic, error)
}

TopicRepository defines the persistence interface for topic configurations. Topics represent message categories for pub/sub routing.

type TransmitterProvider

type TransmitterProvider interface {
	// GetCallbackUrl retrieves the webhook URL for a subscriber.
	// Returns ErrNoData if subscriber not found.
	GetCallbackUrl(ctx context.Context, subscriberID int64) (string, error)
}

TransmitterProvider provides subscriber callback URL resolution without circular dependency. This interface decouples the worker from the transmitter/subscriber details.

Implementations typically fetch webhook URLs from subscriber configuration.

Directories

Path Synopsis
adapters
relica
Package relica provides repository implementations using Relica query builder.
Package relica provides repository implementations using Relica query builder.
cmd
pubsub-server command
Package main provides the PubSub server executable with HTTP API and background worker.
Package main provides the PubSub server executable with HTTP API and background worker.
pubsub-server/internal/api
Package api provides HTTP handlers for the PubSub server REST API.
Package api provides HTTP handlers for the PubSub server REST API.
pubsub-server/internal/config
Package config provides configuration management for the PubSub standalone server.
Package config provides configuration management for the PubSub standalone server.
examples
basic command
internal
Package model contains all domain models and data structures for the PubSub system.
Package model contains all domain models and data structures for the PubSub system.
Package retry provides exponential backoff retry strategies for message delivery.
Package retry provides exponential backoff retry strategies for message delivery.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL