Skip to content

mohosy/postgres-backed-pubsub-queue

Repository files navigation

postgres-backed-pubsub-queue

TypeScript Express PostgreSQL Docker License: MIT

Durable pub/sub message queue with consumer-group offsets, built with Express + PostgreSQL. Messages are stored in a write-ahead log (B-tree indexed on (topic_id, id)) so publishes are append-only and pulls stay ordered. Each consumer group tracks its own offset, giving exactly-once delivery per group when combined with idempotent ACKs.

Why this is interesting

  • Shows backend fundamentals: HTTP API design, SQL schema, index choice, ACID transactions, idempotent semantics.
  • Uses consumer offsets instead of deletes, so reads are O(log n + k) (index seek + batch) and appends are O(1) amortized.
  • Demonstrates safe concurrency: offset rows are locked FOR UPDATE so multiple consumers in the same group cannot race.
  • Fully containerized (API + Postgres) and covered by integration tests using pg-mem + supertest.

Quick start

Prereqs: Node 20+, npm. For Docker users, Compose file is included.

# install deps
npm install

# run locally against your Postgres
cp .env.example .env
# update DATABASE_URL if needed, then:
npm run dev

# or build and run production bundle
npm run build
npm start

# run tests (uses in-memory Postgres)
npm test

Docker (if Docker is available):

docker compose up --build

Note: Docker isn't installed in this environment, but the compose stack is ready for machines with Docker.

API

Base URL: http://localhost:4000/api

  • POST /topics – create or idempotently ensure a topic
    { "name": "orders" }
  • POST /topics/:name/publish – append a message to a topic
    { "payload": { "orderId": 42, "event": "created" } }
  • GET /topics/:name/pull?group=<consumer>&batch=10 – pull the next batch for a consumer group
  • POST /topics/:name/ack – advance a consumer group offset
    { "group": "analytics", "lastAckedId": 15 }

Delivery semantics

  • Messages are never deleted; consumers advance offsets.
  • Ordering is preserved per topic via the (topic_id, id) index.
  • ACKs are idempotent and monotonic (last_acked_id = GREATEST(current, ack)), preventing rewind.
  • Offsets are fetched FOR UPDATE to avoid lost updates when multiple consumers share a group.

Architecture

  • API layer: Express 5 routes (src/routes/queueRoutes.ts) with centralized error handling.
  • Service layer: QueueService orchestrates transactions and validation.
  • Persistence: PostgreSQL tables topics, messages, consumer_offsets; B-tree indexes on (topic_id, id) and (topic_id, created_at) for fast range scans.
  • Schema bootstrap: ensureSchema runs migrations/init.sql at startup so local + container flows are zero-touch.
  • Testing: Node's built-in test runner + pg-mem simulate Postgres; supertest covers the HTTP surface.

Schema (migrations/init.sql)

  • topics(id, name UNIQUE, created_at)
  • messages(id BIGSERIAL, topic_id FK, payload JSONB, created_at, INDEX topic_id,id)
  • consumer_offsets(topic_id, consumer_group UNIQUE per topic, last_acked_id BIGINT)

Sample session

curl -X POST localhost:4000/api/topics -H 'Content-Type: application/json' -d '{"name":"events"}'

curl -X POST localhost:4000/api/topics/events/publish \
  -H 'Content-Type: application/json' \
  -d '{"payload":{"seq":0,"event":"boot"}}'

curl "localhost:4000/api/topics/events/pull?group=worker-a&batch=10"
# => returns payload with messages and lastAckedId

curl -X POST localhost:4000/api/topics/events/ack \
  -H 'Content-Type: application/json' \
  -d '{"group":"worker-a","lastAckedId":1}'

Complexity notes

  • Append: O(1) amortized (sequential BIGSERIAL insert).
  • Pull: O(log n + k) where k is batch size (index seek + range scan).
  • ACK: O(1) single-row update guarded by FOR UPDATE lock.

Roadmap ideas

  1. Add long-polling or SSE for push-style delivery.
  2. Pluggable retention policies (time- or size-based trimming with tombstones).
  3. Dead-letter queues and retry counters per consumer group.
  4. Observability: request latency + queue depth metrics.

Author

Mo Shirmohammadi (@mohosy)

About

Durable pub/sub message queue with consumer groups, built on PostgreSQL LISTEN/NOTIFY.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors