Skip to content

Add orderingKey support for message queues#540

Merged
dahlia merged 23 commits intofedify-dev:mainfrom
dahlia:mq-ordering-key
Jan 26, 2026
Merged

Add orderingKey support for message queues#540
dahlia merged 23 commits intofedify-dev:mainfrom
dahlia:mq-ordering-key

Conversation

@dahlia
Copy link
Member

@dahlia dahlia commented Jan 26, 2026

Summary

Implements orderingKey support for all Fedify message queue implementations, ensuring messages with the same ordering key are processed in FIFO order.

Closes #538.

Changes

  • Added orderingKey property to MessageQueueEnqueueOptions interface
  • Implemented orderingKey support for all message queue backends:
    • InProcessMessageQueue - uses separate queues per ordering key
    • PostgresMessageQueue - uses database column and processing logic
    • SqliteMessageQueue - uses database column and processing logic
    • RedisMessageQueue - uses separate Redis lists per ordering key
    • DenoKvMessageQueue - uses KV prefix per ordering key
    • ParallelMessageQueue - delegates to underlying queue
    • AmqpMessageQueue - uses RabbitMQ's rabbitmq_consistent_hash_exchange plugin
  • Updated CI workflow to enable RabbitMQ consistent hash exchange plugin
  • Updated documentation to reflect ordering key support status

AmqpMessageQueue Implementation Notes

The AMQP implementation uses RabbitMQ's consistent hash exchange plugin (Tier 1, ships with RabbitMQ but not enabled by default):

  • Creates a x-consistent-hash exchange
  • Routes messages with the same ordering key to the same partition queue
  • Uses x-single-active-consumer: true to ensure single consumer per partition
  • Supports configurable number of partitions (default: 4)

Requires enabling the plugin: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange

AI Disclosure

This PR was developed with assistance from Claude (claude-opus-4.5).

dahlia and others added 4 commits January 26, 2026 19:06
Implement orderingKey option in PostgresMessageQueue for ordered message
delivery:

- Add ordering_key column to the message queue table schema
- Store ordering key when enqueuing messages
- Modify listen() to skip messages with ordering keys currently being
  processed, ensuring messages with the same key are processed sequentially
- Enable testOrderingKey in PostgresMessageQueue tests
- Update documentation table to show PostgresMessageQueue supports
  ordering keys

This allows ensuring related messages are processed in order, such as
processing Delete activities after Create activities for the same object.

fedify-dev#538

Co-Authored-By: Claude <noreply@anthropic.com>
Implement ordering key support for SqliteMessageQueue to ensure messages with
the same ordering key are processed sequentially, while messages with different
keys can be processed in parallel. This helps prevent race conditions when
processing related activities.

Changes:

- Added ordering_key column to the message queue table schema
- Modified enqueue() and enqueueMany() to store the ordering key
- Modified listen() to track which ordering keys are being processed and
  skip messages with locked ordering keys
- Enabled testOrderingKey: true in the test suite
- Updated docs/manual/mq.md implementation support table
- Updated CHANGES.md

fedify-dev#538

Co-Authored-By: Claude <noreply@anthropic.com>
To ensure messages with the same ordering key are processed sequentially,
RedisMessageQueue now tracks which keys are being processed and skips
messages with locked keys when polling.

fedify-dev#538

Co-Authored-By: Claude <noreply@anthropic.com>
@issues-auto-labeler issues-auto-labeler bot added component/federation Federation object related component/integration Web framework integration component/mq Message queue related labels Jan 26, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @dahlia, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the message queuing system by introducing robust support for message ordering. By allowing developers to specify an orderingKey when enqueuing messages, the system can now guarantee that messages related to the same entity or process are handled sequentially. This critical feature addresses potential race conditions and ensures data consistency, particularly in distributed environments where the timing of operations can be crucial. The changes are integrated deeply into the core message queue interfaces and propagated across all supported backend implementations, providing a consistent and reliable ordering mechanism.

