Summary
When a transient error occurs during poll() in PostgresMessageQueue.listen(), the listener permanently stops processing messages without any recovery mechanism. This causes a growing message backlog that can only be resolved by restarting the application.
Observed behavior
In a production deployment with 3 application instances sharing the same PostgresMessageQueue, one instance stopped processing queued messages ~4 minutes after startup. The instance continued accepting and enqueuing new incoming activities (via POST /ap/inbox → 202), but never consumed any further messages from the queue. The other two instances continued processing normally.
Over ~11 hours, this resulted in a backlog of 4,000+ unprocessed messages in the fedify_message_v2 table. Restarting the affected instance restored processing.
Root cause
There are three code paths in listen() where an unhandled poll() rejection permanently kills the listener:
1. Fallback polling loop (line 348)
```typescript
while (!signal?.aborted) {
  // ... wait for pollInterval ...
  await serializedPoll(); // ← unhandled: rejects listen()
}
```

If serializedPoll() rejects, the error propagates out of the while loop, causing the listen() promise to reject. The listener never polls again.
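This failure mode can be reproduced in isolation. A minimal, self-contained sketch (hypothetical names, not the Fedify source) showing that a single awaited rejection permanently ends a polling loop, while a try/catch lets it survive:

```typescript
// Hypothetical flaky poll: succeeds except on the `failOn`-th call,
// simulating a transient PostgreSQL error.
function makeFlakyPoll(failOn: number): () => Promise<void> {
  let call = 0;
  return async () => {
    call++;
    if (call === failOn) throw new Error("transient connection timeout");
  };
}

// Mirrors the current fallback loop: the first rejection stops all
// future polls and rejects the enclosing promise.
async function fragileLoop(
  poll: () => Promise<void>,
  rounds: number,
): Promise<number> {
  let completed = 0;
  for (let i = 0; i < rounds; i++) {
    await poll(); // unhandled: a rejection ends the loop for good
    completed++;
  }
  return completed;
}

// Mirrors the proposed fix: the error is caught and the next round
// still runs.
async function resilientLoop(
  poll: () => Promise<void>,
  rounds: number,
): Promise<number> {
  let completed = 0;
  for (let i = 0; i < rounds; i++) {
    try {
      await poll();
      completed++;
    } catch {
      // log and continue; the next interval still polls
    }
  }
  return completed;
}
```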
2. NOTIFY callback for immediate messages (line 321)
```typescript
async (delay) => {
  const duration = Temporal.Duration.from(delay);
  const durationMs = duration.total("millisecond");
  if (durationMs < 1) await serializedPoll(); // ← unhandled
  // ...
},
```

A rejection here propagates to the postgres.js LISTEN handler, likely causing an unhandled promise rejection.
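Why the rejection ends up unhandled can be shown with a stand-in for the driver's dispatch (hypothetical code, not postgres.js itself): when a driver invokes an async callback without awaiting it, the returned promise is simply dropped, so its rejection never reaches the code that registered the callback.

```typescript
type NotifyHandler = (payload: string) => void | Promise<void>;

// Hypothetical driver-side dispatch: the callback is fired and forgotten.
// A real driver would discard `result` entirely; it is returned here only
// so the otherwise-unhandled rejection can be observed.
function dispatchNotify(
  handler: NotifyHandler,
  payload: string,
): Promise<void> | undefined {
  const result = handler(payload);
  return result instanceof Promise ? result : undefined;
}
```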
3. onsubscribe callback (line 330)
```typescript
const listen = await this.#sql.listen(
  this.#channelName,
  async (delay) => { /* ... */ },
  serializedPoll, // ← initial poll on subscribe, unhandled
);
```

If the initial poll on subscription fails, the error is unhandled.
What can cause poll() to throw?
- Transient PostgreSQL errors (connection timeout, connection pool exhaustion, deadlock)
- Handler errors that propagate through `await handler(row.message)` (lines 203, 276)
- `sql.reserve()` failure under connection pressure (line 255)
Proposed fix
Wrap each serializedPoll() call in a try-catch and log the error instead of letting it propagate:

```typescript
// Fallback polling loop
while (!signal?.aborted) {
  // ... wait for pollInterval ...
  try {
    await serializedPoll();
  } catch (error) {
    logger.error(
      "Error while polling for messages; will retry on next interval: {error}",
      { error },
    );
  }
}
```
```typescript
// NOTIFY callback
async (delay) => {
  const duration = Temporal.Duration.from(delay);
  const durationMs = duration.total("millisecond");
  if (durationMs < 1) {
    try {
      await serializedPoll();
    } catch (error) {
      logger.error(
        "Error while polling for messages on notify: {error}",
        { error },
      );
    }
  } else {
    // ... (the setTimeout path already uses `void`, which swallows rejections)
  }
},
```

Similarly, the onsubscribe callback (the 3rd argument to this.#sql.listen()) should also be wrapped.
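The same wrapping could be factored into one small helper so all three call sites (fallback loop, NOTIFY callback, onsubscribe) stay consistent. A hedged sketch, not the actual Fedify source; the `Logger` type is a hypothetical stand-in for the logger Fedify uses:

```typescript
// Hypothetical minimal logger shape, standing in for Fedify's logger.
type Logger = {
  error: (message: string, context: Record<string, unknown>) => void;
};

// Wrap a poll function so a rejection is caught and logged instead of
// propagating; the listener keeps running and the next NOTIFY or poll
// interval will try again.
function safePoll(
  poll: () => Promise<void>,
  logger: Logger,
  site: string,
): () => Promise<void> {
  return async () => {
    try {
      await poll();
    } catch (error) {
      logger.error(`Error while polling for messages (${site}): {error}`, {
        error,
      });
    }
  };
}
```

With such a helper, the onsubscribe argument would become something like `safePoll(serializedPoll, logger, "onsubscribe")`, and the initial-poll rejection no longer escapes.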
Additional note: missing index
The initialize() method creates the fedify_message_v2 table without any index on the created column. The poll() dequeue query (`ORDER BY created LIMIT 1 FOR UPDATE SKIP LOCKED`) therefore performs a sequential scan over the entire table, which degrades significantly as the backlog grows.
Adding an index during initialization would help:

```sql
CREATE INDEX IF NOT EXISTS idx_fedify_message_v2_created
  ON fedify_message_v2 (created);
```

In our case, adding this index improved the dequeue query from 5.3 ms (Seq Scan, 4,000 rows) to 0.067 ms (Index Scan) — a 79x improvement.
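One way to confirm the planner actually uses the new index (a sketch based on the dequeue shape described above; the real poll() query selects specific columns and may carry extra predicates):

```sql
-- Run inside a transaction that is rolled back, since EXPLAIN ANALYZE
-- executes the query and FOR UPDATE would otherwise hold a row lock.
BEGIN;
EXPLAIN ANALYZE
SELECT *
FROM fedify_message_v2
ORDER BY created
LIMIT 1
FOR UPDATE SKIP LOCKED;
ROLLBACK;
-- After the index exists, the plan should show an Index Scan on
-- idx_fedify_message_v2_created instead of a Seq Scan (the exact plan
-- depends on table statistics and data volume).
```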
Environment
- @fedify/postgres: 2.0.0-dev.395+0d27c2dd
- PostgreSQL: 17
- Runtime: Deno 2.6.10
- 3 application instances sharing one PostgresMessageQueue