Implement configurable delayed message retry for quorum queues. When
messages are returned (via reject, nack, or modify), they can be held
in a delayed queue before becoming available again, implementing a
backoff that grows with the delivery count up to a configured maximum.

Configuration options (via queue arguments or policy):
- type: disabled | all | failed | returned
  - all: delay all returned messages
  - failed: delay only messages with incremented delivery_count
  - returned: delay only messages without incremented delivery_count
- min_delay: minimum delay in milliseconds
- max_delay: maximum delay (delay = min(min_delay * delivery_count, max_delay))
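
The configuration rules above can be sketched as two small pure functions. This is an illustration only, not the code in `rabbit_fifo`: the module name `delayed_retry_sketch` and the function names `should_delay/2` and `delay_ms/3` are hypothetical, and the boolean argument stands in for "this return incremented delivery_count".

```erlang
-module(delayed_retry_sketch).
-export([should_delay/2, delay_ms/3]).

%% Type is the configured retry type. Incremented says whether this
%% return incremented the message's delivery_count (hypothetical flag).
-spec should_delay(disabled | all | failed | returned, boolean()) -> boolean().
should_delay(disabled, _Incremented) -> false;
should_delay(all, _Incremented) -> true;
should_delay(failed, Incremented) -> Incremented;
should_delay(returned, Incremented) -> not Incremented.

%% delay = min(min_delay * delivery_count, max_delay), in milliseconds.
-spec delay_ms(non_neg_integer(), non_neg_integer(), non_neg_integer()) ->
    non_neg_integer().
delay_ms(MinDelay, MaxDelay, DeliveryCount) ->
    min(MinDelay * DeliveryCount, MaxDelay).
```

For example, with min_delay = 1000 and max_delay = 10000, a message with delivery_count = 3 would be delayed by 3000 ms, and any message with delivery_count of 10 or more would be capped at 10000 ms.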

Key implementation details:
- Delayed messages stored in a gb_tree keyed by (ready_at_timestamp, raft_index)
- Next ready message cached for O(1) access during checkout
- Delayed messages count towards queue limits (max_length, max_bytes)
  but not towards messages_ready count
- Timer-based promotion of ready messages to returns queue via expire_msgs
- Monotonic timestamps used to prevent message reordering when Ra
  timestamps go backwards
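
A minimal sketch of the data structure described above, assuming a plain `gb_trees` tree keyed by `{ReadyAt, RaftIdx}`. The module `delayed_tree_sketch` and all of its function names are hypothetical; the real state in `rabbit_fifo` also caches the next ready message and tracks more than this.

```erlang
-module(delayed_tree_sketch).
-export([new/0, add/4, take_ready/2, monotonic/2]).

new() -> gb_trees:empty().

%% Key on {ReadyAt, RaftIdx} so the smallest key is the next message due.
%% The Raft index makes keys unique even when ready times collide.
add(ReadyAt, RaftIdx, Msg, Tree) ->
    gb_trees:insert({ReadyAt, RaftIdx}, Msg, Tree).

%% Pop every message whose ready time is at or before Now, oldest first.
take_ready(Now, Tree) ->
    take_ready(Now, Tree, []).

take_ready(Now, Tree0, Acc) ->
    case gb_trees:is_empty(Tree0) of
        true ->
            {lists:reverse(Acc), Tree0};
        false ->
            case gb_trees:smallest(Tree0) of
                {{ReadyAt, _Idx}, _Msg} when ReadyAt =< Now ->
                    {_Key, Msg, Tree} = gb_trees:take_smallest(Tree0),
                    take_ready(Now, Tree, [Msg | Acc]);
                _NotDueYet ->
                    {lists:reverse(Acc), Tree0}
            end
    end.

%% Clamp a timestamp so it never goes backwards relative to the last one.
monotonic(Ts, LastTs) -> max(Ts, LastTs).
```

Because the tree is ordered by key, promotion stops at the first entry that is not yet due, so the cost is proportional to the number of messages actually promoted.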

Additional features:
- Support for x-opt-delivery-time annotation to specify explicit future
  delivery time
- Support for x-opt-deferral-token annotation to identify deferred messages
- New delayed_cmd command with operations:
  - {retry, all | N}: immediately retry delayed messages
  - {assign_deferred, ConsumerKey, Tokens}: assign specific deferred
    messages to a consumer by token
- Client APIs: retry_delayed/2 and assign_deferred/3 in rabbit_fifo_client,
  retry_delayed/2 and assign_deferred/4 in rabbit_quorum_queue

Includes unit tests in rabbit_fifo_SUITE and integration tests in
quorum_queue_SUITE.

Co-authored-by: Cursor <cursoragent@cursor.com>
And display delayed message counts and delayed retry configurations.
`DiscBytes` stood for "discarded bytes", after the old field name `discarded_bytes`, which has since been renamed to `reclaimable_bytes`.
Previously, when a quorum queue consumer timed out, RabbitMQ detached the AMQP 1.0 link with an amqp:resource-limit-exceeded error. This is a harsh response: the client loses the link and must re-attach to continue consuming.

Instead, follow the AMQP 1.0 spec [2.6.12] which allows the source to spontaneously transition a delivery to a terminal state. When a consumer times out, RabbitMQ now sends DISPOSITION(role=sender, state=released, settled=false) for the timed-out deliveries. The client is expected to confirm by sending DISPOSITION(role=receiver, settled=true) back.

The Erlang AMQP 1.0 client (amqp10_client) is updated to handle this new DISPOSITION(role=sender) frame: it removes the affected delivery IDs from the link's incoming_unsettled map, auto-settles by replying with DISPOSITION(role=receiver, settled=true) and notifies the application.

On the broker side, when the client settles back with the released outcome, the normal requeue path (rabbit_queue_type:settle/5 with requeue op) is followed. To make this work end-to-end, rabbit_fifo's return() function now calls maybe_untimeout/2, clearing the consumer's timeout state so it can receive messages again. Previously maybe_untimeout was only called from complete_and_checkout (the settle/complete path), meaning consumers could never recover from a timeout via the requeue/return path.

The integration test verifies the full flow:
1. Messages are sent and received but not settled
2. Consumer timeout fires, client receives the released disposition
3. Client lib auto-settles, consumer timeout state is cleared
4. Timed-out messages are redelivered, new messages are received
5. Redelivered messages have correct headers (delivery_count=0, first_acquirer=false, x-opt-acquired-count=1)
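
The client-side bookkeeping described above can be sketched as a pure function. This is not the amqp10_client implementation: the module `released_disposition_sketch`, the function `handle_released/3`, and the shape of the unsettled map are assumptions made for illustration, and delivery-id wrap-around is ignored for simplicity.

```erlang
-module(released_disposition_sketch).
-export([handle_released/3]).

%% First..Last is the delivery-id range carried by an incoming
%% DISPOSITION(role=sender, state=released) frame. Unsettled is a map
%% from delivery ID to whatever the link tracks per delivery.
%% Returns the IDs that were still unsettled (the ones to settle back
%% and report to the application) together with the updated map.
handle_released(First, Last, Unsettled) when First =< Last ->
    Ids = [Id || Id <- lists:seq(First, Last), maps:is_key(Id, Unsettled)],
    {Ids, maps:without(Ids, Unsettled)}.
```

The caller would then reply with DISPOSITION(role=receiver, settled=true) for the returned IDs and notify the application, as the commit message describes.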
Log a warning once per outgoing link when a consumer timeout fires, including the connection name, link name, handle, AMQP container ID, and queue name. The warning is emitted only on the first timeout for each link to avoid log noise.

Expose the consumer timeout state in the Management UI by adding a "Consumer timeout" boolean column to the outgoing links table. The cell is highlighted with a yellow background when true, consistent with how other notable conditions (e.g. zero link credit) are displayed. A help icon explains the field.
x-acquired-count is more symmetrical with x-delivery-count. (Renaming x-delivery-count would be a breaking change.)
This is for consistency with the other stashed_* fields and not really for performance.
Snapshot frequency was too high for shallow, fast-flowing queues because the reclaimable-bytes threshold was proportional to the (small) approximate snapshot size. With many queues sharing a WAL, this resulted in multiple snapshots per queue per WAL cycle. Scale the minimum reclaimable threshold inversely with the WAL fill ratio so that snapshots are deferred until the WAL is sufficiently full, yielding roughly one snapshot per queue per WAL cycle.
This is a very small space-saving measure. `{ReadyAtMillis, MsgIdx}`
becomes `[ReadyAtMillis | MsgIdx]` for the `#delayed{}` state, saving a
small number of bytes per key when serialized. This optimised tuple type
is already used in a few other places in `rabbit_fifo` for a similar
space-saving effect.
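
A small sketch of the key shape described above, under the assumption that both components are small integers. The module `delayed_key_sketch` and its functions are hypothetical. It only checks two things: that improper-list keys still sort by ready time and then by index inside a `gb_trees` tree, and how many heap words each shape occupies in memory according to `erts_debug:size/1` (a debugging aid). It does not measure serialized size.

```erlang
-module(delayed_key_sketch).
-export([key/2, words/1, ordered_keys/1]).

%% An improper list (a single cons cell) instead of a 2-tuple.
key(ReadyAtMillis, MsgIdx) -> [ReadyAtMillis | MsgIdx].

%% Heap words used by a term, as reported by erts_debug:size/1.
words(Term) -> erts_debug:size(Term).

%% Insert keys into a gb_tree and return them in tree (term) order.
ordered_keys(Pairs) ->
    Tree = lists:foldl(
             fun({ReadyAt, Idx}, T) ->
                     gb_trees:insert(key(ReadyAt, Idx), undefined, T)
             end, gb_trees:empty(), Pairs),
    gb_trees:keys(Tree).
```

Erlang compares lists head first and then tail, so `[ReadyAtMillis | MsgIdx]` keys order the same way as the `{ReadyAtMillis, MsgIdx}` tuples they replace.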
`gb_trees:size/1` is constant time - the size is stored at the top-level
tuple.

    -doc "Returns the number of nodes in `Tree`.".
    -spec size(Tree) -> non_neg_integer() when
          Tree :: tree().

    size({Size, _}) when is_integer(Size), Size >= 0 ->
        Size.

So we don't need to manually track the tree size with a len field. This
change removes the field.
rabbit_fifo: Use optimised tuple for delayed keys
ansd approved these changes Mar 9, 2026
acogoluegnes added a commit to rabbitmq/rabbitmq-jms-client that referenced this pull request Mar 13, 2026
For delivery count JMS property, instead of x-delivery-count header. x-acquired-count is the one to use as of RabbitMQ 4.3. References rabbitmq/rabbitmq-server#13885
This was referenced Mar 13, 2026
PR to test Ra v3 changes
Snapshotting
The state machine needs to start tracking the approximate size of all commands that do not need to be kept (effectively anything but enqueues). After this it should be as simple as requesting a snapshot every so many reclaimed bytes. This number may be scaled according to the number of messages in the queue, as this affects the snapshot size.
1M messages create snapshots in the region of 30MB. We probably don't want to create a snapshot until at least twice the snapshot size can potentially be reclaimed, which works out to roughly num msgs * 64 bytes of reclaimable command data.
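
The arithmetic above can be written down as a short sketch. The module `snapshot_threshold_sketch` and its function names are hypothetical; only the 64 bytes-per-message factor comes from the description, and a real implementation would presumably also need a floor so that an empty queue does not snapshot on every command.

```erlang
-module(snapshot_threshold_sketch).
-export([min_reclaimable_bytes/1, should_snapshot/2]).

%% Reclaimable command bytes required per message on the queue.
-define(BYTES_PER_MSG, 64).

%% Threshold that scales with queue depth: num msgs * 64 bytes.
min_reclaimable_bytes(NumMsgs) -> NumMsgs * ?BYTES_PER_MSG.

%% Request a snapshot only once enough command data can be reclaimed.
should_snapshot(ReclaimableBytes, NumMsgs) ->
    ReclaimableBytes >= min_reclaimable_bytes(NumMsgs).
```

For 1M messages this gives a threshold of 64,000,000 bytes, which is about twice the 30MB snapshot size quoted above.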