Skip to content

Ra v3#13885

Merged
kjnilsson merged 80 commits intomainfrom
ra-v3
Mar 10, 2026
Merged

Ra v3#13885
kjnilsson merged 80 commits intomainfrom
ra-v3

Conversation

@kjnilsson
Copy link
Copy Markdown
Contributor

@kjnilsson kjnilsson commented May 12, 2025

PR to test Ra v3 changes

  • Integrate Ra v3 from Ra branch
  • Use snapshotting instead of checkpointing
  • Refine snapshotting method
  • create new QQ modules and refactor / tidy up
  • Optimise per message memory use
  • Experiement with ways to trigger major compactions (Also a Ra task)
  • Only return messages for "suspected down" consumers after a configurable timeout.
  • Create new modules for QQ v8
  • Enable unlimited returns by basing delivery limit on deliver count instead of acquired count
  • Strict priorities
  • Refactor how expiration is done with priorities.
  • Consumer timeouts in queue
  • Delayed retries in queue
  • Purge discarded messages

Snapshotting

The state machine needs to start tracking the approximate size of all commands that do not need to be kept (anything but enqueues effective). After this it should be as simple as requesting a snapshot every so many reclaimed bytes. This number may be scaled according to the number of messages on queue as this affects the snapshot size.

1M messages creates snapshots in the region of 30MB. We probably dont want to create a snapshot until at least twice the snapshot size can be potentially reclaimed. Num msgs * 64 bytes of reclaimable command data.

@mergify mergify bot added the make label May 12, 2025
@kjnilsson kjnilsson force-pushed the ra-v3 branch 2 times, most recently from 89003b4 to 0720342 Compare September 5, 2025 14:51
@kjnilsson kjnilsson force-pushed the ra-v3 branch 5 times, most recently from 46c30f4 to 59d889f Compare September 19, 2025 11:52
@kjnilsson kjnilsson force-pushed the ra-v3 branch 6 times, most recently from 6aa7ef0 to 45121a9 Compare September 25, 2025 09:37
@kjnilsson kjnilsson force-pushed the ra-v3 branch 5 times, most recently from 214b7f1 to b311157 Compare October 10, 2025 12:32
@kjnilsson kjnilsson force-pushed the ra-v3 branch 6 times, most recently from 18b590b to dfdce0d Compare November 7, 2025 16:26
@kjnilsson kjnilsson force-pushed the ra-v3 branch 4 times, most recently from 20164d8 to 79af645 Compare November 11, 2025 14:36
kjnilsson and others added 17 commits March 6, 2026 14:11
Implement configurable delayed message retry for quorum queues. When
messages are returned (via reject, nack, or modify), they can be held
in a delayed queue before becoming available again, implementing
exponential backoff based on delivery count.

Configuration options (via queue arguments or policy):
- type: disabled | all | failed | returned
  - all: delay all returned messages
  - failed: delay only messages with incremented delivery_count
  - returned: delay only messages without incremented delivery_count
- min_delay: minimum delay in milliseconds
- max_delay: maximum delay (delay = min(min_delay * delivery_count, max_delay))

Key implementation details:
- Delayed messages stored in a gb_tree keyed by (ready_at_timestamp, raft_index)
- Next ready message cached for O(1) access during checkout
- Delayed messages count towards queue limits (max_length, max_bytes)
  but not towards messages_ready count
- Timer-based promotion of ready messages to returns queue via expire_msgs
- Monotonic timestamps used to prevent message reordering when Ra
  timestamps go backwards

Additional features:
- Support for x-opt-delivery-time annotation to specify explicit future
  delivery time
- Support for x-opt-deferral-token annotation to identify deferred messages
- New delayed_cmd command with operations:
  - {retry, all | N}: immediately retry delayed messages
  - {assign_deferred, ConsumerKey, Tokens}: assign specific deferred
    messages to a consumer by token
- Client APIs: retry_delayed/2 and assign_deferred/3 in rabbit_fifo_client,
  retry_delayed/2 and assign_deferred/4 in rabbit_quorum_queue

Includes unit tests in rabbit_fifo_SUITE and integration tests in
quorum_queue_SUITE.

Co-authored-by: Cursor <cursoragent@cursor.com>
And display delayed message counts and delayed retry configurations.
`DiscBytes` stands for "discarded bytes" which was for the old field
name `discarded_bytes` before it got renamed to `reclaimable_bytes`.
Previously, when a quorum queue consumer timed out, RabbitMQ detached
the AMQP 1.0 link with an amqp:resource-limit-exceeded error. This
is a harsh response: the client loses the link and must re-attach to
continue consuming.

Instead, follow the AMQP 1.0 spec [2.6.12] which allows the source
to spontaneously transition a delivery to a terminal state. When a
consumer times out, RabbitMQ now sends DISPOSITION(role=sender,
state=released, settled=false) for the timed-out deliveries. The
client is expected to confirm by sending DISPOSITION(role=receiver,
settled=true) back.

The Erlang AMQP 1.0 client (amqp10_client) is updated to handle
this new DISPOSITION(role=sender) frame: it removes the affected
delivery IDs from the link's incoming_unsettled map, auto-settles
by replying with DISPOSITION(role=receiver, settled=true) and notifies
the application.

On the broker side, when the client settles back with the released
outcome, the normal requeue path (rabbit_queue_type:settle/5 with
requeue op) is followed. To make this work end-to-end, rabbit_fifo's
return() function now calls maybe_untimeout/2, clearing the
consumer's timeout state so it can receive messages again. Previously
maybe_untimeout was only called from complete_and_checkout (the
settle/complete path), meaning consumers could never recover from a
timeout via the requeue/return path.

The integration test verifies the full flow:
1. Messages are sent and received but not settled
2. Consumer timeout fires, client receives the released disposition
3. Client lib auto-settles, consumer timeout state is cleared
4. Timed-out messages are redelivered, new messages are received
5. Redelivered messages have correct headers (delivery_count=0,
   first_acquirer=false, x-opt-acquired-count=1)
Log a warning once per outgoing link when a consumer timeout fires,
including the connection name, link name, handle, AMQP container ID,
and queue name. The warning is emitted only on the first timeout for
each link to avoid log noise.

Expose the consumer timeout state in the Management UI by adding a
"Consumer timeout" boolean column to the outgoing links table. The
cell is highlighted with a yellow background when true, consistent
with how other notable conditions (e.g. zero link credit) are
displayed. A help icon explains the field.
x-acquired-count is more symmetry to x-delivery-count.
(Renaming x-delivery-count would be a breaking change.)
This is for consistency with the other stashed_* fields instead and not
really for performance.
Snapshot frequency was too high for shallow, fast-flowing queues
because the reclaimable-bytes threshold was proportional to the
(small) approximate snapshot size. With many queues sharing a WAL,
this resulted in multiple snapshots per queue per WAL cycle.

Scale the minimum reclaimable threshold inversely with the WAL fill
ratio so that snapshots are deferred until the WAL is sufficiently
full, yielding roughly one snapshot per queue per WAL cycle.
the-mikedavis and others added 4 commits March 6, 2026 10:33
This is a very small space-saving measure. `{ReadyAtMillis, MsgIdx}`
becomes `[ReadyAtMillis | MsgIdx]` for the `#delayed{}` state, saving a
small number of bytes per key when serialized. This optimised tuple type
is already used in a few other places in `rabbit_fifo` for a similar
space-saving effect.
`gb_trees:size/1` is constant time - the size is stored at the top-level
tuple.

    -doc "Returns the number of nodes in `Tree`.".
    -spec size(Tree) -> non_neg_integer() when
          Tree :: tree().
    size({Size, _}) when is_integer(Size), Size >= 0 ->
        Size.

So we don't need to manually track the tree size with a len field. This
change removes the field.
rabbit_fifo: Use optimised tuple for delayed keys
@kjnilsson kjnilsson marked this pull request as ready for review March 9, 2026 11:10
@kjnilsson kjnilsson merged commit d56a4e6 into main Mar 10, 2026
354 of 355 checks passed
@kjnilsson kjnilsson deleted the ra-v3 branch March 10, 2026 09:12
acogoluegnes added a commit to rabbitmq/rabbitmq-jms-client that referenced this pull request Mar 13, 2026
For delivery count JMS property, instead of x-delivery-count header.
x-acquired-count is the one to use as of RabbitMQ 4.3.

References rabbitmq/rabbitmq-server#13885
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants