Use sharded pub/sub for Redis Cluster to fix duplicate message delivery #141
Pull request overview
This PR fixes a critical duplicate message delivery bug in RedisMessageBus when deployed against Redis Cluster. The fix switches from standard pub/sub (PUBLISH/SUBSCRIBE) to sharded pub/sub (SPUBLISH/SSUBSCRIBE) for cluster deployments, ensuring exactly-once message delivery while preserving full fanout to all subscribers.
Changes:
- Implemented cluster detection using the IsCluster() extension method, consistent with other Redis components in the codebase
- Introduced GetChannel() method to lazily resolve the appropriate Redis channel type (sharded for clusters, literal for standalone/sentinel)
- Replaced direct RedisChannel.Literal() calls with GetChannel() in both subscription and publish code paths
Replace Lazy<RedisChannel> with RedisChannel? for zero-allocation lazy caching. Drop Redis version check since < 7.2 is EOL. The nullable struct is evaluated once on first access via ??= and cached for all subsequent calls, consistent with how RedisQueue caches IsCluster() in _listPrefix. Co-authored-by: Cursor <cursoragent@cursor.com>
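The caching pattern this commit message describes can be sketched roughly as follows. This is a minimal illustration, not the actual RedisMessageBus code: the `ChannelResolver` class and the `isCluster` delegate (standing in for the codebase's IsCluster() extension) are hypothetical names, and it assumes a StackExchange.Redis version that provides `RedisChannel.Sharded`.

```csharp
using System;
using StackExchange.Redis;

// Hypothetical helper for illustration only; not the PR's RedisMessageBus.
public sealed class ChannelResolver
{
    private readonly string _topic;
    private readonly Func<bool> _isCluster; // assumption: stands in for the IsCluster() extension

    // Nullable<RedisChannel> wraps a struct, so caching it needs no heap allocation.
    private RedisChannel? _channel;

    public ChannelResolver(string topic, Func<bool> isCluster)
    {
        _topic = topic ?? throw new ArgumentNullException(nameof(topic));
        _isCluster = isCluster ?? throw new ArgumentNullException(nameof(isCluster));
    }

    // The right-hand side runs only while _channel is still null; the result is
    // stored and returned on every later access without probing the topology again.
    public RedisChannel Channel => _channel ??= _isCluster()
        ? RedisChannel.Sharded(_topic)   // SPUBLISH/SSUBSCRIBE on Redis Cluster
        : RedisChannel.Literal(_topic);  // PUBLISH/SUBSCRIBE otherwise
}
```

Both the subscribe and publish paths would then read the same `Channel` property, so they always agree on the channel kind. Note that concurrent first accesses may each run the probe; they would compute the same value.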
Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
src/Foundatio.Redis/Messaging/RedisMessageBus.cs:63
- There are existing RedisMessageBus tests, but none appear to assert the cluster-specific behavior introduced here (no duplicates in cluster, sharded channel selection when cluster + Redis >= 7, and Literal fallback otherwise). Adding an integration test that detects duplicate delivery in cluster mode would help prevent regressions of this bug fix.
}
src/Foundatio.Redis/Messaging/RedisMessageBus.cs:62
- ResolveChannel() falls back to Literal only when it finds a connected primary with Version < 7.0, but if no primaries are connected at the moment this Lazy is first evaluated, it will default to RedisChannel.Sharded() and cache that choice. That can cause permanent SPUBLISH/SSUBSCRIBE usage (and runtime errors) against pre-7.0 clusters if the initial version probe happened before connections were established. Consider handling the “no connected primaries / version unknown” case explicitly (e.g., return Literal or avoid caching until a version can be confirmed).
            return;
        using (await _lock.LockAsync().AnyContext())
        {
            if (_isSubscribed)
                return;
            _logger.LogTrace("Subscribing to topic: {Topic}", _options.Topic);
            _channelMessageQueue = await _options.Subscriber.SubscribeAsync(Channel).AnyContext();
            _channelMessageQueue.OnMessage(OnMessage);
            _isSubscribed = true;
            _logger.LogTrace("Subscribed to topic: {Topic}", _options.Topic);
        }
    }
Verification: Bug is real and reproducible
Ran
Root Cause: Why Sharded is ~10x faster
The test publishes 103 messages to 10 subscribers (expecting 1000 countdown signals). Here is why the routing matters:
PUBLISH (Literal) in a 3-primary cluster:
SPUBLISH (Sharded):
Summary
The performance difference is architectural:
References:
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
…e condition
- Add IsRedisCluster() extension that checks for actual Redis Cluster (ServerType.Cluster) only, unlike IsCluster() which also returns true for Twemproxy and sentinel-only configs. Sharded pub/sub (SPUBLISH/SSUBSCRIBE) is a Redis Cluster-specific feature.
- Update Channel property to use IsRedisCluster() instead of IsCluster()
- Improve XML docs: clarify this prevents per-primary duplicate delivery, not end-to-end exactly-once semantics
- Add remarks explaining why ??= race is benign for readonly struct
- Fix RemoveIfEqualAsync_WithNonMatchingValue_DoesNotPublishInvalidation test race condition: SetAsync publishes an InvalidateCache message that can arrive at secondCache after the baseline counter is captured. Fixed by draining pending invalidations before populating secondCache.
Co-authored-by: Cursor <cursoragent@cursor.com>
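A check of the kind this commit describes might look roughly like the sketch below. It is an assumption about the shape of such an extension, not the code from the PR: the class name `ClusterDetectionSketch` is hypothetical, and the method only reports true when a connected endpoint identifies itself as ServerType.Cluster.

```csharp
using System.Net;
using StackExchange.Redis;

// Hypothetical sketch; the PR's actual extension method is not shown in this conversation.
public static class ClusterDetectionSketch
{
    // Returns true only when at least one connected endpoint reports ServerType.Cluster.
    // Twemproxy, Sentinel, and standalone servers report other ServerType values.
    public static bool IsRedisCluster(this IConnectionMultiplexer connection)
    {
        foreach (EndPoint endpoint in connection.GetEndPoints())
        {
            IServer server = connection.GetServer(endpoint);
            if (server.IsConnected && server.ServerType == ServerType.Cluster)
                return true;
        }

        return false;
    }
}
```

If no endpoint is connected when the method runs, this sketch returns false, so a caller that caches the result should be aware that an early probe can pick the non-cluster path.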
Addressed Copilot Review Feedback + Fixed Build Failure
Copilot Review Comments Addressed
1.
2. Thread safety of
3. XML doc "exactly-once" is misleading
Build Failure RCA:
… iteration
- RedisCacheClient: add _isCluster field resolved in constructor, replacing 6 per-call IsCluster() checks on hot paths (RemoveAll, GetAll, SetAll, etc.)
- RedisQueue: consolidate IsCluster() into single constructor local for _listPrefix and _topicChannel
- RedisMessageBus: standardize on IsCluster(), remove IsRedisCluster(), lazy ??= for channel caching
- Remove IsRedisCluster() extension to keep single API
- Revert test overrides back to base class delegation
Co-authored-by: Cursor <cursoragent@cursor.com>
I am not familiar with this lib, so I don't have much context, but RedisChannel.Literal optionally supports key-like routing, which gives you the same node-routing as the sharded variants without changing the message kind. If you're seeing a correctness bug, something is wrong: Literal should only connect to one node (whether random or key-like), unless something went very weird with the changes we made for the keyspace/keyevent notifications. As a side note: if this is dealing with hybrid-cache via SE.Redis... I'm kinda amazed I haven't seen it before 🙃
Observations on
Summary
- Use sharded pub/sub (SPUBLISH/SSUBSCRIBE) in Redis Cluster mode to fix duplicate message delivery
- Replace Lazy<RedisChannel> with RedisChannel? nullable struct for zero-allocation lazy caching
Problem
In Redis Cluster mode, PUBLISH broadcasts messages to all nodes. StackExchange.Redis spreads Literal subscriptions across nodes, so each subscriber receives the message once per primary node (3x in a typical 3-master cluster). This caused CanReceiveMessagesConcurrentlyAsync to fail with too many signals.
Solution
When IsCluster() is true, use RedisChannel.Sharded() instead of RedisChannel.Literal(). Sharded pub/sub routes all operations for a given channel through a single shard, ensuring exactly-once delivery while preserving full fanout to all subscribers on that channel.
The channel type is resolved lazily on first access and cached via RedisChannel? _channel with ??=. This is:
- Zero-allocation: RedisChannel is a struct, so Nullable<RedisChannel> is also a value type
- Lazy: IsCluster() is only called on first subscribe/publish, not in the constructor
- Consistent with RedisQueue._listPrefix, which caches IsCluster() once and never resets (topology does not change on reconnect)
References
Test plan
- RedisMessageBus.cs changed; no test harness or config modifications
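The duplicate delivery described in the Problem section could be probed with something like the sketch below. It is only an illustration under assumptions: `DuplicateDeliveryProbe`, `CountDuplicatesAsync`, and the `settleTime` parameter are hypothetical names, it needs a reachable Redis deployment, and whether duplicates actually appear depends on the cluster topology and the StackExchange.Redis version in use.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

// Hypothetical probe for illustration; not part of the PR or its test suite.
public static class DuplicateDeliveryProbe
{
    // Publishes `count` uniquely numbered messages on `channel` and returns
    // how many of them this subscriber received more than once.
    public static async Task<int> CountDuplicatesAsync(
        ISubscriber subscriber, RedisChannel channel, int count, TimeSpan settleTime)
    {
        // Message id -> number of times it was delivered to this subscriber.
        var deliveries = new ConcurrentDictionary<string, int>();

        ChannelMessageQueue queue = await subscriber.SubscribeAsync(channel);
        queue.OnMessage(message =>
        {
            string id = message.Message.ToString();
            deliveries.AddOrUpdate(id, 1, (_, seen) => seen + 1);
        });

        for (int i = 0; i < count; i++)
            await subscriber.PublishAsync(channel, i.ToString());

        // Give in-flight messages time to arrive before counting.
        await Task.Delay(settleTime);
        await queue.UnsubscribeAsync();

        return deliveries.Values.Count(seen => seen > 1);
    }
}
```

Calling it once with `RedisChannel.Literal(topic)` and once with `RedisChannel.Sharded(topic)` against the same cluster would show whether the channel kind changes the duplicate count.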