PostgresMessageQueue.listen() permanently stops processing when poll() throws #581

@dahlia

Description

Summary

When a transient error occurs during poll() in PostgresMessageQueue.listen(), the listener permanently stops processing messages without any recovery mechanism. This causes a growing message backlog that can only be resolved by restarting the application.

Observed behavior

In a production deployment with 3 application instances sharing the same PostgresMessageQueue, one instance stopped processing queued messages ~4 minutes after startup. The instance continued accepting and enqueuing new incoming activities (via POST /ap/inbox → 202), but never consumed any further messages from the queue. The other two instances continued processing normally.

Over ~11 hours, this resulted in a backlog of 4,000+ unprocessed messages in the fedify_message_v2 table. Restarting the affected instance restored processing.

Root cause

There are three code paths in listen() where an unhandled poll() rejection permanently kills the listener:

1. Fallback polling loop (line 348)

while (!signal?.aborted) {
  // ... wait for pollInterval ...
  await serializedPoll();  // ← unhandled: rejects listen()
}

If serializedPoll() rejects, the error propagates out of the while loop, causing the listen() promise to reject. The listener never polls again.
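This failure mode is easy to reproduce in isolation. The sketch below (hypothetical `fragileLoop`/`resilientLoop` helpers, not Fedify code) shows that a single rejected `await` inside a polling loop rejects the whole enclosing promise, while a try/catch around the `await` lets the loop continue:

```typescript
// Hypothetical helpers illustrating the bug; not actual Fedify code.
// fragileLoop mirrors the current listen() fallback loop: one rejection
// from poll() ends the loop and rejects the returned promise.
async function fragileLoop(
  poll: () => Promise<void>,
  rounds: number,
): Promise<number> {
  let completed = 0;
  for (let i = 0; i < rounds; i++) {
    await poll(); // a single rejection aborts every remaining round
    completed++;
  }
  return completed;
}

// resilientLoop mirrors the proposed fix: a transient poll() failure is
// caught and the loop proceeds to the next interval.
async function resilientLoop(
  poll: () => Promise<void>,
  rounds: number,
): Promise<number> {
  let completed = 0;
  for (let i = 0; i < rounds; i++) {
    try {
      await poll();
      completed++;
    } catch {
      // transient error: skip this round, keep polling
    }
  }
  return completed;
}
```

With a `poll()` that fails once out of five calls, `fragileLoop` rejects after its first success, while `resilientLoop` completes the remaining four rounds.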

2. NOTIFY callback for immediate messages (line 321)

async (delay) => {
  const duration = Temporal.Duration.from(delay);
  const durationMs = duration.total("millisecond");
  if (durationMs < 1) await serializedPoll();  // ← unhandled
  // ...
},

A rejection here propagates to the postgres.js LISTEN handler, likely causing an unhandled promise rejection.

3. onsubscribe callback (line 330)

const listen = await this.#sql.listen(
  this.#channelName,
  async (delay) => { ... },
  serializedPoll,  // ← initial poll on subscribe, unhandled
);

If the initial poll on subscription fails, the error is unhandled.

What can cause poll() to throw?

  • Transient PostgreSQL errors (connection timeout, connection pool exhaustion, deadlock)
  • Handler errors that propagate through await handler(row.message) (lines 203, 276)
  • sql.reserve() failure under connection pressure (line 255)
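For the second bullet in particular, the per-message handler call can be isolated so that a throwing handler is reported as a failed delivery rather than propagating out of poll(). A minimal sketch (`handleSafely` is a hypothetical helper; the `handler`/`message` names follow the issue text):

```typescript
// Hypothetical helper: run a message handler so that its failure is
// reported as a boolean instead of propagating out of poll().
async function handleSafely<T>(
  handler: (message: T) => Promise<void>,
  message: T,
): Promise<boolean> {
  try {
    await handler(message);
    return true; // message processed; safe to delete from the queue
  } catch (error) {
    // Log and report failure so the caller can leave the row for retry.
    console.error("Message handler failed; leaving message for retry:", error);
    return false;
  }
}
```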

Proposed fix

Wrap serializedPoll() calls with try-catch and log the error instead of letting it propagate:

// Fallback polling loop
while (!signal?.aborted) {
  // ... wait for pollInterval ...
  try {
    await serializedPoll();
  } catch (error) {
    logger.error(
      "Error while polling for messages; will retry on next interval: {error}",
      { error },
    );
  }
}

// NOTIFY callback
async (delay) => {
  const duration = Temporal.Duration.from(delay);
  const durationMs = duration.total("millisecond");
  if (durationMs < 1) {
    try {
      await serializedPoll();
    } catch (error) {
      logger.error(
        "Error while polling for messages on notify: {error}",
        { error },
      );
    }
  } else {
    // ... (setTimeout path already uses `void` which swallows rejections)
  }
},

Similarly, the onsubscribe callback (3rd argument to this.#sql.listen()) should also be wrapped.
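Since the same guard is needed at all three call sites, it may be cleaner to wrap serializedPoll once. A sketch (`swallowRejection` is a hypothetical name) of a wrapper that turns any async thunk into one that logs instead of rejecting:

```typescript
// Hypothetical wrapper: returns a version of fn that never rejects,
// routing any error to the supplied logger instead.
function swallowRejection(
  fn: () => Promise<void>,
  log: (message: string, error: unknown) => void,
): () => Promise<void> {
  return async () => {
    try {
      await fn();
    } catch (error) {
      log("Error while polling for messages; will retry later:", error);
    }
  };
}
// The wrapped function could then be passed everywhere serializedPoll is
// used today, e.g. as the onsubscribe argument to this.#sql.listen().
```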

Additional note: missing index

The initialize() method creates the fedify_message_v2 table without any index on the created column. The poll() dequeue query (ORDER BY created LIMIT 1 FOR UPDATE SKIP LOCKED) performs a sequential scan on the entire table, which degrades significantly as the backlog grows.

Adding an index during initialization would help:

CREATE INDEX IF NOT EXISTS idx_fedify_message_v2_created
  ON fedify_message_v2 (created);

In our case, adding this index improved the dequeue query from 5.3 ms (Seq Scan, 4,000 rows) to 0.067 ms (Index Scan) — a 79x improvement.

Environment

  • @fedify/postgres: 2.0.0-dev.395+0d27c2dd
  • PostgreSQL: 17
  • Runtime: Deno 2.6.10
  • 3 application instances sharing one PostgresMessageQueue
