Skip to content

Use sharded pub/sub for Redis Cluster to fix duplicate message delivery#141

Merged
niemyjski merged 4 commits intomainfrom
fix/sharded-pubsub-cluster
Feb 14, 2026
Merged

Use sharded pub/sub for Redis Cluster to fix duplicate message delivery#141
niemyjski merged 4 commits intomainfrom
fix/sharded-pubsub-cluster

Conversation

@niemyjski
Copy link
Copy Markdown
Member

@niemyjski niemyjski commented Feb 14, 2026

Summary

  • Use sharded pub/sub (SPUBLISH/SSUBSCRIBE) in Redis Cluster mode to fix duplicate message delivery
  • Replace Lazy<RedisChannel> with RedisChannel? nullable struct for zero-allocation lazy caching
  • Drop Redis < 7.0 version check — everything below 7.2 is EOL

Problem

In Redis Cluster mode, PUBLISH broadcasts messages to all nodes. StackExchange.Redis spreads Literal subscriptions across nodes, so each subscriber receives the message once per primary node (3x in a typical 3-master cluster). This caused CanReceiveMessagesConcurrentlyAsync to fail with too many signals.

Solution

When IsCluster() is true, use RedisChannel.Sharded() instead of RedisChannel.Literal(). Sharded pub/sub routes all operations for a given channel through a single shard, ensuring exactly-once delivery while preserving full fanout to all subscribers on that channel.

The channel type is resolved lazily on first access and cached via RedisChannel? _channel with ??=. This is:

  • Zero-allocation: RedisChannel is a struct, Nullable<RedisChannel> is also a value type
  • Lazy: IsCluster() is only called on first subscribe/publish, not in the constructor
  • Consistent: Same pattern as RedisQueue._listPrefix which caches IsCluster() once and never resets (topology does not change on reconnect)

References

Test plan

  • All 23 messaging tests pass against Redis Cluster (7000-7005)
  • Build succeeds with zero errors
  • Only RedisMessageBus.cs changed — no test harness or config modifications

@niemyjski niemyjski force-pushed the fix/sharded-pubsub-cluster branch from 7527190 to 19c1e47 Compare February 14, 2026 01:00
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a critical duplicate message delivery bug in RedisMessageBus when deployed against Redis Cluster. The fix switches from standard pub/sub (PUBLISH/SUBSCRIBE) to sharded pub/sub (SPUBLISH/SSUBSCRIBE) for cluster deployments, ensuring exactly-once message delivery while preserving full fanout to all subscribers.

Changes:

  • Implemented cluster detection using the IsCluster() extension method, consistent with other Redis components in the codebase
  • Introduced GetChannel() method to lazily resolve the appropriate Redis channel type (sharded for clusters, literal for standalone/sentinel)
  • Replaced direct RedisChannel.Literal() calls with GetChannel() in both subscription and publish code paths

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Replace Lazy<RedisChannel> with RedisChannel? for zero-allocation lazy caching. Drop Redis version check since < 7.2 is EOL. The nullable struct is evaluated once on first access via ??= and cached for all subsequent calls, consistent with how RedisQueue caches IsCluster() in _listPrefix.

Co-authored-by: Cursor <cursoragent@cursor.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

src/Foundatio.Redis/Messaging/RedisMessageBus.cs:63

  • There are existing RedisMessageBus tests, but none appear to assert the cluster-specific behavior introduced here (no duplicates in cluster, sharded channel selection when cluster + Redis >= 7, and Literal fallback otherwise). Adding an integration test that detects duplicate delivery in cluster mode would help prevent regressions of this bug fix.
    }

