Durable pub/sub message queue with consumer-group offsets, built with Express + PostgreSQL. Messages are stored in an append-only log table (B-tree indexed on (topic_id, id)), so publishes never rewrite existing rows and pulls stay ordered. Each consumer group tracks its own offset, giving at-least-once delivery per group; because ACKs are idempotent, retried acknowledgements are safe, and a consumer that processes messages idempotently gets effectively-once results.
- Shows backend fundamentals: HTTP API design, SQL schema, index choice, ACID transactions, idempotent semantics.
- Uses consumer offsets instead of deletes, so reads are O(log n + k) (index seek + batch) and appends are O(1) amortized.
- Demonstrates safe concurrency: offset rows are locked with `FOR UPDATE` so multiple consumers in the same group cannot race.
- Fully containerized (API + Postgres) and covered by integration tests using `pg-mem` + `supertest`.
Prereqs: Node 20+, npm. For Docker users, a Compose file is included.
# install deps
npm install
# run locally against your Postgres
cp .env.example .env
# update DATABASE_URL if needed, then:
npm run dev
# or build and run production bundle
npm run build
npm start
# run tests (uses in-memory Postgres)
npm test
Docker (if Docker is available):
docker compose up --build
Note: Docker isn't installed in this environment, but the compose stack is ready for machines with Docker.
Base URL: http://localhost:4000/api
- `POST /topics` – create or idempotently ensure a topic. Body: `{ "name": "orders" }`
- `POST /topics/:name/publish` – append a message to a topic. Body: `{ "payload": { "orderId": 42, "event": "created" } }`
- `GET /topics/:name/pull?group=<consumer>&batch=10` – pull the next batch for a consumer group
- `POST /topics/:name/ack` – advance a consumer group offset. Body: `{ "group": "analytics", "lastAckedId": 15 }`
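As a rough illustration of how a client might address these endpoints, the sketch below builds the method, URL, and JSON body for each call. The `RequestSpec` type and the helper names are hypothetical and not part of the project; only the routes, query parameters, and body shapes come from the list above.

```typescript
// Hypothetical client-side helpers; the route shapes follow the endpoint list.
const BASE_URL = "http://localhost:4000/api";

interface RequestSpec {
  method: "GET" | "POST";
  url: string;
  body?: unknown;
}

// POST /topics/:name/publish with { payload }
function publishRequest(topic: string, payload: unknown): RequestSpec {
  return {
    method: "POST",
    url: `${BASE_URL}/topics/${encodeURIComponent(topic)}/publish`,
    body: { payload },
  };
}

// GET /topics/:name/pull?group=<consumer>&batch=<n>
function pullRequest(topic: string, group: string, batch = 10): RequestSpec {
  const query = `group=${encodeURIComponent(group)}&batch=${batch}`;
  return {
    method: "GET",
    url: `${BASE_URL}/topics/${encodeURIComponent(topic)}/pull?${query}`,
  };
}

// POST /topics/:name/ack with { group, lastAckedId }
function ackRequest(topic: string, group: string, lastAckedId: number): RequestSpec {
  return {
    method: "POST",
    url: `${BASE_URL}/topics/${encodeURIComponent(topic)}/ack`,
    body: { group, lastAckedId },
  };
}
```

Each spec can then be handed to any HTTP client, sending `body` as JSON with a `Content-Type: application/json` header.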
- Messages are never deleted; consumers advance offsets.
- Ordering is preserved per topic via the `(topic_id, id)` index.
- ACKs are idempotent and monotonic (`last_acked_id = GREATEST(current, ack)`), preventing rewind.
- Offsets are fetched `FOR UPDATE` to avoid lost updates when multiple consumers share a group.
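The rules above can be modelled in a few lines without a database. The following is a minimal in-memory sketch, not the project's `QueueService`: the `InMemoryTopic` class and its method names are invented for illustration, and it ignores locking because it runs in a single thread.

```typescript
// In-memory model of the delivery rules; not the PostgreSQL-backed service.
interface Message {
  id: number;
  payload: unknown;
}

class InMemoryTopic {
  private log: Message[] = [];                   // append-only, ids strictly increasing
  private offsets = new Map<string, number>();   // consumer group -> last acked id
  private nextId = 1;

  publish(payload: unknown): Message {
    const message: Message = { id: this.nextId++, payload };
    this.log.push(message);                      // messages are never deleted
    return message;
  }

  // Return up to `batch` messages after the group's offset, in id order.
  pull(group: string, batch: number): Message[] {
    const lastAcked = this.offsets.get(group) ?? 0;
    return this.log.filter((m) => m.id > lastAcked).slice(0, batch);
  }

  // Monotonic ACK: mirrors last_acked_id = GREATEST(current, ack).
  ack(group: string, lastAckedId: number): number {
    const current = this.offsets.get(group) ?? 0;
    const next = Math.max(current, lastAckedId);
    this.offsets.set(group, next);
    return next;
  }
}

const topic = new InMemoryTopic();
topic.publish({ seq: 0 });
topic.publish({ seq: 1 });
topic.publish({ seq: 2 });

topic.ack("worker-a", 2);
const afterStaleAck = topic.ack("worker-a", 1);   // stale ACK: offset stays at 2
const workerA = topic.pull("worker-a", 10);       // only the message with id 3
const workerB = topic.pull("worker-b", 10);       // all three: groups are independent
```

Repeating an ACK or sending an older one leaves the offset unchanged, which is what makes retries safe for the caller.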
- API layer: Express 5 routes (`src/routes/queueRoutes.ts`) with centralized error handling.
- Service layer: `QueueService` orchestrates transactions and validation.
- Persistence: PostgreSQL tables `topics`, `messages`, `consumer_offsets`; B-tree indexes on `(topic_id, id)` and `(topic_id, created_at)` for fast range scans.
- Schema bootstrap: `ensureSchema` runs `migrations/init.sql` at startup so local + container flows are zero-touch.
- Testing: Node's built-in test runner; `pg-mem` simulates Postgres and `supertest` covers the HTTP surface.
- `topics(id, name UNIQUE, created_at)`
- `messages(id BIGSERIAL, topic_id FK, payload JSONB, created_at, INDEX (topic_id, id))`
- `consumer_offsets(topic_id, consumer_group UNIQUE per topic, last_acked_id BIGINT)`
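To make the schema more concrete, here is one way the pull and ACK statements could be written against these tables. This is a sketch under assumptions, not the contents of `QueueService`: the statement text, the `Queryable` interface, and `pullBatch` are all hypothetical, and the upsert assumes the per-topic uniqueness on `consumer_offsets` is a constraint on `(topic_id, consumer_group)`.

```typescript
// Hypothetical statement text; table and column names follow the schema above.
const LOCK_OFFSET_SQL = `
  SELECT last_acked_id
    FROM consumer_offsets
   WHERE topic_id = $1 AND consumer_group = $2
     FOR UPDATE`;

const PULL_SQL = `
  SELECT id, payload, created_at
    FROM messages
   WHERE topic_id = $1 AND id > $2
   ORDER BY id
   LIMIT $3`;

// Upsert that never moves the offset backwards.
const ACK_SQL = `
  INSERT INTO consumer_offsets (topic_id, consumer_group, last_acked_id)
  VALUES ($1, $2, $3)
  ON CONFLICT (topic_id, consumer_group)
  DO UPDATE SET last_acked_id =
    GREATEST(consumer_offsets.last_acked_id, EXCLUDED.last_acked_id)
  RETURNING last_acked_id`;

// Minimal stand-in for a database client; an assumption, not a real library type.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// Assumes the caller has already opened a transaction on `db`;
// a FOR UPDATE row lock is only held until that transaction ends.
async function pullBatch(db: Queryable, topicId: number, group: string, batch: number) {
  const offset = await db.query(LOCK_OFFSET_SQL, [topicId, group]);
  const row = offset.rows[0];
  // BIGINT values may arrive as strings depending on the driver, so normalise.
  const lastAckedId = row ? Number(row["last_acked_id"]) : 0;
  const messages = await db.query(PULL_SQL, [topicId, lastAckedId, batch]);
  return { lastAckedId, messages: messages.rows };
}
```

A group with no offset row yet is treated as offset 0 here, so its first pull starts from the beginning of the topic.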
curl -X POST localhost:4000/api/topics -H 'Content-Type: application/json' -d '{"name":"events"}'
curl -X POST localhost:4000/api/topics/events/publish \
-H 'Content-Type: application/json' \
-d '{"payload":{"seq":0,"event":"boot"}}'
curl "localhost:4000/api/topics/events/pull?group=worker-a&batch=10"
# => returns payload with messages and lastAckedId
curl -X POST localhost:4000/api/topics/events/ack \
-H 'Content-Type: application/json' \
-d '{"group":"worker-a","lastAckedId":1}'
- Append: O(1) amortized (sequential `BIGSERIAL` insert).
- Pull: O(log n + k) where `k` is batch size (index seek + range scan).
- ACK: O(1) single-row update guarded by `FOR UPDATE` lock.
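The O(log n + k) pull cost can be pictured with a sorted array standing in for the `(topic_id, id)` index: a binary search finds the first id past the offset (the seek), then at most `k` entries are copied (the scan). This is only an analogy for how a B-tree range scan behaves; the function names are made up for the example.

```typescript
// Sketch of "index seek + range scan" over a sorted array of message ids.

// Binary search: index of the first id strictly greater than `offset`. O(log n).
function firstIndexAfter(ids: number[], offset: number): number {
  let lo = 0;
  let hi = ids.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    const value = ids[mid] as number;
    if (value <= offset) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

// Seek once, then copy at most `batch` ids. O(log n + k).
function pullIds(ids: number[], offset: number, batch: number): number[] {
  const start = firstIndexAfter(ids, offset);
  return ids.slice(start, start + batch);
}

const ids = [1, 2, 3, 5, 8, 13];
const nextTwo = pullIds(ids, 3, 2);   // [5, 8]
```

The work done does not depend on how many messages sit before the offset, which is why never deleting messages does not slow down consumers that are caught up.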
- Add long-polling or SSE for push-style delivery.
- Pluggable retention policies (time- or size-based trimming with tombstones).
- Dead-letter queues and retry counters per consumer group.
- Observability: request latency + queue depth metrics.
Mo Shirmohammadi (@mohosy)