Add orderingKey support for message queues#540
Conversation
fedify-dev#538 Co-Authored-By: Claude <noreply@anthropic.com>
Implement orderingKey option in PostgresMessageQueue for ordered message delivery: - Add ordering_key column to the message queue table schema - Store ordering key when enqueuing messages - Modify listen() to skip messages with ordering keys currently being processed, ensuring messages with the same key are processed sequentially - Enable testOrderingKey in PostgresMessageQueue tests - Update documentation table to show PostgresMessageQueue supports ordering keys This allows ensuring related messages are processed in order, such as processing Delete activities after Create activities for the same object. fedify-dev#538 Co-Authored-By: Claude <noreply@anthropic.com>
Implement ordering key support for SqliteMessageQueue to ensure messages with the same ordering key are processed sequentially, while messages with different keys can be processed in parallel. This helps prevent race conditions when processing related activities. Changes: - Added ordering_key column to the message queue table schema - Modified enqueue() and enqueueMany() to store the ordering key - Modified listen() to track which ordering keys are being processed and skip messages with locked ordering keys - Enabled testOrderingKey: true in the test suite - Updated docs/manual/mq.md implementation support table - Updated CHANGES.md fedify-dev#538 Co-Authored-By: Claude <noreply@anthropic.com>
To ensure messages with the same ordering key are processed sequentially, RedisMessageQueue now tracks which keys are being processed and skips messages with locked keys when polling. fedify-dev#538 Co-Authored-By: Claude <noreply@anthropic.com>
Summary of ChangesHello @dahlia, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the message queuing system by introducing robust support for message ordering. By allowing developers to specify an Highlights
Ignored Files
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces orderingKey support across all Fedify message queue implementations, a crucial feature for ensuring FIFO processing of related messages and preventing race conditions. The implementation details vary by backend, leveraging appropriate mechanisms like in-memory queues, distributed KV locks, Redis sorted sets with distributed locks, and RabbitMQ's consistent hash exchange. The changes include updates to interfaces, internal logic for enqueuing and listening, and comprehensive test cases in mq-tester.ts. The documentation in CHANGES.md and docs/manual/mq.md is also updated to reflect this new functionality. While the DenoKvMessageQueue, RedisMessageQueue, AmqpMessageQueue, and InProcessMessageQueue implementations correctly handle ordering, the PostgresMessageQueue and SqliteMessageQueue implementations have a limitation regarding distributed ordering guarantees, as detailed in the specific review comments.
Codecov Report❌ Patch coverage is
🚀 New features to boost your workflow:
|
Co-Authored-By: Claude <noreply@anthropic.com>
Implement ordering key support for DenoKvMessageQueue using KV-based atomic locks. Messages with the same ordering key are guaranteed to be processed sequentially across all listeners. The implementation uses a wrapper format to include ordering key metadata and acquires an atomic lock before processing. If a message's ordering key is already being processed, the message is re-enqueued with a 100ms delay. Locks have a 60 second TTL to prevent deadlocks. Also adds DenoKvMessageQueueOptions interface with orderingLockPrefix option for customizing the lock key prefix. Additionally, ParallelMessageQueue now preserves ordering key semantics. When wrapping a MessageQueue with ParallelMessageQueue, messages with the same ordering key will not be processed concurrently by different workers. This ensures ParallelMessageQueue can be used transparently with any ordering-key-enabled queue implementation. Note: The ordering key test is disabled for DenoKvMessageQueue because Deno KV's listenQueue cannot be restarted after abort. The functionality is implemented and works correctly in production environments. Also fixes formatting in RedisMessageQueue. Co-Authored-By: Claude <noreply@anthropic.com>
Implement ordering key support for AmqpMessageQueue using RabbitMQ's rabbitmq_consistent_hash_exchange plugin. Messages with the same ordering key are routed to the same partition queue, ensuring FIFO processing. The implementation: - Uses x-consistent-hash exchange type for routing by ordering key - Creates configurable number of partition queues with x-single-active-consumer - Supports delayed messages with ordering keys via dead-letter routing - Requires rabbitmq_consistent_hash_exchange plugin (Tier 1, ships with RabbitMQ but not enabled by default) fedify-dev#538 Co-Authored-By: Claude <noreply@anthropic.com>
Configure CI workflow to enable the rabbitmq_consistent_hash_exchange plugin so that AmqpMessageQueue ordering key tests can run. Changes: - Add health check options to RabbitMQ service containers - Add step to enable plugin via docker exec before running tests - Set AMQP_ORDERING_TEST=true environment variable for all test jobs Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
166bdf4 to
3a09cea
Compare
Co-Authored-By: Claude <noreply@anthropic.com>
3a09cea to
4a10caa
Compare
Implement ordering key support for WorkersMessageQueue using Workers KV for distributed lock management: - Add WorkersMessageQueueOptions interface with orderingKv, orderingKeyPrefix, and orderingLockTtl options - Wrap messages with ordering key metadata in enqueue/enqueueMany - Add processMessage() method for lock acquisition/release flow - Messages without ordering key are processed immediately - Best-effort ordering due to Workers KV eventual consistency Also add CHANGES.md entries for AmqpMessageQueue orderingKey support. Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Claude <noreply@anthropic.com>
Replace the in-memory processingKeys Set with PostgreSQL advisory locks for proper distributed ordering key support across multiple workers. The previous implementation only tracked processing keys within a single process, which meant messages with the same ordering key could be processed concurrently by different worker instances. Now uses pg_try_advisory_lock(hashtext(tableName), hashtext(orderingKey)) to acquire distributed locks before processing messages. The lock is released with pg_advisory_unlock after the message handler completes. This ensures that messages with the same ordering key are processed sequentially even in a distributed multi-worker environment. Co-Authored-By: Claude <noreply@anthropic.com>
4a10caa to
afad110
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces orderingKey support across all Fedify message queue implementations, ensuring messages with the same ordering key are processed in FIFO order. The changes are comprehensive, covering InProcessMessageQueue, PostgresMessageQueue, SqliteMessageQueue, RedisMessageQueue, DenoKvMessageQueue, ParallelMessageQueue, and AmqpMessageQueue. Documentation and testing utilities have been updated to reflect this new functionality. The implementation correctly leverages specific features of each backend for distributed locking and ordering where applicable, such as PostgreSQL advisory locks and Redis SET NX EX commands. The WorkersMessageQueue also includes a processMessage method to handle ordering key locks, acknowledging the eventual consistency nature of Workers KV. Overall, this is a significant and well-executed feature addition.
Add JSDoc to explain how orderingKey behaves with and without delay: - Without delay: routes through consistent hash exchange directly - With delay: routes through delay queue, then to consistent hash exchange fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
…eMany() Add JSDoc to explain how orderingKey behaves with and without delay: - Without delay: routes through consistent hash exchange directly - With delay: routes through delay queue, then to consistent hash exchange fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `ConsumeMessage | null` from amqplib for the msg parameter in the messageHandler callback. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `WrappedMessage | unknown` for the rawMessage parameter in the wrappedHandler callback. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds an orderingKey option to MessageQueueEnqueueOptions and implements (or documents) ordered processing semantics across the supported message queue backends, plus CI/test/docs updates to validate and describe the feature.
Changes:
- Extends the core MQ enqueue options with
orderingKeyand updates tests/utilities to optionally validate ordered processing. - Implements ordering-key handling across multiple backends (SQLite/Postgres/Redis/Deno KV/Workers/AMQP) and updates related tests.
- Updates CI (RabbitMQ plugin enablement) and docs/changelog to reflect ordering support.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/testing/src/mq-tester.ts | Adds optional ordering-key test suite and options type. |
| packages/testing/src/mod.ts | Re-exports TestMessageQueueOptions. |
| packages/sqlite/src/mq.ts | Adds ordering_key column usage and attempts to enforce per-key sequential processing. |
| packages/sqlite/src/mq.test.ts | Enables ordering-key tests for SQLite backend. |
| packages/redis/src/mq.ts | Adds ordering key metadata, per-key distributed locks, and monotonic timestamps. |
| packages/redis/src/mq.test.ts | Enables ordering-key tests for Redis backend. |
| packages/postgres/src/mq.ts | Adds ordering_key column usage and advisory-lock based per-key exclusion. |
| packages/postgres/src/mq.test.ts | Enables ordering-key tests for Postgres backend. |
| packages/fedify/src/federation/mq.ts | Adds orderingKey to the public interface; updates InProcess and Parallel MQ behavior/docs. |
| packages/fedify/src/federation/mq.test.ts | Adds InProcess ordering-key unit test. |
| packages/denokv/src/mod.ts | Wraps queued messages to carry ordering metadata; adds a KV-based lock. |
| packages/denokv/src/mod.test.ts | Adjusts test harness to use a persistent KV file and notes ordering test limitations. |
| packages/denokv/deno.json | Expands test permissions for temp dir / file-based KV tests. |
| packages/cfworkers/src/mod.ts | Wraps messages for ordering metadata; adds processMessage() for KV-based ordering locks. |
| packages/cfworkers/test/mq.test.ts | Updates tests for wrapped message format and adds ordering-related tests. |
| packages/amqp/src/mq.ts | Adds consistent-hash exchange based ordering support with partition queues and delayed routing. |
| packages/amqp/src/mq.test.ts | Adds optional ordering-key tests gated by env var (CI enables plugin). |
| docs/manual/mq.md | Documents orderingKey and backend support table/note. |
| CHANGES.md | Adds changelog entries for new orderingKey support across packages. |
| .github/workflows/main.yaml | Enables RabbitMQ plugin and runs ordering tests in CI. |
| mise.toml | Bumps hongdown tool version. |
Add JSDoc to explain how orderingKey behaves with and without delay: - Without delay: routes through consistent hash exchange directly - With delay: routes through delay queue, then to consistent hash exchange fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
…eMany() Add JSDoc to explain how orderingKey behaves with and without delay: - Without delay: routes through consistent hash exchange directly - With delay: routes through delay queue, then to consistent hash exchange fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `ConsumeMessage | null` from amqplib for the msg parameter in the messageHandler callback. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Replace `any` type with `WrappedMessage | unknown` for the rawMessage parameter in the wrappedHandler callback. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
81e77c0 to
d6d3481
Compare
Replace the in-memory processingKeys Set with a database-level lock table
for proper distributed ordering key support across multiple workers.
The previous implementation only tracked processing keys within a single
process, which meant messages with the same ordering key could be
processed concurrently by different worker instances in WAL mode.
Now creates a separate lock table (`{tableName}_locks`) to track which
ordering keys are being processed. The lock is acquired via INSERT OR
IGNORE before processing and released via DELETE after the message
handler completes.
This ensures that messages with the same ordering key are processed
sequentially even in a multi-process environment using WAL mode.
fedify-dev#540 (comment)
Co-Authored-By: Claude <noreply@anthropic.com>
Add ALTER TABLE statement to add the ordering_key column if it doesn't exist. This handles existing deployments that already have the queue table without the ordering_key column, preventing insert failures when upgrading to a version with ordering key support. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Update the JSDoc comment to clarify that when shouldProcess is false, the caller is responsible for re-enqueuing or retrying, as the processMessage() method does not automatically re-enqueue the message. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
The AmqpOrderingOptions interface was introduced with ordering key support, which is a Fedify 2.0.0 feature. The @SInCE tag incorrectly said 0.4.0. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
When multiple messages are enqueued in the same millisecond, #getMonotonicTimestamp() adds fractional milliseconds (0.001 per message) to maintain ordering. This caused the pub/sub notification check `ts <= now` to fail for the 2nd+ message since their timestamps would be `now + 0.001` or higher. Changed to use `baseTs <= now` instead, so all messages enqueued for immediate processing trigger the notification regardless of the monotonic sequence number added. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
When a message has an orderingKey but delay is explicitly set to
Temporal.Duration.from({ seconds: 0 }), the delay value is 0 (not null).
The previous condition `delay == null` would skip the ordering path,
sending such messages directly to the main queue instead of the
consistent hash exchange.
Changed the condition to `delay == null || delay <= 0` so zero-delay
messages with ordering keys are correctly routed through the ordering
exchange, consistent with how the delayed queue path already handles this.
fedify-dev#540 (comment)
Co-Authored-By: Claude <noreply@anthropic.com>
The previous implementation re-enqueued messages with a 100ms delay when the ordering key lock was held. This could break FIFO order because: 1. Message A (key: "x") is being processed 2. Message B (key: "x") arrives, sees lock, re-enqueues with 100ms delay 3. Message C (key: "x") arrives, sees lock, re-enqueues with 100ms delay 4. Message A finishes 5. C might be processed before B depending on re-enqueue timing Changed to use a sleep/retry loop that keeps the handler blocked until the lock can be acquired. This preserves the original message ordering since each handler waits its turn rather than re-joining the queue. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
ParallelMessageQueue can only detect ordering keys from messages that use the wrapper format with `__fedify_ordering_key__` property. Currently only DenoKvMessageQueue and WorkersMessageQueue use this format. Other queue implementations (InProcessMessageQueue, RedisMessageQueue, PostgresMessageQueue, SqliteMessageQueue, AmqpMessageQueue) deliver raw payloads, so ParallelMessageQueue cannot detect their ordering keys. Those implementations handle ordering guarantees internally instead. Added documentation to both the class-level JSDoc and the #extractOrderingKey method to clarify this limitation. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
Updated the note about ParallelMessageQueue ordering to clarify that it only detects ordering keys from queue implementations that use wrapper format (DenoKvMessageQueue and WorkersMessageQueue). For other implementations, ordering is handled internally by the queue itself. fedify-dev#540 (comment) Co-Authored-By: Claude <noreply@anthropic.com>
This change exposes the orderingKey capability (added in PR fedify-dev#540) through the sendActivity() API, allowing applications to ensure ordered delivery of related activities. fedify-dev#536 Co-Authored-By: Claude <noreply@anthropic.com>
Summary
Implements
orderingKeysupport for all Fedify message queue implementations, ensuring messages with the same ordering key are processed in FIFO order.Closes #538.
Changes
orderingKeyproperty toMessageQueueEnqueueOptionsinterfaceorderingKeysupport for all message queue backends:InProcessMessageQueue- uses separate queues per ordering keyPostgresMessageQueue- uses database column and processing logicSqliteMessageQueue- uses database column and processing logicRedisMessageQueue- uses separate Redis lists per ordering keyDenoKvMessageQueue- uses KV prefix per ordering keyParallelMessageQueue- delegates to underlying queueAmqpMessageQueue- uses RabbitMQ'srabbitmq_consistent_hash_exchangepluginAmqpMessageQueue Implementation Notes
The AMQP implementation uses RabbitMQ's consistent hash exchange plugin (Tier 1, ships with RabbitMQ but not enabled by default):
x-consistent-hashexchangex-single-active-consumer: trueto ensure single consumer per partitionRequires enabling the plugin:
rabbitmq-plugins enable rabbitmq_consistent_hash_exchangeAI Disclosure
This PR was developed with assistance from Claude (claude-opus-4.5).