src/Foundatio.Redis/Messaging/RedisMessageBus.cs:62

  • ResolveChannel() falls back to Literal only when it finds a connected primary with Version < 7.0, but if no primaries are connected at the moment this Lazy is first evaluated, it will default to RedisChannel.Sharded() and cache that choice. That can cause permanent SPUBLISH/SSUBSCRIBE usage (and runtime errors) against pre-7.0 clusters if the initial version probe happened before connections were established. Consider handling the “no connected primaries / version unknown” case explicitly (e.g., return Literal or avoid caching until a version can be confirmed).
            return;

        using (await _lock.LockAsync().AnyContext())
        {
            if (_isSubscribed)
                return;

            _logger.LogTrace("Subscribing to topic: {Topic}", _options.Topic);
            _channelMessageQueue = await _options.Subscriber.SubscribeAsync(Channel).AnyContext();
            _channelMessageQueue.OnMessage(OnMessage);
            _isSubscribed = true;
            _logger.LogTrace("Subscribed to topic: {Topic}", _options.Topic);
        }
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@niemyjski
Copy link
Copy Markdown
Member Author

Verification: Bug is real and reproducible

Ran CanReceiveMessagesConcurrentlyAsync against the local Redis Cluster (3 primaries, 3 replicas, Redis 8.2.1, StackExchange.Redis 2.11.0):

Code Result Time
RedisChannel.Literal (original) FAIL - timeout 4.5s (hit 4s timeout, not all 1000 signals received)
RedisChannel.Sharded (this PR) PASS 0.447s

Root Cause: Why Sharded is ~10x faster

The test publishes 103 messages to 10 subscribers (expecting 1000 countdown signals). Here is why the routing matters:

PUBLISH (Literal) in a 3-primary cluster:

  1. StackExchange.Redis sends PUBLISH to an arbitrary node
  2. That node broadcasts the message via the cluster bus to ALL other nodes (5 hops per message)
  3. The subscription lives on a single node (StackExchange.Redis subscribes to one node only for Literal channels)
  4. The subscription node receives the message from the cluster bus after propagation
  5. 103 publishes x 5 cluster bus forwards = ~515 inter-node messages
  6. With CommandFlags.FireAndForget, there is no backpressure - the client fires all 103 publishes as fast as possible, saturating the cluster bus
  7. Messages queue up in inter-node propagation, some arriving after the 4-second timeout

SPUBLISH (Sharded):

  1. The channel name is hashed to a slot owned by exactly one shard
  2. SSUBSCRIBE connects to that same shard
  3. Each SPUBLISH routes directly to the owning shard - no cluster bus broadcast
  4. 103 publishes -> 103 direct deliveries to one node, zero inter-node traffic
  5. All 1000 signals complete in under 500ms

Summary

The performance difference is architectural: PUBLISH has O(messages x nodes) cluster bus overhead, SPUBLISH has O(messages) with direct routing. Under rapid FireAndForget publishing, the cluster bus becomes the bottleneck, causing message delivery delays that exceed the test timeout.

References:

  • PUBLISH docs: "The cluster makes sure that published messages are forwarded as needed" (cluster bus broadcast)
  • SPUBLISH docs: "Posts a message to the given shard channel" (direct shard routing)
  • StackExchange.Redis #2750: Subscribe routes to single node in cluster mode

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

…e condition

- Add IsRedisCluster() extension that checks for actual Redis Cluster
  (ServerType.Cluster) only, unlike IsCluster() which also returns true
  for Twemproxy and sentinel-only configs. Sharded pub/sub (SPUBLISH/
  SSUBSCRIBE) is a Redis Cluster-specific feature.
- Update Channel property to use IsRedisCluster() instead of IsCluster()
- Improve XML docs: clarify this prevents per-primary duplicate delivery,
  not end-to-end exactly-once semantics
- Add remarks explaining why ??= race is benign for readonly struct
- Fix RemoveIfEqualAsync_WithNonMatchingValue_DoesNotPublishInvalidation
  test race condition: SetAsync publishes an InvalidateCache message that
  can arrive at secondCache after the baseline counter is captured. Fixed
  by draining pending invalidations before populating secondCache.

Co-authored-by: Cursor <cursoragent@cursor.com>
@niemyjski
Copy link
Copy Markdown
Member Author

Addressed Copilot Review Feedback + Fixed Build Failure

Copilot Review Comments Addressed