Highlights

  • Ordering Key Support: Introduced orderingKey to MessageQueueEnqueueOptions to ensure messages with the same key are processed in FIFO order, preventing race conditions for related activities.
  • Comprehensive Backend Implementation: Implemented orderingKey support across all official message queue backends: InProcessMessageQueue, PostgresMessageQueue, SqliteMessageQueue, RedisMessageQueue, DenoKvMessageQueue, and AmqpMessageQueue. ParallelMessageQueue also preserves ordering guarantees.
  • AMQP Integration: The AmqpMessageQueue now leverages RabbitMQ's rabbitmq_consistent_hash_exchange plugin to route messages with the same ordering key to a dedicated partition queue, ensuring ordered delivery with single active consumers.
  • Database Schema Updates: Postgres and SQLite message queue implementations now include an ordering_key column in their respective database schemas to support message ordering.
  • Testing and Documentation: Updated the CI workflow to enable the RabbitMQ consistent hash exchange plugin for testing, and comprehensive documentation has been added to reflect the new ordering key support.
Ignored Files
  • Ignored by pattern: .github/workflows/** (1)
    • .github/workflows/main.yaml
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces orderingKey support across all Fedify message queue implementations, a crucial feature for ensuring FIFO processing of related messages and preventing race conditions. The implementation details vary by backend, leveraging appropriate mechanisms like in-memory queues, distributed KV locks, Redis sorted sets with distributed locks, and RabbitMQ's consistent hash exchange. The changes include updates to interfaces, internal logic for enqueuing and listening, and comprehensive test cases in mq-tester.ts. The documentation in CHANGES.md and docs/manual/mq.md is also updated to reflect this new functionality. While the DenoKvMessageQueue, RedisMessageQueue, AmqpMessageQueue, and InProcessMessageQueue implementations correctly handle ordering, the PostgresMessageQueue and SqliteMessageQueue implementations have a limitation regarding distributed ordering guarantees, as detailed in the specific review comments.

@codecov
Copy link

codecov bot commented Jan 26, 2026

Codecov Report

❌ Patch coverage is 78.31325% with 90 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
packages/amqp/src/mq.ts 71.68% 32 Missing ⚠️
packages/fedify/src/federation/mq.ts 61.90% 24 Missing ⚠️
packages/denokv/src/mod.ts 47.61% 22 Missing ⚠️
packages/redis/src/mq.ts 83.07% 11 Missing ⚠️
packages/sqlite/src/mq.ts 97.22% 1 Missing ⚠️
Files with missing lines Coverage Δ
packages/postgres/src/mq.ts 92.10% <100.00%> (+1.03%) ⬆️
packages/testing/src/mod.ts 100.00% <ø> (ø)
packages/testing/src/mq-tester.ts 97.90% <100.00%> (+1.51%) ⬆️
packages/sqlite/src/mq.ts 81.18% <97.22%> (+1.47%) ⬆️
packages/redis/src/mq.ts 86.84% <83.07%> (-3.30%) ⬇️
packages/denokv/src/mod.ts 70.62% <47.61%> (-11.11%) ⬇️
packages/fedify/src/federation/mq.ts 81.86% <61.90%> (-10.95%) ⬇️
packages/amqp/src/mq.ts 81.11% <71.68%> (-9.87%) ⬇️
🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Co-Authored-By: Claude <noreply@anthropic.com>
dahlia and others added 3 commits January 26, 2026 22:18
Implement ordering key support for DenoKvMessageQueue using KV-based
atomic locks. Messages with the same ordering key are guaranteed to
be processed sequentially across all listeners.

The implementation uses a wrapper format to include ordering key
metadata and acquires an atomic lock before processing. If a message's
ordering key is already being processed, the message is re-enqueued
with a 100ms delay. Locks have a 60 second TTL to prevent deadlocks.

Also adds DenoKvMessageQueueOptions interface with orderingLockPrefix
option for customizing the lock key prefix.

Additionally, ParallelMessageQueue now preserves ordering key semantics.
When wrapping a MessageQueue with ParallelMessageQueue, messages with
the same ordering key will not be processed concurrently by different
workers. This ensures ParallelMessageQueue can be used transparently
with any ordering-key-enabled queue implementation.

Note: The ordering key test is disabled for DenoKvMessageQueue because
Deno KV's listenQueue cannot be restarted after abort. The functionality
is implemented and works correctly in production environments.

Also fixes formatting in RedisMessageQueue.

Co-Authored-By: Claude <noreply@anthropic.com>
Implement ordering key support for AmqpMessageQueue using RabbitMQ's
rabbitmq_consistent_hash_exchange plugin.  Messages with the same ordering
key are routed to the same partition queue, ensuring FIFO processing.

The implementation:

- Uses x-consistent-hash exchange type for routing by ordering key
- Creates configurable number of partition queues with x-single-active-consumer
- Supports delayed messages with ordering keys via dead-letter routing
- Requires rabbitmq_consistent_hash_exchange plugin (Tier 1, ships with
  RabbitMQ but not enabled by default)

fedify-dev#538

Co-Authored-By: Claude <noreply@anthropic.com>
Configure CI workflow to enable the rabbitmq_consistent_hash_exchange plugin
so that AmqpMessageQueue ordering key tests can run.  Changes:

- Add health check options to RabbitMQ service containers
- Add step to enable plugin via docker exec before running tests
- Set AMQP_ORDERING_TEST=true environment variable for all test jobs

Co-Authored-By: Claude <noreply@anthropic.com>
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Co-Authored-By: Claude <noreply@anthropic.com>
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Co-Authored-By: Claude <noreply@anthropic.com>
dahlia and others added 3 commits January 26, 2026 22:56
Implement ordering key support for WorkersMessageQueue using Workers KV
for distributed lock management:

- Add WorkersMessageQueueOptions interface with orderingKv,
  orderingKeyPrefix, and orderingLockTtl options
- Wrap messages with ordering key metadata in enqueue/enqueueMany
- Add processMessage() method for lock acquisition/release flow
- Messages without ordering key are processed immediately
- Best-effort ordering due to Workers KV eventual consistency

Also add CHANGES.md entries for AmqpMessageQueue orderingKey support.

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Replace the in-memory processingKeys Set with PostgreSQL advisory locks
for proper distributed ordering key support across multiple workers.

The previous implementation only tracked processing keys within a single
process, which meant messages with the same ordering key could be
processed concurrently by different worker instances.

Now uses pg_try_advisory_lock(hashtext(tableName), hashtext(orderingKey))
to acquire distributed locks before processing messages. The lock is
released with pg_advisory_unlock after the message handler completes.

This ensures that messages with the same ordering key are processed
sequentially even in a distributed multi-worker environment.

Co-Authored-By: Claude <noreply@anthropic.com>
@dahlia dahlia requested a review from Copilot January 26, 2026 13:57
@dahlia
Copy link
Member Author

dahlia commented Jan 26, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces orderingKey support across all Fedify message queue implementations, ensuring messages with the same ordering key are processed in FIFO order. The changes are comprehensive, covering InProcessMessageQueue, PostgresMessageQueue, SqliteMessageQueue, RedisMessageQueue, DenoKvMessageQueue, ParallelMessageQueue, and AmqpMessageQueue. Documentation and testing utilities have been updated to reflect this new functionality. The implementation correctly leverages specific features of each backend for distributed locking and ordering where applicable, such as PostgreSQL advisory locks and Redis SET NX EX commands. The WorkersMessageQueue also includes a processMessage method to handle ordering key locks, acknowledging the eventual consistency nature of Workers KV. Overall, this is a significant and well-executed feature addition.

dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Add JSDoc to explain how orderingKey behaves with and without delay:
- Without delay: routes through consistent hash exchange directly
- With delay: routes through delay queue, then to consistent hash exchange

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
…eMany()

Add JSDoc to explain how orderingKey behaves with and without delay:
- Without delay: routes through consistent hash exchange directly
- With delay: routes through delay queue, then to consistent hash exchange

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Replace `any` type with `ConsumeMessage | null` from amqplib for
the msg parameter in the messageHandler callback.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 26, 2026
Replace `any` type with `WrappedMessage | unknown` for the rawMessage
parameter in the wrappedHandler callback.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an orderingKey option to MessageQueueEnqueueOptions and implements (or documents) ordered processing semantics across the supported message queue backends, plus CI/test/docs updates to validate and describe the feature.

Changes:

  • Extends the core MQ enqueue options with orderingKey and updates tests/utilities to optionally validate ordered processing.
  • Implements ordering-key handling across multiple backends (SQLite/Postgres/Redis/Deno KV/Workers/AMQP) and updates related tests.
  • Updates CI (RabbitMQ plugin enablement) and docs/changelog to reflect ordering support.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
packages/testing/src/mq-tester.ts Adds optional ordering-key test suite and options type.
packages/testing/src/mod.ts Re-exports TestMessageQueueOptions.
packages/sqlite/src/mq.ts Adds ordering_key column usage and attempts to enforce per-key sequential processing.
packages/sqlite/src/mq.test.ts Enables ordering-key tests for SQLite backend.
packages/redis/src/mq.ts Adds ordering key metadata, per-key distributed locks, and monotonic timestamps.
packages/redis/src/mq.test.ts Enables ordering-key tests for Redis backend.
packages/postgres/src/mq.ts Adds ordering_key column usage and advisory-lock based per-key exclusion.
packages/postgres/src/mq.test.ts Enables ordering-key tests for Postgres backend.
packages/fedify/src/federation/mq.ts Adds orderingKey to the public interface; updates InProcess and Parallel MQ behavior/docs.
packages/fedify/src/federation/mq.test.ts Adds InProcess ordering-key unit test.
packages/denokv/src/mod.ts Wraps queued messages to carry ordering metadata; adds a KV-based lock.
packages/denokv/src/mod.test.ts Adjusts test harness to use a persistent KV file and notes ordering test limitations.
packages/denokv/deno.json Expands test permissions for temp dir / file-based KV tests.
packages/cfworkers/src/mod.ts Wraps messages for ordering metadata; adds processMessage() for KV-based ordering locks.
packages/cfworkers/test/mq.test.ts Updates tests for wrapped message format and adds ordering-related tests.
packages/amqp/src/mq.ts Adds consistent-hash exchange based ordering support with partition queues and delayed routing.
packages/amqp/src/mq.test.ts Adds optional ordering-key tests gated by env var (CI enables plugin).
docs/manual/mq.md Documents orderingKey and backend support table/note.
CHANGES.md Adds changelog entries for new orderingKey support across packages.
.github/workflows/main.yaml Enables RabbitMQ plugin and runs ordering tests in CI.
mise.toml Bumps hongdown tool version.

dahlia and others added 4 commits January 27, 2026 00:51
Add JSDoc to explain how orderingKey behaves with and without delay:
- Without delay: routes through consistent hash exchange directly
- With delay: routes through delay queue, then to consistent hash exchange

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
…eMany()

Add JSDoc to explain how orderingKey behaves with and without delay:
- Without delay: routes through consistent hash exchange directly
- With delay: routes through delay queue, then to consistent hash exchange

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `ConsumeMessage | null` from amqplib for
the msg parameter in the messageHandler callback.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `WrappedMessage | unknown` for the rawMessage
parameter in the wrappedHandler callback.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
dahlia and others added 9 commits January 27, 2026 00:59
Replace the in-memory processingKeys Set with a database-level lock table
for proper distributed ordering key support across multiple workers.

The previous implementation only tracked processing keys within a single
process, which meant messages with the same ordering key could be
processed concurrently by different worker instances in WAL mode.

Now creates a separate lock table (`{tableName}_locks`) to track which
ordering keys are being processed. The lock is acquired via INSERT OR
IGNORE before processing and released via DELETE after the message
handler completes.

This ensures that messages with the same ordering key are processed
sequentially even in a multi-process environment using WAL mode.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Add ALTER TABLE statement to add the ordering_key column if it doesn't
exist. This handles existing deployments that already have the queue
table without the ordering_key column, preventing insert failures when
upgrading to a version with ordering key support.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Update the JSDoc comment to clarify that when shouldProcess is false,
the caller is responsible for re-enqueuing or retrying, as the
processMessage() method does not automatically re-enqueue the message.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
The AmqpOrderingOptions interface was introduced with ordering key support,
which is a Fedify 2.0.0 feature.  The @SInCE tag incorrectly said 0.4.0.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
When multiple messages are enqueued in the same millisecond,
#getMonotonicTimestamp() adds fractional milliseconds (0.001 per message)
to maintain ordering.  This caused the pub/sub notification check
`ts <= now` to fail for the 2nd+ message since their timestamps would be
`now + 0.001` or higher.

Changed to use `baseTs <= now` instead, so all messages enqueued for
immediate processing trigger the notification regardless of the monotonic
sequence number added.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
When a message has an orderingKey but delay is explicitly set to
Temporal.Duration.from({ seconds: 0 }), the delay value is 0 (not null).
The previous condition `delay == null` would skip the ordering path,
sending such messages directly to the main queue instead of the
consistent hash exchange.

Changed the condition to `delay == null || delay <= 0` so zero-delay
messages with ordering keys are correctly routed through the ordering
exchange, consistent with how the delayed queue path already handles this.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
The previous implementation re-enqueued messages with a 100ms delay when
the ordering key lock was held.  This could break FIFO order because:

1. Message A (key: "x") is being processed
2. Message B (key: "x") arrives, sees lock, re-enqueues with 100ms delay
3. Message C (key: "x") arrives, sees lock, re-enqueues with 100ms delay
4. Message A finishes
5. C might be processed before B depending on re-enqueue timing

Changed to use a sleep/retry loop that keeps the handler blocked until the
lock can be acquired.  This preserves the original message ordering since
each handler waits its turn rather than re-joining the queue.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
ParallelMessageQueue can only detect ordering keys from messages that use
the wrapper format with `__fedify_ordering_key__` property.  Currently
only DenoKvMessageQueue and WorkersMessageQueue use this format.

Other queue implementations (InProcessMessageQueue, RedisMessageQueue,
PostgresMessageQueue, SqliteMessageQueue, AmqpMessageQueue) deliver raw
payloads, so ParallelMessageQueue cannot detect their ordering keys.
Those implementations handle ordering guarantees internally instead.

Added documentation to both the class-level JSDoc and the
#extractOrderingKey method to clarify this limitation.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
Updated the note about ParallelMessageQueue ordering to clarify that it
only detects ordering keys from queue implementations that use wrapper
format (DenoKvMessageQueue and WorkersMessageQueue).  For other
implementations, ordering is handled internally by the queue itself.

fedify-dev#540 (comment)

Co-Authored-By: Claude <noreply@anthropic.com>
@dahlia dahlia merged commit 4ab1e22 into fedify-dev:main Jan 26, 2026
47 of 61 checks passed
dahlia added a commit to dahlia/fedify-fork that referenced this pull request Jan 27, 2026
This change exposes the orderingKey capability (added in PR fedify-dev#540) through
the sendActivity() API, allowing applications to ensure ordered delivery
of related activities.

fedify-dev#536

Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

component/federation Federation object related component/integration Web framework integration component/mq Message queue related

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add orderingKey option to MessageQueueEnqueueOptions for ordered message delivery

2 participants