
Add CLAIM parameter to XREADGROUP for automatic pending entry claiming #14402

Merged

sundb merged 49 commits into redis:unstable from sggeorgiev:xreadgroup-claim on Oct 21, 2025

Conversation


@sggeorgiev sggeorgiev commented Oct 1, 2025

Overview

This PR enhances Redis Streams consumer groups by adding an optional CLAIM parameter to the XREADGROUP command, enabling automatic claiming of idle pending entries alongside normal message consumption in a single operation.

Problem Statement

Current Redis Streams consumer group implementations require developers to manually orchestrate multiple commands to handle both pending and new entries:

  • XPENDING to discover idle pending entries
  • XCLAIM/XAUTOCLAIM to claim idle entries
  • XREADGROUP to consume new entries

This multi-command approach creates:

  • Performance overhead from multiple round trips to Redis
  • Implementation complexity, particularly when working with multiple streams
  • Code duplication across consumer implementations

Solution

Extends XREADGROUP with a new optional CLAIM parameter:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] [CLAIM min-idle-time] STREAMS key [key ...] id [id ...]

When CLAIM min-idle-time is specified, the command operates in two phases:

  1. Claim Phase: Automatically claims pending entries idle for ≥ min-idle-time milliseconds
  2. Read Phase: Processes new entries if the COUNT limit hasn't been reached

Response Format Changes

When the CLAIM option is used, the response format is extended to include delivery metadata for each entry:
Standard XREADGROUP response (without CLAIM):

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1609459200000-0"
         2) 1) "field1"
            2) "value1"

XREADGROUP response with CLAIM:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 CLAIM 30000 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1609459200000-0"
         2) 1) "field1"
            2) "value1"
         3) 15000
         4) 3

Response structure with CLAIM:

  • Field 1: Stream entry ID (unchanged)

  • Field 2: Field-value pairs (unchanged)

  • Field 3: Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer

  • Field 4: Delivery count - the number of times this entry has been delivered:

    • 0 for new messages that haven't been delivered before
    • 1+ for claimed messages (previously unacknowledged entries)

Purpose of the new fields:
These fields enable intelligent client-side processing decisions:

  • Idle time enables time-based escalation strategies, detection of stuck messages, and priority processing for critically delayed work
  • Delivery count enables retry limits, dead-letter queue logic, poison message detection, and alternative processing strategies based on failure history

Together, these fields provide the visibility needed to build robust, self-healing consumer systems without requiring additional XPENDING queries.
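As an illustrative sketch (not part of the PR), a client could use the two new fields to drive routing decisions like the ones listed above. The threshold values and function name here are hypothetical:

```python
# Hypothetical client-side policy: decide how to handle an entry using the
# idle-time and delivery-count fields appended by XREADGROUP ... CLAIM.
# Thresholds are illustrative, not prescribed by the PR.
MAX_DELIVERIES = 5          # assumed retry limit before dead-lettering
ESCALATE_IDLE_MS = 60_000   # assumed threshold for "stuck" messages

def route_entry(idle_ms: int, delivery_count: int) -> str:
    """Return a processing decision for one claimed or new entry."""
    if delivery_count >= MAX_DELIVERIES:
        return "dead-letter"      # poison-message handling
    if delivery_count > 0 and idle_ms >= ESCALATE_IDLE_MS:
        return "escalate"         # stuck message, process with priority
    return "process"              # normal path (new or lightly retried)

print(route_entry(0, 0))         # new message -> process
print(route_entry(90_000, 3))    # stuck claimed message -> escalate
print(route_entry(15_000, 7))    # exceeded retry limit -> dead-letter
```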

Note: If the ID parameter is not >, the command returns entries that are pending for the consumer, and the CLAIM option is ignored. In this case, the response follows the standard format without the additional delivery metadata fields.

Key Benefits

  • Reduced Complexity: Eliminates manual PEL management and multi-command orchestration
  • Improved Performance: Reduces round trips by 50-70% for workloads processing both pending and new entries
  • Backward Compatibility: Fully optional parameter with zero breaking changes to existing behavior
  • Multi-Stream Support: Works seamlessly across multiple streams in a single command
  • Flexible Consumer Patterns: Enables mixed consumer types within the same group:
    • Consumers without CLAIM that only handle new messages
    • Consumers with CLAIM that process both pending and new entries

Impact on Existing Commands

The XCLAIM and XAUTOCLAIM commands may also benefit from the new pel_by_time index, but such optimizations require further investigation and testing; enhancements to XCLAIM and XAUTOCLAIM are therefore deferred to future work.

Performance Benchmarks

Latency Performance

Comprehensive performance testing demonstrates significant improvements over the traditional XAUTOCLAIM approach:
Test Methodology
Two identical test scenarios were executed to compare XAUTOCLAIM against XREADGROUP with CLAIM:
Test Setup:

  1. Insert 20,000 messages into a stream
  2. Read all messages with XREADGROUP to populate the pending entries list (PEL)
  3. Set IDLE time to 1100ms on 1,000 randomly selected pending messages using XCLAIM
  4. Set IDLE time to 50ms on all remaining 19,000 pending messages using XCLAIM
  5. Execute the target command with min-idle-time=1000ms and COUNT=1000 to claim the eligible messages
  6. Repeat steps 3-5 for 1,000 iterations

Test 1 - XAUTOCLAIM (Traditional Approach):

XAUTOCLAIM Performance:
  Average:    54.671ms
  Median:     53.582ms
  Min:        3.738ms
  Max:        71.596ms
  P95:        62.536ms
  P99:        68.800ms

Test 2 - XREADGROUP with CLAIM (New Approach):

XREADGROUP CLAIM Performance:
  Average:    2.426ms
  Median:     2.571ms
  Min:        1.287ms
  Max:        4.653ms
  P95:        3.370ms
  P99:        4.212ms

Performance Analysis
The new XREADGROUP CLAIM implementation delivers 22.5x faster average performance compared to XAUTOCLAIM:

  • Average latency reduction: 95.6% (54.671ms → 2.426ms)
  • Median latency reduction: 95.2% (53.582ms → 2.571ms)
  • P95 latency reduction: 94.6% (62.536ms → 3.370ms)
  • P99 latency reduction: 93.9% (68.800ms → 4.212ms)

This performance improvement is achieved through the time-ordered PEL index (pel_by_time), which enables O(log n + k) retrieval of idle entries versus XAUTOCLAIM's less efficient scanning approach.

Memory Performance

To evaluate the memory overhead of the pel_by_time index, comprehensive memory testing was conducted comparing Redis with and without the index under realistic workload conditions.

Test Methodology:

  • Insert 200,000 new messages into a stream
  • Read messages in blocks of 100 using XREADGROUP (populating the PEL with 200,000 pending entries)
  • Wait 5ms after each read block (simulating realistic processing delays that affect rax tree compression)
  • Measure memory usage before and after the reading phase

Test Results - Without pel_by_time Index:

Initial memory (used):                                   926.10 KB
After insertion (used):                                    6.80 MB
After reading (used):                                     41.53 MB
Memory increase from data:                                 5.90 MB
Memory increase from reading:                             34.72 MB
Total memory increase:                                    40.62 MB

Test Results - With pel_by_time Index:

Initial memory (used):                                   927.44 KB
After insertion (used):                                    6.81 MB
After reading (used):                                     45.07 MB
Memory increase from data:                                 5.90 MB
Memory increase from reading:                             38.27 MB
Total memory increase:                                    44.17 MB

Memory Performance Analysis:
The pel_by_time index introduces a measurable but reasonable memory overhead:

Used Memory Impact:

  • Memory increase from pel_by_time index: 3.55 MB (38.27 MB - 34.72 MB)
  • Per-entry overhead: 18.6 bytes (3.55 MB / 200,000 entries)
  • Percentage overhead: 8.7% increase in total memory usage
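The arithmetic behind these three figures can be reproduced directly from the reported measurements:

```python
# Reproducing the overhead arithmetic from the memory test results above.
with_index_mb = 38.27      # memory increase from reading, with pel_by_time
without_index_mb = 34.72   # memory increase from reading, without the index
total_with_mb = 44.17      # total memory increase, with the index
total_without_mb = 40.62   # total memory increase, without the index
entries = 200_000

overhead_mb = with_index_mb - without_index_mb
per_entry_bytes = overhead_mb * 1024 * 1024 / entries
pct = (total_with_mb - total_without_mb) / total_without_mb * 100

print(round(overhead_mb, 2))      # 3.55
print(round(per_entry_bytes, 1))  # 18.6
print(round(pct, 1))              # 8.7
```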

Per-Entry Memory Breakdown:
The raw composite key stored by the pel_by_time index is 32 bytes per entry (key only, no node values). The observed overhead of 18.6 bytes per entry is below this raw key size, indicating that effective rax tree prefix compression is occurring despite the 5ms delays between reads.

Technical Implementation

New Data Structure: Time-Ordered PEL Index (pel_by_time)

To efficiently identify and claim idle pending entries, this PR introduces a new rax tree structure to the consumer group implementation:
Structure Design:

  • Tree Type: Rax tree named pel_by_time added to each consumer group
  • Key Composition: 32-byte composite key consisting of:
    • delivery_time (timestamp when entry was last delivered)
    • streamId (stream entry ID)

Key Format: delivery_time + streamId (concatenated)
Node Value: None - all necessary information is encoded in the key itself for memory efficiency
Key Properties:
Uniqueness Guarantee: While multiple pending entries may share the same delivery_time, the streamId component ensures each key is globally unique within the tree.
Lexicographical Ordering: The rax tree naturally orders nodes lexicographically by key. Since delivery_time forms the prefix of each key, entries are automatically sorted by delivery time, with oldest entries appearing first in the tree.
Efficient Range Operations: This time-based ordering enables highly efficient range searches. To find all entries idle for at least min-idle-time milliseconds, we simply perform a range query from the tree's beginning up to current_time - min-idle-time.
Fast Retrieval:
Once idle entries are identified via the pel_by_time index, the embedded streamId in each key is used to quickly retrieve the full pending message data structure for the subsequent XREADGROUP claim operation.
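The key-ordering property can be sketched in a few lines. This is an illustrative model, not the C implementation: the field widths below (8 bytes each for delivery time, entry milliseconds, and entry sequence, 24 bytes total) are assumptions, and the PR's actual key is 32 bytes. What the sketch demonstrates is the core idea: big-endian packing makes byte-wise lexicographic order match numeric order, so the rax tree keeps entries sorted oldest-delivery-first, and a range query up to `current_time - min-idle-time` finds all claimable entries.

```python
# Illustrative model of the time-ordered composite key: big-endian
# delivery_time followed by the big-endian stream ID (ms + seq).
import struct

def make_key(delivery_time_ms: int, entry_ms: int, entry_seq: int) -> bytes:
    # Field widths are assumptions for illustration; the real 32-byte
    # layout in the PR differs in detail.
    return struct.pack(">QQQ", delivery_time_ms, entry_ms, entry_seq)

keys = [
    make_key(1_700_000_500, 1_609_459_200_000, 1),
    make_key(1_700_000_100, 1_609_459_200_000, 0),
    make_key(1_700_000_100, 1_609_459_200_005, 0),
]
# Byte-wise sorting orders entries oldest-delivery-first, with the stream
# ID breaking ties between entries delivered at the same millisecond.
assert sorted(keys)[0] == keys[1]

# Range query for "idle >= min_idle": every key whose delivery-time prefix
# is below now - min_idle identifies a claimable entry.
now, min_idle = 1_700_000_600, 450
cutoff = struct.pack(">Q", now - min_idle)
claimable = [k for k in sorted(keys) if k[:8] < cutoff]
print(len(claimable))  # 2
```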
Performance Characteristics:

  • Insertion: O(log n) when adding entries to PEL
  • Range Search: O(log n + k) where k is the number of idle entries found
  • Memory Overhead: 32 bytes per pending entry for the index key (no additional node values stored)

This dual-index approach (existing PEL structures plus the new time-ordered index) allows XREADGROUP with CLAIM to efficiently identify claimable entries without scanning the entire PEL, making the operation suitable for consumer groups with large pending entry lists.

COUNT Behavior with CLAIM

When the COUNT option is used in conjunction with CLAIM, the command follows a two-phase execution strategy to fill the specified count limit:
Phase 1: Claim Idle Pending Entries

  • Retrieve claimable pending entries (idle for ≥ min-idle-time) up to the COUNT limit
  • These entries are claimed and returned to the consumer

Phase 2: Fetch New Messages (if needed)

  • If the COUNT limit has not been satisfied by claimed pending entries, the command proceeds to read new messages from the stream
  • New messages are fetched up to the remaining available count: remaining_count = COUNT - claimed_entries

This prioritization ensures that idle pending entries are always processed first, preventing indefinite message stalling while still allowing consumers to process new messages efficiently when pending entries are scarce.
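The two-phase budgeting above can be sketched as follows; `pending_idle` and `new_messages` are hypothetical stand-ins for the server-side claim and read phases:

```python
# Minimal model of COUNT budgeting with CLAIM: claimed idle entries are
# served first, then new messages top up the remaining count.
def xreadgroup_claim(pending_idle, new_messages, count):
    claimed = pending_idle[:count]       # phase 1: claim idle entries
    remaining = count - len(claimed)     # remaining_count = COUNT - claimed
    fresh = new_messages[:remaining]     # phase 2: read new messages
    return claimed + fresh

out = xreadgroup_claim(["p1", "p2"], ["n1", "n2", "n3"], 4)
print(out)  # ['p1', 'p2', 'n1', 'n2']
```

Note how claimed entries always consume the budget before any new messages are read, matching the prioritization described above.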

BLOCK Behavior with CLAIM

When the CLAIM option is used in conjunction with the BLOCK option, the command's blocking behavior responds to two wakeup conditions: new messages arriving and pending entries becoming claimable:

Blocking State Management:
If there are no immediately claimable pending entries and no new messages available in the stream, the XREADGROUP command enters a blocking state for the specified duration. However, the implementation must handle a critical scenario: pending entries that become idle (and thus claimable) while the command is blocked must trigger an early wakeup to serve those entries.

Implementation: stream_claim_pending_keys Dictionary
To enable this reactive blocking behavior, a new stream_claim_pending_keys dictionary is introduced to the redisDb structure:

  • Key: Stream key being watched
  • Value: The minimum timestamp when the next pending entry in this stream will become claimable (i.e., will satisfy the min-idle-time requirement)

Multi-Client Coordination:
When multiple XREADGROUP commands with BLOCK and CLAIM are executed concurrently on the same stream, the dictionary value stores the shortest claimable time across all waiting clients. This ensures the earliest possible wakeup when any pending entry becomes available for claiming.

Wakeup Mechanism: handleClaimableStreamEntries
The handleClaimableStreamEntries function is invoked regularly from blockedBeforeSleep to monitor and react to claimable entries:

  1. Scan Phase: Iterates through all entries in the stream_claim_pending_keys dictionary
  2. Time Check: Compares each entry's claimable timestamp against the current time
  3. Signal Phase: When claimable_time ≤ current_time, calls signalKeyAsReady to wake up all clients blocked on that stream
  4. Client Processing: Awakened clients attempt to claim and process the newly available pending entries

Resource Contention Handling:
When the number of claimable entries is insufficient to satisfy all awakened clients:

  • Clients that successfully claim entries complete their operations
  • Remaining clients recalculate the next minimum claimable time based on remaining pending entries
  • These clients update the stream_claim_pending_keys dictionary with the new timestamp
  • They re-enter the blocking state to wait for the next batch of claimable entries

This design ensures fair resource distribution and prevents busy-waiting while maintaining responsiveness to both new messages and aging pending entries.
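A simplified model of this mechanism (function and variable names here are Python stand-ins for the C-level `stream_claim_pending_keys` dictionary and `handleClaimableStreamEntries`, under my own assumptions about their shape):

```python
# Model of reactive blocking: each stream key maps to the earliest time any
# of its pending entries becomes claimable; a periodic scan signals streams
# whose deadline has passed.
def register_waiter(claim_deadlines, stream_key, next_claimable_at):
    # Multi-client coordination: keep the minimum (earliest) wakeup time
    # across all clients blocked on this stream.
    cur = claim_deadlines.get(stream_key)
    if cur is None or next_claimable_at < cur:
        claim_deadlines[stream_key] = next_claimable_at

def handle_claimable_stream_entries(claim_deadlines, now):
    # Scan phase + time check: collect streams whose entries are claimable.
    ready = [k for k, t in claim_deadlines.items() if t <= now]
    for k in ready:
        # Signal phase: analogue of signalKeyAsReady waking blocked clients;
        # losers will re-register with a recalculated deadline.
        del claim_deadlines[k]
    return ready

deadlines = {}
register_waiter(deadlines, "mystream", 1_000)
register_waiter(deadlines, "mystream", 700)   # earlier waiter wins
register_waiter(deadlines, "other", 2_000)
print(handle_claimable_stream_entries(deadlines, 800))  # ['mystream']
```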


@minchopaskal added the release-notes (indication that this issue needs to be mentioned in the release notes) and state:needs-doc-pr (requires a PR to redis-doc repository) labels on Oct 1, 2025
@sundb (Collaborator) commented Oct 21, 2025

@sggeorgiev (Collaborator, Author) replied:
It looks like there is a problem with downloading repository metadata for CentOS.

CentOS Stream 9 - BaseOS                        437  B/s | 3.9 kB     00:09    
Errors during downloading metadata for repository 'baseos':
  - Downloading successful, but checksum doesn't match. Calculated: b26fbafee2070ebfe05d0a8fc4c6c94ad3e660922676f7720b7a05617f9aca2f03f0d15c5a92a1a874024cbf076e43a38a4a8bd8057583ff84260e600555e51b(sha512)  Expected: 5b1342b77b202688bd232da4e3dc064f49f9cd6de12fc9b4564d2dc3a05fb32662a1ea9df536f67a51e78f1b5261e0b4786ff4bcb02edfb79e27c220507cd152(sha512) 
Error: Failed to download metadata for repo 'baseos': Cannot download repomd.xml: Cannot download repodata/repomd.xml: All mirrors were tried
Error: Process completed with exit code 1.

I will rerun it after some time.

@sundb sundb merged commit 090ca80 into redis:unstable Oct 21, 2025
31 of 33 checks passed
@github-project-automation github-project-automation bot moved this from Todo to Done in Redis 8.4 Oct 21, 2025
sundb added a commit that referenced this pull request Nov 7, 2025
The logic added in #14402
introduced overhead to XREADGROUP even when the new feature is not
used.

This PR tries to mitigate it, by removing unnecessary streamEncodeID()
calls and redundant byte-swapping operations from the stream iterator
hot path.
By comparing stream IDs directly in native-endian form, we eliminate
repeated encoding and memcmp() calls that were responsible for a
significant portion of total CPU time during stream iteration.

Additionally, endian conversion helpers are modernized to leverage
compiler-provided intrinsics (__builtin_bswap*) for single-instruction
byte-swaps on supported compilers.

---------

Co-authored-by: debing.sun <debing.sun@redis.com>