1. IsCluster() returns true for Twemproxy/Sentinel (valid concern)

  • Added new IsRedisCluster() extension method that checks specifically for ServerType.Cluster only
  • Unlike IsCluster() (which returns true for Twemproxy proxy and sentinel-only configs), IsRedisCluster() only returns true for actual Redis Cluster deployments where SPUBLISH/SSUBSCRIBE are supported
  • Updated Channel property to use IsRedisCluster() instead of IsCluster()

2. Thread safety of ??= with nullable struct

  • Added <remarks> explaining why the race is benign: RedisChannel is a readonly struct, and both concurrent initializations produce identical values. The worst case is calling IsRedisCluster() twice, which is harmless.
  • This follows the same pattern as _listPrefix in RedisQueue which caches IsCluster() results without locking.

3. XML doc "exactly-once" is misleading

  • Updated docs to say "preventing per-primary duplicate delivery" instead of "ensuring exactly-once delivery"
  • Added explicit caveat about not providing end-to-end exactly-once semantics
  • Updated fallback description to include proxy deployments

Build Failure RCA: RemoveIfEqualAsync_WithNonMatchingValue_DoesNotPublishInvalidation

Root Cause: Race condition in the base HybridCacheClientTestBase test (Foundatio.TestHarness 13.0.0-beta1.17)

The bug sequence:

  1. firstCache.SetAsync(key, "value") — publishes InvalidateCache message via Redis pub/sub
  2. secondCache.GetAsync(key) — reads value, populates local cache
  3. initialInvalidateCalls = secondCache.InvalidateCacheCalls — captures counter
  4. The InvalidateCache from step 1 arrives at secondCache after step 3, incrementing the counter and clearing the local cache
  5. Assertion fails: expected initialInvalidateCalls == secondCache.InvalidateCacheCalls but it's initialInvalidateCalls + 1

This is a pre-existing bug, NOT caused by this PR. Verified by reverting to the original RedisChannel.Literal code — the test still fails 100% of the time (3/3 runs). The race existed before but was never caught because the update deps commit on main (which introduced these tests via 13.0.0-beta1.17) had its CI run cancelled.

Fix: Override the test in both RedisHybridCacheClientTests and ScopedRedisHybridCacheClientTests to drain pending invalidations before populating secondCache's local cache. Also fixed upstream in Foundatio TestHarness source.

… iteration

- RedisCacheClient: add _isCluster field resolved in constructor, replacing 6 per-call IsCluster() checks on hot paths (RemoveAll, GetAll, SetAll, etc.)

- RedisQueue: consolidate IsCluster() into single constructor local for _listPrefix and _topicChannel

- RedisMessageBus: standardize on IsCluster(), remove IsRedisCluster(), lazy ??= for channel caching

- Remove IsRedisCluster() extension to keep single API

- Revert test overrides back to base class delegation

Co-authored-by: Cursor <cursoragent@cursor.com>
@niemyjski niemyjski merged commit 564e7dc into main Feb 14, 2026
4 checks passed
@niemyjski niemyjski deleted the fix/sharded-pubsub-cluster branch February 14, 2026 02:32
@mgravell
Copy link
Copy Markdown

mgravell commented Feb 14, 2026

I am not familiar with this lib, so I don't have much context: but, RedisChannnel.Literal optionally supports key-like routing, which gives you the same node-routing as the sharded variants without changing the message kind - .WithKeyRouting(). The servers will still broadcast horizontally but effectively one node will be dealing with all the traffic.

If you're seeing a correctness bug: something is wrong. Literal should only connect to one node (whether random or key-like), unless something went very weird with the changes we made for the keyspace/keyevent notifications.

As a side note: if this is dealing with hybrid-cache via SE.Redis... I'm kinda amazed I haven't seen it before 🙃

@mgravell
Copy link
Copy Markdown

Observations on RedisChannel?:

  1. IIRC there is an IsNull check on a channel; you could potentially use that to avoid the extra layer of wrapping
  2. beware "tearing" if this is accessed from multiple threads

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants