A distributed task queue in Go that processes thousands of asynchronous jobs per second. It features priority scheduling, automatic retries, horizontal scaling, and production-grade monitoring. The system is fully containerized, has comprehensive tests, and includes Prometheus metrics.
- Priority Queue System - Four priority levels (Low, Medium, High, Critical)
- Distributed Workers - Horizontal scaling with multiple worker instances
- Task Persistence - Redis-backed storage for reliability
- Automatic Retries - Exponential backoff for failed tasks
- Graceful Shutdown - Clean shutdown with task completion
- RESTful API - Complete HTTP API for task management
- Observability - Prometheus metrics integration
- Health Checks - Built-in health endpoints
- Structured Logging - JSON logging with Zap
- Comprehensive Testing - Unit tests with high coverage
- Docker Support - Full containerization with Docker Compose
```
┌─────────────┐      HTTP      ┌──────────┐
│   Client    │ ─────────────► │  Server  │
└─────────────┘                └──────────┘
                                    │
                                    ▼
                               ┌──────────┐
                               │  Redis   │
                               └──────────┘
                                    │
                   ┌────────────────┼────────────────┐
                   ▼                ▼                ▼
              ┌─────────┐      ┌─────────┐      ┌─────────┐
              │ Worker 1│      │ Worker 2│      │ Worker 3│
              └─────────┘      └─────────┘      └─────────┘
```
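Workers pull tasks from Redis and process them concurrently. Purely as an illustration of that flow (the queue key names and data layout below are assumptions, not the project's actual storage schema), a priority-aware consume loop might look like this:

```go
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // BRPOP checks keys in the order given, so non-empty higher-priority
    // queues are drained first. The key names here are hypothetical.
    queues := []string{"queue:critical", "queue:high", "queue:medium", "queue:low"}
    for {
        res, err := rdb.BRPop(ctx, 0, queues...).Result()
        if err != nil {
            time.Sleep(time.Second) // back off briefly on Redis errors
            continue
        }
        // res[0] is the queue key, res[1] the serialized task.
        fmt.Printf("picked up task from %s: %s\n", res[0], res[1])
    }
}
```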
Quick Start:

```bash
# Start all services (Redis, Server, 3 Workers, Prometheus)
make docker-up
# View logs
make docker-logs
# Stop all services
make docker-down
```

Local development:

```bash
# Install dependencies
make deps
# Start Redis (required)
docker run -d -p 6379:6379 redis:7-alpine
# Run tests
make test
# Build binaries
make build
# Run server
make run-server
# Run worker (in another terminal)
make run-worker
```

Submit a task:

```bash
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "type": "send_email",
    "priority": 2,
    "payload": {
      "recipient": "user@example.com",
      "subject": "Hello World",
      "body": "This is a test email"
    },
    "max_retries": 3
  }'
```

Response:

```json
{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "submitted"
}
```
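For programmatic submission, here is a minimal Go client sketch against the same endpoint (request and response fields mirror the curl example above; this is illustrative, not a client shipped with the project):

```go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

func main() {
    // Same request as the curl example above.
    payload := map[string]any{
        "type":     "send_email",
        "priority": 2,
        "payload": map[string]any{
            "recipient": "user@example.com",
            "subject":   "Hello World",
            "body":      "This is a test email",
        },
        "max_retries": 3,
    }
    body, err := json.Marshal(payload)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.Post("http://localhost:8080/api/v1/tasks", "application/json", bytes.NewReader(body))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    var out struct {
        TaskID string `json:"task_id"`
        Status string `json:"status"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("submitted task %s (status: %s)\n", out.TaskID, out.Status)
}
```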
Get task status:

```bash
curl http://localhost:8080/api/v1/tasks/550e8400-e29b-41d4-a716-446655440000
```

Response:

```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "send_email",
  "priority": 2,
  "status": "completed",
  "payload": {
    "recipient": "user@example.com",
    "subject": "Hello World"
  },
  "max_retries": 3,
  "retry_count": 0,
  "created_at": "2024-01-15T10:30:00Z",
  "started_at": "2024-01-15T10:30:01Z",
  "completed_at": "2024-01-15T10:30:03Z",
  "worker_id": "worker-1"
}
```
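The response above corresponds to a task record shaped roughly as follows (field names are taken from the JSON; the actual `internal/task` definition may differ):

```go
package task

import "time"

// Task is an illustrative shape for a queued job; JSON tags follow the
// API response shown above.
type Task struct {
    ID          string         `json:"id"`
    Type        string         `json:"type"`
    Priority    int            `json:"priority"`
    Status      string         `json:"status"`
    Payload     map[string]any `json:"payload"`
    MaxRetries  int            `json:"max_retries"`
    RetryCount  int            `json:"retry_count"`
    CreatedAt   time.Time      `json:"created_at"`
    StartedAt   *time.Time     `json:"started_at,omitempty"`
    CompletedAt *time.Time     `json:"completed_at,omitempty"`
    WorkerID    string         `json:"worker_id,omitempty"`
}
```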
Get queue statistics:

```bash
curl http://localhost:8080/api/v1/stats
```

Response:

```json
{
  "pending": 5,
  "processing": 3,
  "completed": 142,
  "failed": 2
}
```

Check service health:

```bash
curl http://localhost:8080/health
```

The system comes with example handlers for common task types:

Send Email:

```json
{
  "type": "send_email",
  "priority": 2,
  "payload": {
    "recipient": "user@example.com",
    "subject": "Subject",
    "body": "Message body"
  }
}
```

Process Image:

```json
{
  "type": "process_image",
  "priority": 1,
  "payload": {
    "image_url": "https://example.com/image.jpg",
    "operation": "resize"
  }
}
```

Export Data:

```json
{
  "type": "export_data",
  "priority": 0,
  "payload": {
    "format": "csv",
    "filters": {}
  }
}
```

Call Webhook:

```json
{
  "type": "call_webhook",
  "priority": 3,
  "payload": {
    "url": "https://example.com/webhook",
    "method": "POST",
    "body": {}
  }
}
```

Priority Levels:

| Priority | Value | Use Case |
|---|---|---|
| Critical | 3 | Time-sensitive operations, alerts |
| High | 2 | User-facing operations |
| Medium | 1 | Background processing |
| Low | 0 | Batch jobs, cleanup tasks |
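In Go code these values might be expressed as constants along the following lines (an illustrative sketch; the names are assumptions and may not match the actual `task` package):

```go
package task

// Illustrative mapping of the priority table above; constant names are assumed.
const (
    PriorityLow      = 0 // batch jobs, cleanup tasks
    PriorityMedium   = 1 // background processing
    PriorityHigh     = 2 // user-facing operations
    PriorityCritical = 3 // time-sensitive operations, alerts
)
```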
To add custom task handlers, register them in your code:
```go
queue.RegisterHandler("my_task", func(ctx context.Context, t *task.Task) error {
    // Extract payload (comma-ok avoids a panic if the field is missing or mistyped)
    data, ok := t.Payload["key"].(string)
    if !ok {
        return fmt.Errorf("payload field %q is missing or not a string", "key")
    }

    // Perform work
    if _, err := processData(data); err != nil {
        return err // a returned error triggers the retry logic
    }

    // Success
    return nil
})
```

Access metrics at http://localhost:8080/metrics
Available metrics:
- `tasks_submitted_total` - Total tasks submitted, by type and priority
- `tasks_processed_total` - Total tasks processed, by type and status
- `task_duration_seconds` - Task processing duration histogram
- `queue_size` - Current queue size, by priority
- `workers_active` - Number of active workers
- `task_retries_total` - Total retry attempts, by type
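As a rough illustration of how such metrics are typically declared with the Prometheus Go client (not the project's actual `internal/metrics` code; label names are inferred from the list above):

```go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Illustrative declarations only; the real package may differ.
var (
    TasksSubmitted = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "tasks_submitted_total",
        Help: "Total tasks submitted, by type and priority.",
    }, []string{"type", "priority"})

    TaskDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "task_duration_seconds",
        Help:    "Task processing duration in seconds.",
        Buckets: prometheus.DefBuckets,
    }, []string{"type"})
)
```

Counters like these would then be incremented at the submission and completion points; gauge-style metrics such as `queue_size` and `workers_active` follow the same pattern with `promauto.NewGaugeVec`, and the server would typically expose everything on `/metrics` via `promhttp.Handler()`.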
Access Prometheus UI at http://localhost:9090
Example queries:
```
# Task processing rate
rate(tasks_processed_total[5m])

# Average task duration by type
rate(task_duration_seconds_sum[5m]) / rate(task_duration_seconds_count[5m])

# Failed task rate
rate(tasks_processed_total{status="failed"}[5m])
```
Server:
- `REDIS_ADDR` - Redis address (default: `localhost:6379`)
- `REDIS_PASSWORD` - Redis password (default: empty)
- `PORT` - HTTP server port (default: `8080`)
Worker:
- `REDIS_ADDR` - Redis address (default: `localhost:6379`)
- `REDIS_PASSWORD` - Redis password (default: empty)
- `WORKER_ID` - Unique worker identifier (default: `worker-1`)
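A minimal sketch of reading these variables with fallback defaults (the `getenv` helper is hypothetical, not the project's actual configuration code):

```go
package main

import "os"

// getenv returns the value of key, or def when the variable is unset or empty.
func getenv(key, def string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return def
}

func loadConfig() (redisAddr, redisPassword, port string) {
    redisAddr = getenv("REDIS_ADDR", "localhost:6379")
    redisPassword = getenv("REDIS_PASSWORD", "")
    port = getenv("PORT", "8080")
    return
}
```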
Testing:

```bash
# Run all tests
make test

# Run tests with coverage report
make test-coverage

# View coverage in browser
open coverage.html
```

Project structure:

```
distributed-task-queue/
├── cmd/
│   ├── server/            # HTTP API server
│   └── worker/            # Task worker
├── internal/
│   ├── queue/             # Core queue implementation
│   ├── task/              # Task definitions
│   ├── storage/           # Redis & memory storage
│   └── metrics/           # Prometheus metrics
├── api/                   # HTTP handlers
├── docker-compose.yml     # Docker orchestration
├── Makefile               # Build automation
└── README.md              # This file
```
- Throughput: 1000+ tasks/second per worker
- Latency: <100ms task submission
- Scalability: Linear scaling with worker count
- Reliability: Automatic retry with exponential backoff
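For illustration, an exponential backoff schedule usually looks like the following; the base delay and cap here are assumptions, not values taken from the project:

```go
package main

import "time"

// retryDelay returns an exponentially growing delay for a given attempt,
// capped at maxDelay. Base and cap values are illustrative.
func retryDelay(attempt int) time.Duration {
    const (
        base     = 1 * time.Second
        maxDelay = 5 * time.Minute
    )
    if attempt > 16 { // guard against shift overflow on pathological counts
        return maxDelay
    }
    d := base << attempt // 1s, 2s, 4s, 8s, ...
    if d > maxDelay {
        return maxDelay
    }
    return d
}
```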
Horizontal Scaling:
```bash
# Add more workers
docker-compose up -d --scale worker=5
```

Tuning:
- Adjust `numWorkers` per process for CPU-bound tasks
- Increase the Redis connection pool size for high throughput
- Configure task timeouts based on workload
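As a sketch of the timeout point above (the wrapper and the 30-second value are illustrative, not the project's defaults):

```go
package main

import (
    "context"
    "time"
)

// runWithTimeout gives a handler a deadline so a stuck task cannot hold a
// worker forever. The timeout value is an assumption for illustration.
func runWithTimeout(parent context.Context, handler func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(parent, 30*time.Second)
    defer cancel()
    return handler(ctx)
}
```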
Security:

- Add authentication/authorization to API endpoints
- Use TLS for Redis connections
- Implement rate limiting
- Validate task payloads

Production Deployment:

- Configure Redis persistence (AOF/RDB)
- Set up Redis replication for high availability
- Monitor queue depth and worker health
- Implement dead letter queues

Monitoring:

- Export metrics to a monitoring system
- Set up alerting on queue depth and failure rate
- Configure structured logging aggregation
- Add distributed tracing
This project demonstrates:
✅ Advanced Go - Goroutines, channels, interfaces, context
✅ Distributed Systems - Message queues, task distribution
✅ Production Practices - Testing, monitoring, Docker
✅ API Design - RESTful endpoints, error handling
✅ Persistence - Redis integration, data modeling
✅ Observability - Metrics, logging, health checks
✅ DevOps - Docker, CI/CD ready, configuration management