Add XNACK command for releasing stream messages back to the group#14797
Merged
Conversation
shahsb
reviewed
Feb 19, 2026
shahsb
left a comment
There was a problem hiding this comment.
The bugbot pointed out a potential use-after-free in the active defragmentation path for NACKed entries. I'm trying to understand how the current defrag design handles entries that are not owned by any consumer, and whether there's a broader pattern we could adopt to avoid similar issues in the future.
sundb
requested changes
Mar 30, 2026
sundb
requested changes
Mar 30, 2026
Collaborator
|
forgot to remove PendingEntryContext diff --git a/src/defrag.c b/src/defrag.c
index b3fcabfa6..5bf433a44 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -855,19 +855,14 @@ void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
raxStop(&ri);
}
-typedef struct {
- streamCG *cg;
- streamConsumer *c;
-} PendingEntryContext;
-
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
- PendingEntryContext *ctx = privdata;
+ streamConsumer *c = privdata;
streamNACK *nack = ri->data;
/* NACKs are already defragged by the CG PEL walk (defragStreamCGPendingEntry).
* cgroup_ref_node->value is also updated there for all NACKs (including
* unowned NACK-zone entries that have no consumer PEL walk).
* Here we only fix up the back-pointer to the possibly-relocated consumer. */
- nack->consumer = ctx->c;
+ nack->consumer = c;
return NULL;
}
@@ -924,8 +919,7 @@ void* defragStreamConsumer(raxIterator *ri, void *privdata) {
if (c->pel) {
/* Update pel back-pointer to new stream */
c->pel->alloc_size = &s->alloc_size;
- PendingEntryContext pel_ctx = {cg, c};
- defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
+ defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, c);
}
return newc; /* returns NULL if c was not defragged */
} |
sundb
reviewed
Mar 30, 2026
Collaborator
Author
|
PendingEntryContext is removed. |
sundb
reviewed
Mar 31, 2026
sundb
reviewed
Apr 2, 2026
sundb
reviewed
Apr 2, 2026
sundb
reviewed
Apr 2, 2026
sundb
approved these changes
Apr 7, 2026
pierluigilenoci
pushed a commit
to pierluigilenoci/redis
that referenced
this pull request
Apr 16, 2026
…dis#14797) ### Overview This PR enhances Redis Streams consumer groups by adding a new `XNACK` command that allows consumers to explicitly release pending messages back to the group without acknowledging them. Released (NACKed) entries become immediately available for re-delivery to other consumers, eliminating the idle-timeout delay currently required for message recovery. The command supports three modes — SILENT, FAIL, and FATAL — giving consumers fine-grained control over delivery counter semantics to handle graceful shutdowns, transient failures, and poison messages respectively. ### Problem Statement For developers using Redis Streams with consumer groups, there are several common scenarios where a consumer needs to release a message it has claimed without acknowledging it: 1. **Transient internal failures**: A consumer may fail to process a message because of problems unrelated to the message itself — for example, it cannot connect to an external service to fetch required context. The message is perfectly valid and should be retried promptly by another consumer. 2. **Resource pressure**: A consumer under resource stress (low CPU, low memory) may be unable to handle a specific message (e.g., a complex or large message) within acceptable QoS. It should leave the opportunity to other consumers in the group, with minimal delay. 3. **Graceful shutdown**: A consumer about to shut down would like to immediately release all unprocessed messages it has claimed, so they can be picked up by remaining consumers without waiting for idle timeouts. 4. **Poison / malicious messages**: A consumer may detect or suspect that a claimed message is invalid or malicious and wants to mark it as permanently failed (for dead-letter queue processing when available). **Currently, a consumer cannot NACK a message.** It can either: - **XACK** it — marks it as "processed" and removes it from the PEL entirely, losing the ability to redeliver it - **Leave it pending** — requires other consumers to discover it via `XPENDING` and claim it via `XCLAIM`/`XAUTOCLAIM` or `XREADGROUP CLAIM` after the idle timeout expires, introducing a long, unnecessary delay In all these cases, the logic that applications must implement introduces **message handling delays**, **implementation complexity**, and **code duplication** across consumer implementations. ### Solution Introduces a new `XNACK` (Negative ACKnowledge) command that explicitly releases pending messages from their owning consumer back to the group's PEL, making them immediately claimable via `XCLAIM` and `XAUTOCLAIM`, and prioritized for re-delivery in `XREADGROUP CLAIM`: ``` XNACK key group <SILENT|FAIL|FATAL> IDS numids id [id ...] [RETRYCOUNT count] [FORCE] ``` When executed, the command: 1. **Disassociates** the entry from its owning consumer (`consumer = NULL`) 2. **Repositions** the entry to the head of the PEL time-ordered list (`delivery_time = 0`), making it immediately claimable with any `min-idle-time` threshold 3. **Adjusts the delivery counter** based on the specified mode, giving consumers fine-grained control over retry semantics 4. **Returns** the count of successfully NACKed entries **Mode** controls the delivery counter adjustment and communicates the reason for the NACK: | Mode | Delivery Counter Behavior | Use Case | |----------|---------------------------------------------------|---------------------------------------------| | `SILENT` | Decrement by 1 (undo the delivery increment) | Consumer shutdown / transient internal error — the delivery "didn't count" | | `FAIL` | No change (keep the incremented value) | Message too complex for this consumer, but may work for others — count this as an attempt | | `FATAL` | Set to `LLONG_MAX` | Invalid / suspected malicious message — mark as permanently failed | The three modes map directly to the real-world scenarios above: - **SILENT** for graceful shutdown or transient failures unrelated to the message - **FAIL** for resource-constrained consumers that cannot handle a specific message - **FATAL** for poison message detection and dead-letter queue integration **Optional parameters:** - **`RETRYCOUNT count`**: Directly sets `delivery_count` to the specified value, overriding the mode-based adjustment - **`FORCE`**: Creates new unowned PEL entries for IDs that are not already in the group PEL (the entry must exist in the stream). When `FORCE` creates an entry, the delivery counter is set to `0` (or to `RETRYCOUNT` if specified, or to `LLONG_MAX` if mode is `FATAL`). This is used internally for AOF rewrite and replication. ### Response Format The command returns an integer — the number of messages successfully NACKed (released back to the group PEL): ``` 127.0.0.1:6379> XADD mystream 1-0 f v1 "1-0" 127.0.0.1:6379> XADD mystream 2-0 f v2 "2-0" 127.0.0.1:6379> XGROUP CREATE mystream grp 0 OK 127.0.0.1:6379> XREADGROUP GROUP grp c1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) "1-0" 2) 1) "f" 2) "v1" 2) 1) "2-0" 2) 1) "f" 2) "v2" 127.0.0.1:6379> XNACK mystream grp FAIL IDS 2 1-0 2-0 (integer) 2 ``` After XNACK, the entries appear with an empty consumer in XPENDING output: ``` 127.0.0.1:6379> XPENDING mystream grp - + 10 1) 1) "1-0" 2) "" 3) (integer) -1 4) (integer) 1 2) 1) "2-0" 2) "" 3) (integer) -1 4) (integer) 1 ``` ### NACK Zone: Data Structure Extension To support unowned PEL entries and ensure they are prioritized for re-delivery, a **NACK zone** is introduced at the head of the existing PEL time-ordered doubly-linked list. A new `pel_nack_tail` pointer is added to the `streamCG` structure: **PEL ordering:** ``` [pel_time_head] <-> ... <-> [pel_nack_tail] <-> [owned entries...] <-> [pel_time_tail] |_____________ NACK zone ______________| |_______ normal PEL ________| ``` The head of the PEL contains all NACKed messages (FIFO-ordered), followed by all delivered messages that were not NACKed (same order as today). This ensures NACKed messages are always prioritized over idle pending messages. The delivery order for `XREADGROUP` is therefore: 1. If `CLAIM` was specified: first deliver NACKed messages, then deliver due pending messages (current behavior) 2. Deliver new entries after the group's last-delivered-id (current behavior) **Structure Design:** - NACKed entries occupy positions from `pel_time_head` to `pel_nack_tail` in the time-ordered list - Their `delivery_time` is set to `0`, ensuring they always appear "oldest" and are immediately claimable - Their `consumer` pointer is set to `NULL`, marking them as unowned - `pel_nack_tail` is `NULL` when no NACKed entries exist **Key Properties:** - **O(1) insertion**: New NACKed entries are inserted right after `pel_nack_tail` (or at the list head if the zone is empty) - **FIFO ordering** among NACKed entries: entries are NACKed in the order they are released - **Immediate claimability**: Since `delivery_time = 0`, NACKed entries have maximum idle time and satisfy any `min-idle-time` threshold in `XCLAIM` and `XAUTOCLAIM`, In `XREADGROUP CLAIM`, NACKed entries are also prioritized over other pending entries due to their position at the head of the PEL. - **Zone integrity**: The `pelListInsertSorted` function is updated to stop scanning at the `pel_nack_tail` boundary, ensuring owned entries are never placed inside the NACK zone ### Impact on Existing Commands All commands that interact with the PEL are updated to handle unowned (`consumer = NULL`) entries: - **XPENDING**: Shows NACKed entries with an empty consumer name - **XCLAIM / XAUTOCLAIM**: Can claim NACKed entries (they satisfy any min-idle-time since `delivery_time = 0`) - **XREADGROUP CLAIM**: NACKed entries are picked up by the claim phase - **XACK**: Works correctly on NACKed entries (removes from group PEL) - **XINFO STREAM FULL**: Displays NACKed entries with an empty consumer name - **XGROUP DELCONSUMER**: Unaffected — NACKed entries are not in any consumer's PEL Propagation is also updated: when `XCLAIM` or `XAUTOCLAIM` encounters a deleted stream entry for an unowned NACK, it propagates `XACK` (instead of `XCLAIM`) to replicas and AOF, since there is no source consumer to reference. ### Persistence **RDB:** - A new RDB type `RDB_TYPE_STREAM_LISTPACKS_5` (type 27) is introduced - After saving consumer PEL entries, the NACK zone stream IDs are saved separately (count + encoded IDs) - On load, NACK zone entries are reconstructed by looking them up in the group PEL, unlinking from their sorted position, and re-inserting into the NACK zone via `pelListInsertNacked` - Backward compatibility is preserved: old RDB types continue to load with the existing validation (all entries must have consumers) **AOF:** - AOF rewrite emits `XNACK <key> <group> FAIL IDS <n> <id...> RETRYCOUNT <cnt> FORCE` commands for entries in the NACK zone - Consecutive entries with the same `delivery_count` are batched into a single command (up to `AOF_REWRITE_ITEMS_PER_CMD` IDs per command) ### Defragmentation The defragmentation logic is restructured to handle unowned entries: - **`defragStreamCGPendingEntry`** (new): Walks the group-level PEL rax, defragments each NACK, updates the doubly-linked list pointers (`pel_prev`, `pel_next`), `pel_time_head`, `pel_time_tail`, `pel_nack_tail`, and the consumer PEL back-pointer for owned entries - **`defragStreamConsumerPendingEntry`** (simplified): Only fixes up back-pointers to the possibly-relocated consumer and CG, since actual defragmentation is now done at the group-level walk. Unowned (NACK zone) entries have no consumer PEL walk, so the group-level pass is their only chance ### Key Benefits - **Immediate re-delivery**: NACKed entries are instantly claimable by other consumers via `XCLAIM` and `XAUTOCLAIM` (since `delivery_time = 0` satisfies any `min-idle-time`), and prioritized for re-delivery in `XREADGROUP CLAIM`, eliminating idle-time delays that can range from seconds to minutes - **Explicit release semantics**: Consumers can release messages intentionally, with fine-grained control over retry behavior — a capability that exists in competing systems like RabbitMQ - **Flexible retry control**: Three modes (SILENT, FAIL, FATAL) plus RETRYCOUNT cover the full spectrum of failure handling strategies, from graceful shutdown to poison message detection - **Reduced application complexity**: Eliminates the need for application-level workarounds involving XPENDING polling, arbitrary idle timeouts, and manual XCLAIM orchestration - **Dead-letter queue readiness**: FATAL mode + delivery count enables straightforward poison message detection and future DLQ integration - **Backward compatibility**: Fully optional new command with zero breaking changes to existing behavior
This was referenced Apr 23, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Overview
This PR enhances Redis Streams consumer groups by adding a new
XNACKcommand that allows consumers to explicitly release pending messages back to the group without acknowledging them. Released (NACKed) entries become immediately available for re-delivery to other consumers, eliminating the idle-timeout delay currently required for message recovery. The command supports three modes — SILENT, FAIL, and FATAL — giving consumers fine-grained control over delivery counter semantics to handle graceful shutdowns, transient failures, and poison messages respectively.Problem Statement
For developers using Redis Streams with consumer groups, there are several common scenarios where a consumer needs to release a message it has claimed without acknowledging it:
Transient internal failures: A consumer may fail to process a message because of problems unrelated to the message itself — for example, it cannot connect to an external service to fetch required context. The message is perfectly valid and should be retried promptly by another consumer.
Resource pressure: A consumer under resource stress (low CPU, low memory) may be unable to handle a specific message (e.g., a complex or large message) within acceptable QoS. It should leave the opportunity to other consumers in the group, with minimal delay.
Graceful shutdown: A consumer about to shut down would like to immediately release all unprocessed messages it has claimed, so they can be picked up by remaining consumers without waiting for idle timeouts.
Poison / malicious messages: A consumer may detect or suspect that a claimed message is invalid or malicious and wants to mark it as permanently failed (for dead-letter queue processing when available).
Currently, a consumer cannot NACK a message. It can either:
XPENDINGand claim it viaXCLAIM/XAUTOCLAIMorXREADGROUP CLAIMafter the idle timeout expires, introducing a long, unnecessary delayIn all these cases, the logic that applications must implement introduces message handling delays, implementation complexity, and code duplication across consumer implementations.
Solution
Introduces a new
XNACK(Negative ACKnowledge) command that explicitly releases pending messages from their owning consumer back to the group's PEL, making them immediately claimable viaXCLAIMandXAUTOCLAIM, and prioritized for re-delivery inXREADGROUP CLAIM:When executed, the command:
consumer = NULL)delivery_time = 0), making it immediately claimable with anymin-idle-timethresholdMode controls the delivery counter adjustment and communicates the reason for the NACK:
SILENTFAILFATALLLONG_MAXThe three modes map directly to the real-world scenarios above:
Optional parameters:
RETRYCOUNT count: Directly setsdelivery_countto the specified value, overriding the mode-based adjustmentFORCE: Creates new unowned PEL entries for IDs that are not already in the group PEL (the entry must exist in the stream). WhenFORCEcreates an entry, the delivery counter is set to0(or toRETRYCOUNTif specified, or toLLONG_MAXif mode isFATAL). This is used internally for AOF rewrite and replication.Response Format
The command returns an integer — the number of messages successfully NACKed (released back to the group PEL):
After XNACK, the entries appear with an empty consumer in XPENDING output:
NACK Zone: Data Structure Extension
To support unowned PEL entries and ensure they are prioritized for re-delivery, a NACK zone is introduced at the head of the existing PEL time-ordered doubly-linked list. A new
pel_nack_tailpointer is added to thestreamCGstructure:PEL ordering:
The head of the PEL contains all NACKed messages (FIFO-ordered), followed by all delivered messages that were not NACKed (same order as today). This ensures NACKed messages are always prioritized over idle pending messages.
The delivery order for
XREADGROUPis therefore:CLAIMwas specified: first deliver NACKed messages, then deliver due pending messages (current behavior)Structure Design:
pel_time_headtopel_nack_tailin the time-ordered listdelivery_timeis set to0, ensuring they always appear "oldest" and are immediately claimableconsumerpointer is set toNULL, marking them as unownedpel_nack_tailisNULLwhen no NACKed entries existKey Properties:
pel_nack_tail(or at the list head if the zone is empty)delivery_time = 0, NACKed entries have maximum idle time and satisfy anymin-idle-timethreshold inXCLAIMandXAUTOCLAIM, InXREADGROUP CLAIM, NACKed entries are also prioritized over other pending entries due to their position at the head of the PEL.pelListInsertSortedfunction is updated to stop scanning at thepel_nack_tailboundary, ensuring owned entries are never placed inside the NACK zoneImpact on Existing Commands
All commands that interact with the PEL are updated to handle unowned (
consumer = NULL) entries:delivery_time = 0)Propagation is also updated: when
XCLAIMorXAUTOCLAIMencounters a deleted stream entry for an unowned NACK, it propagatesXACK(instead ofXCLAIM) to replicas and AOF, since there is no source consumer to reference.Persistence
RDB:
RDB_TYPE_STREAM_LISTPACKS_5(type 27) is introducedpelListInsertNackedAOF:
XNACK <key> <group> FAIL IDS <n> <id...> RETRYCOUNT <cnt> FORCEcommands for entries in the NACK zonedelivery_countare batched into a single command (up toAOF_REWRITE_ITEMS_PER_CMDIDs per command)Defragmentation
The defragmentation logic is restructured to handle unowned entries:
defragStreamCGPendingEntry(new): Walks the group-level PEL rax, defragments each NACK, updates the doubly-linked list pointers (pel_prev,pel_next),pel_time_head,pel_time_tail,pel_nack_tail, and the consumer PEL back-pointer for owned entriesdefragStreamConsumerPendingEntry(simplified): Only fixes up back-pointers to the possibly-relocated consumer and CG, since actual defragmentation is now done at the group-level walk. Unowned (NACK zone) entries have no consumer PEL walk, so the group-level pass is their only chanceKey Benefits
XCLAIMandXAUTOCLAIM(sincedelivery_time = 0satisfies anymin-idle-time), and prioritized for re-delivery inXREADGROUP CLAIM, eliminating idle-time delays that can range from seconds to minutesNote
High Risk
High risk because it changes core Streams consumer-group PEL semantics and modifies persistence/replication paths (new RDB type/version, AOF rewrite emission) which can impact data integrity and backward/forward compatibility.
Overview
Adds a new
XNACKStreams command to explicitly release pending consumer-group entries back to the group as unowned PEL items, with configurable delivery-count semantics (SILENT/FAIL/FATAL, optionalRETRYCOUNT) and optionalFORCEcreation for missing PEL entries.Introduces a consumer-group “NACK zone” in the time-ordered PEL (new
pel_nack_tailplus helpers) and updates related commands/paths (XPENDING,XINFO STREAM FULL,XCLAIM/XAUTOCLAIM/XREADGROUP CLAIM,XACK, defrag) to tolerateconsumer == NULL, prevent overflow ofdelivery_count, and propagate cleanup viaXACKwhen unowned pending entries reference deleted stream items.Extends persistence and rewrite tooling to preserve NACKed entries: new stream RDB encoding
RDB_TYPE_STREAM_LISTPACKS_5withRDB_VERSIONbump, plus AOF rewrite emission of batchedXNACK ... FORCEcommands; command/docs tables and extensive tests are added/updated accordingly.Reviewed by Cursor Bugbot for commit be491de. Bugbot is set up for automated code reviews on this repo. Configure here.