feat: add ShardedPubSub for multi-shard channel subscriptions #3717
veeceey wants to merge 15 commits into
Conversation
|
Test results: The slot grouping test confirms that ch1-ch9 all hash to different slots, matching the original bug report. With |
|
Thank you @veeceey, we planned to work on the pubsub soon and this comes at just the right time, will review your PR later this week! |
```go
if err := ps.SSubscribe(ctx, chs...); err != nil {
	return err
}
```
There was a problem hiding this comment.
This error can be related to a network issue. Do you think it would be better to find a way to reinitialise the pubsub if that is the case?
There was a problem hiding this comment.
Good point — you're right that a network error here could be transient and we shouldn't just surface it and give up.
I think the approach would be to catch network-level errors (connection refused, timeouts, EOF, etc.) during SSubscribe on the underlying PubSub, and when that happens:
- Close the failed `PubSub` for that shard address
- Remove it from the `shards` map so the next attempt gets a fresh connection
- Re-resolve the node address for the affected channels (since the cluster state may have changed)
- Retry the subscribe with a backoff, reusing the cluster's `retryBackoff` settings for consistency
This way transient network blips get retried transparently, and if the node is genuinely down the cluster state refresh in step 3 should route us to the new primary.
For the forwardMessages goroutine, I'd add similar logic — if the channel read returns an error/closes unexpectedly, attempt to re-establish that shard's PubSub and re-subscribe to the channels that were on it, rather than silently dropping messages.
Happy to implement this — I'll model the retry logic after how the existing PubSub.reconnect works internally so it stays consistent with the rest of the library.
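The retry flow proposed in this comment could look roughly like the sketch below. It is not the PR's code: `isTransient`, `retrySubscribe`, the `reset` callback and the backoff values are hypothetical names chosen for illustration, and the set of errors treated as transient is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"time"
)

// isTransient reports whether err looks like a network-level failure that is
// worth retrying. This classification is an assumption made for the sketch.
func isTransient(err error) bool {
	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
		return true
	}
	var netErr net.Error
	return errors.As(err, &netErr)
}

// retrySubscribe calls subscribe up to attempts times (attempts must be >= 1).
// Between attempts it calls reset (close the failed shard, re-resolve the
// node) and sleeps with exponential backoff capped at maxBackoff.
func retrySubscribe(ctx context.Context, attempts int, minBackoff, maxBackoff time.Duration,
	subscribe func() error, reset func()) error {
	backoff := minBackoff
	var err error
	for i := 0; i < attempts; i++ {
		err = subscribe()
		if err == nil || !isTransient(err) {
			return err // success, or an error that retrying will not fix
		}
		if i == attempts-1 {
			break
		}
		reset()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return fmt.Errorf("subscribe failed after %d attempts: %w", attempts, err)
}

// demoRetry simulates two transient failures followed by a success.
func demoRetry() (calls, resets int, err error) {
	err = retrySubscribe(context.Background(), 3, time.Millisecond, 4*time.Millisecond,
		func() error {
			calls++
			if calls < 3 {
				return io.EOF
			}
			return nil
		},
		func() { resets++ },
	)
	return calls, resets, err
}

func main() {
	calls, resets, err := demoRetry()
	fmt.Println(calls, resets, err)
}
```

Passing `subscribe` and `reset` as callbacks keeps the backoff loop independent of how a shard connection is closed and re-resolved.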
```go
func (s *ShardedPubSub) getOrCreateShard(addr string) *PubSub {
	if ps, ok := s.shards[addr]; ok {
		return ps
	}
```
There was a problem hiding this comment.
Without any type of health check here, we cannot be sure whether this pubsub connection is still open.
There was a problem hiding this comment.
That's a valid concern. Right now we create the PubSub via s.cluster.pubSub() and hand it off, but there's no ongoing verification that the connection is still alive.
I think the right approach is to add a Ping method on ShardedPubSub that iterates over all shard connections and pings each one. Something like:
```go
func (s *ShardedPubSub) Ping(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for addr, ps := range s.shards {
		if err := ps.Ping(ctx); err != nil {
			// connection is dead: trigger reconnect for this shard
			return fmt.Errorf("shard %s health check failed: %w", addr, err)
		}
	}
	return nil
}
```

On top of that, I could add an optional background health check loop (controlled by an option, disabled by default to avoid surprises) that periodically pings each shard and triggers reinitialization for any that have gone stale. This would pair well with the reconnect logic from the error handling comment: if a health check fails, we close the dead PubSub, re-resolve the node, and re-subscribe.
Makes sense to have at least the manual Ping method in this PR. Want me to add both the manual and the optional background health check, or keep the background one for a follow-up?
There was a problem hiding this comment.
I think we can skip the health check for now and fail (change the underlying connection) if SSUBSCRIBE fails. In practice, this makes the "health check" reactive rather than proactive.
There was a problem hiding this comment.
makes sense -- went with the reactive approach. if SSubscribe fails, we close the broken shard, re-resolve the channels, and retry on a fresh connection. no background pinging.
```go
// ShardedPubSub wraps multiple PubSub connections to support sharded pub/sub
// across different cluster nodes. In a Redis cluster, channels hash to
// different slots which may be served by different nodes. A single connection
// can only receive messages for channels on the shard it is connected to.
// ShardedPubSub transparently manages one PubSub per shard and multiplexes
// all messages into a single Go channel.
type ShardedPubSub struct {
```
There was a problem hiding this comment.
Currently I don't see how this ShardedPubSub will react to topology changes (e.g. nodes are moved => shards are on a different address / new nodes are added => shards are now split between nodes). Any plans or ideas how to address that with the current implementation?
There was a problem hiding this comment.
Great question — this is the piece I knew would come up and it's definitely the trickiest part.
Currently the shards map is keyed by the node address resolved at subscribe time, and that mapping is completely static after that. If a slot migrates to a different node, we'd keep reading from the old node (which would stop receiving messages for those channels) and never know about it.
Here's my plan for handling this:
1. Hook into cluster state refresh
The ClusterClient already periodically reloads cluster state via state.Get(). After each topology refresh, ShardedPubSub should re-resolve every channel's node address and compare against the current mapping. For any channel whose node has changed:
- Subscribe to the channel on the new node's `PubSub`
- Unsubscribe from the old node's `PubSub`
- If an old node's `PubSub` has no remaining channels, close it and remove it from the map
2. Listen for MOVED responses
If any operation returns a MOVED error, that's an immediate signal that topology has shifted. This could trigger an on-demand re-resolve for the affected shard rather than waiting for the next periodic refresh.
3. Notification channel for resharding events
This might be useful for consumers who want to know when migrations happened, but that feels like it could be a follow-up.
I think items 1 and 2 are important to get right in this PR. The implementation would look something like adding a refreshTopology(ctx) method that gets called either periodically (piggy-backing on the cluster's reload interval) or reactively on errors. I'll work on this — would appreciate your input on whether you'd prefer it as a periodic goroutine or event-driven off the cluster state reload.
There was a problem hiding this comment.
@veeceey I do think that we should have the ClusterClient as the source of truth for the topology and that ShardedPubSub should update (or just recalculate, if an update is not needed) its map when ClusterClient updates its topology. Hence, something like an event driven approach would fit best here.
Since the ShardedPubSub is a specific tool used only in the ClusterClient, feel free to have a set (slice) of all ShardedPubSubs somehow registered on the ClusterClient and trigger their topology changes when the ClusterClient updates its topology. It would be way simpler, somewhat limiting (covering just this case instead of a generic event channel), but a good balance in my mind.
Let me know if you find any problems with this approach or think another one fits better; I am willing to review alternatives as well.
There was a problem hiding this comment.
that makes a lot of sense — having the ClusterClient own the topology and push updates to registered ShardedPubSubs is way cleaner than each pubsub polling independently. I like the simplicity of a slice of registered instances on the ClusterClient.
I'm thinking something like:
- add a `shardedPubSubs []*ShardedPubSub` field on ClusterClient (protected by the existing mutex)
- ShardedPubSub registers itself on creation, deregisters on Close
- when ClusterClient refreshes topology (in `reloadState` or wherever that happens), it calls an `onTopologyChange()` method on each registered ShardedPubSub
- that method re-resolves channels to slots/nodes and migrates subscriptions as needed
I don't see any issues with this approach — it keeps things scoped to exactly this use case without over-engineering a generic event system. I'll start working on it and push an update. appreciate the guidance!
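A minimal sketch of the registration idea discussed above, assuming a hypothetical `registry` type standing in for the relevant part of `ClusterClient` and a hypothetical `topologyListener` interface standing in for `ShardedPubSub`. It only illustrates register, deregister and notify; it is not the code pushed to this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// topologyListener is a stand-in for ShardedPubSub in this sketch.
type topologyListener interface {
	onTopologyChange()
}

// registry is a stand-in for the bookkeeping that would live on ClusterClient.
type registry struct {
	mu        sync.Mutex
	listeners []topologyListener
}

func (r *registry) register(l topologyListener) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.listeners = append(r.listeners, l)
}

func (r *registry) deregister(l topologyListener) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i, cur := range r.listeners {
		if cur == l {
			r.listeners = append(r.listeners[:i], r.listeners[i+1:]...)
			return
		}
	}
}

// notify copies the listener slice under the lock and invokes the callbacks
// after releasing it, so a callback may register or deregister safely.
func (r *registry) notify() {
	r.mu.Lock()
	snapshot := append([]topologyListener(nil), r.listeners...)
	r.mu.Unlock()
	for _, l := range snapshot {
		l.onTopologyChange()
	}
}

// countingListener records how many notifications it received.
type countingListener struct{ calls int }

func (c *countingListener) onTopologyChange() { c.calls++ }

// demoRegistry notifies two listeners, removes one, and notifies again.
func demoRegistry() (int, int) {
	r := &registry{}
	a, b := &countingListener{}, &countingListener{}
	r.register(a)
	r.register(b)
	r.notify()
	r.deregister(a)
	r.notify()
	return a.calls, b.calls
}

func main() {
	a, b := demoRegistry()
	fmt.Println(a, b)
}
```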
|
friendly ping on this PR |
|
@veeceey Thank you for pinging me. I do think the discussed approaches sound good. Would you like to implement them in this PR? Maybe I am missing something, I am not seeing them committed here? |
|
Hey @ndyakov, just pushed the implementation! Here's what's in: Topology change handling:
Reactive reconnect (no proactive health check, as you suggested):
Tests:
All existing tests pass, |
|
Good catches from the bugbot review — just pushed fixes for all three issues: Close race (send-on-closed-channel): Added a Channel opts ignored: Deadlock in SSubscribe/onTopologyChange: Moved All existing tests pass, |
```go
func (c *ClusterClient) registerShardedPubSub(sps *ShardedPubSub) {
	c.spsMu.Lock()
	defer c.spsMu.Unlock()
	c.shardedPubSubs = append(c.shardedPubSubs, sps)
}

// deregisterShardedPubSub removes a ShardedPubSub from the notification list.
func (c *ClusterClient) deregisterShardedPubSub(sps *ShardedPubSub) {
	c.spsMu.Lock()
	defer c.spsMu.Unlock()
	for i, s := range c.shardedPubSubs {
		if s == sps {
			c.shardedPubSubs = append(c.shardedPubSubs[:i], c.shardedPubSubs[i+1:]...)
			return
		}
	}
}
```
There was a problem hiding this comment.
A slight optimization with a map (or even sync.Map) could be done here, but I think this is fine for now. I don't think we can expect so many sharded pubsubs that looping over them becomes a problem.
Add ShardedPubSub type that manages multiple PubSub connections to
support subscribing to shard channels across different cluster nodes.
In Redis cluster, SSUBSCRIBE channels hash to different slots which may
be served by different nodes. The existing PubSub uses a single
connection and only receives messages from the first channel's shard.
ShardedPubSub groups channels by slot, maintains one PubSub per node,
and multiplexes messages into a single Go channel.
Usage:
sps := rdb.SSubscribeSharded(ctx, "ch1", "ch2", "ch3")
defer sps.Close()
for msg := range sps.Channel() { ... }
The existing SSubscribe method is unchanged for backward compatibility.
Fixes redis#3133
…ubSub
- ShardedPubSub instances register with ClusterClient and get notified on topology changes (via clusterStateHolder.onReload callback)
- onTopologyChange re-resolves all channel->node mappings and migrates subscriptions when slots move to different nodes
- Reactive reconnect on SSubscribe failure: closes failed shard, re-resolves channels, and retries on fresh connection (skipping proactive health checks)
- ShardedPubSub deregisters from ClusterClient on Close
- Fix send-on-closed-channel panic in Close(): use sync.WaitGroup to wait for all forwarder goroutines to exit before closing msgCh
- Fix deadlock: move nodeAddrForChannel calls outside mutex in both SSubscribe and onTopologyChange, since address resolution can trigger cluster state reload -> onReload -> onTopologyChange -> s.mu.Lock
- Fix Channel() ignoring ChannelOption parameters: forward opts to underlying PubSub shards and use configured chanSize for buffer
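The send-on-closed-channel fix described in this commit can be illustrated with a small fan-in sketch. The `fanIn` type, its `done` channel and the `forward` method are hypothetical names for illustration; the commit only states that a `sync.WaitGroup` is used to wait for the forwarders before closing `msgCh`.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges several source channels into one output channel.
type fanIn struct {
	out  chan string
	done chan struct{}
	wg   sync.WaitGroup
	once sync.Once
}

func newFanIn(size int) *fanIn {
	return &fanIn{out: make(chan string, size), done: make(chan struct{})}
}

// forward starts a goroutine that copies src into out until src is closed
// or Close is called.
func (f *fanIn) forward(src <-chan string) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		for msg := range src {
			select {
			case f.out <- msg:
			case <-f.done:
				return
			}
		}
	}()
}

// Close signals the forwarders, waits for all of them to exit, and only then
// closes out, so no goroutine can send on a closed channel.
func (f *fanIn) Close() {
	f.once.Do(func() {
		close(f.done)
		f.wg.Wait()
		close(f.out)
	})
}

// demoFanIn forwards two pre-filled sources and closes the merger twice.
func demoFanIn() (received int, stillOpen bool) {
	f := newFanIn(4)
	for i := 0; i < 2; i++ {
		src := make(chan string, 2)
		src <- "a"
		src <- "b"
		close(src)
		f.forward(src)
	}
	for i := 0; i < 4; i++ {
		<-f.out
		received++
	}
	f.Close()
	f.Close() // a second Close is a no-op thanks to sync.Once
	_, stillOpen = <-f.out
	return received, stillOpen
}

func main() {
	fmt.Println(demoFanIn())
}
```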
cca6caa to c5cde5d
ndyakov
left a comment
There was a problem hiding this comment.
@veeceey Thank you for working on this and being patient about my review. It took some time to get my head around what we would like to do for v10 and how this will fit. I did leave some comments (few of which I do consider blockers). Let me know what you think and if you would like to continue working on it.
```go
ps := s.getOrCreateShard(addr)
if err := ps.SSubscribe(ctx, chs...); err != nil {
	// Reactive reconnect: close failed shard, re-resolve, and retry once.
	if reconnErr := s.resubscribeShard(ctx, addr); reconnErr != nil {
```
There was a problem hiding this comment.
resubscribeShard is called while holding s.mu, but internally it calls nodeAddrForChannel, which calls slotMasterNode, which calls state.Get.
- If the cluster state is `nil`, `Get` calls `Reload`, which will eventually call `notifyShardedPubSubs`, which calls `onTopologyChange`, which tries to acquire `s.mu`.
Release the lock before the retry, similar to how the main SSubscribe flow already does it for address resolution.
There was a problem hiding this comment.
good catch — you're right, resubscribeShard holding s.mu while calling nodeAddrForChannel creates the same deadlock path as the original SSubscribe issue. I'll release the lock before the retry and re-acquire after, same pattern as the main SSubscribe flow.
```go
// Apply migrations under the lock.
s.mu.Lock()
defer s.mu.Unlock()
```
There was a problem hiding this comment.
since we will hold the lock for applying the migration, does it make sense to verify the channels still exist when subscribing?
```go
if _, exists := s.chanShard[ch]; exists {
	s.chanShard[ch] = newAddr
}
```
There was a problem hiding this comment.
yep, makes total sense — if someone unsubscribed while we were resolving, we shouldn't silently re-add it. will add the existence check before updating chanShard.
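The two review points above (resolving addresses without holding the mutex, and checking that a channel still exists before remapping it) can be sketched together. The `shardMap` type and `remap` method are hypothetical stand-ins for the relevant parts of `ShardedPubSub`; the resolver here deliberately takes the same mutex to show that the pattern does not deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

// shardMap is a stand-in for the channel->address bookkeeping.
type shardMap struct {
	mu        sync.Mutex
	chanShard map[string]string
}

// remap re-resolves every tracked channel. The resolver runs without s.mu
// held, because in the real code resolution may trigger a reload that calls
// back into code acquiring the same mutex.
func (s *shardMap) remap(resolve func(ch string) (string, error)) (moved int) {
	// 1. Snapshot the channel names under the lock.
	s.mu.Lock()
	chans := make([]string, 0, len(s.chanShard))
	for ch := range s.chanShard {
		chans = append(chans, ch)
	}
	s.mu.Unlock()

	// 2. Resolve outside the lock.
	resolved := make(map[string]string, len(chans))
	for _, ch := range chans {
		if addr, err := resolve(ch); err == nil {
			resolved[ch] = addr
		}
	}

	// 3. Re-acquire the lock and apply, skipping channels that were
	// unsubscribed while the lock was released.
	s.mu.Lock()
	defer s.mu.Unlock()
	for ch, addr := range resolved {
		if old, exists := s.chanShard[ch]; exists && old != addr {
			s.chanShard[ch] = addr
			moved++
		}
	}
	return moved
}

// demoRemap simulates an unsubscribe of ch2 happening during resolution.
func demoRemap() (int, map[string]string) {
	s := &shardMap{chanShard: map[string]string{"ch1": "node-a", "ch2": "node-a"}}
	moved := s.remap(func(ch string) (string, error) {
		if ch == "ch2" {
			s.mu.Lock() // would deadlock if remap still held s.mu here
			delete(s.chanShard, ch)
			s.mu.Unlock()
		}
		return "node-b", nil
	})
	return moved, s.chanShard
}

func main() {
	moved, final := demoRemap()
	fmt.Println(moved, final)
}
```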
```go
// Deregister from the cluster client.
s.cluster.deregisterShardedPubSub(s)
```
There was a problem hiding this comment.
I think we can deregister before acquiring the s.mu.Lock()
There was a problem hiding this comment.
agreed — deregistering doesn't need the lock and doing it first avoids holding s.mu longer than necessary. will move it up.
```go
if c.onReload != nil {
	c.onReload()
}
```
There was a problem hiding this comment.
Can this be async? maybe with validating that it is not triggered multiple times. If you look at LazyReload it has a mechanism for executing the latest reload, maybe something similar here would help.
There was a problem hiding this comment.
yeah, making it async would avoid blocking the reload path. I'll look at LazyReload's dedup mechanism and model it similarly — signal a goroutine to do the migration instead of doing it inline, with a check to skip if one's already in-flight.
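One possible shape for an async, deduplicated notification is sketched below. It is an assumption-laden illustration, not `LazyReload` and not the PR's code: the `coalescer` type and its three states are hypothetical. Triggers that arrive while a run is in flight collapse into a single follow-up run.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	idle    int32 = iota // no run in flight
	running              // a run is in flight
	pending              // a run is in flight and one more was requested
)

// coalescer runs fn asynchronously and collapses bursts of triggers.
type coalescer struct {
	state atomic.Int32
	wg    sync.WaitGroup
	fn    func()
}

// trigger never blocks the caller: it either starts the worker, marks a
// follow-up run as pending, or does nothing if one is already pending.
func (c *coalescer) trigger() {
	for {
		switch c.state.Load() {
		case idle:
			if c.state.CompareAndSwap(idle, running) {
				c.wg.Add(1)
				go c.loop()
				return
			}
		case running:
			if c.state.CompareAndSwap(running, pending) {
				return
			}
		default: // pending: a follow-up run is already scheduled
			return
		}
	}
}

func (c *coalescer) loop() {
	defer c.wg.Done()
	for {
		c.fn()
		if c.state.CompareAndSwap(running, idle) {
			return
		}
		// State was pending: a trigger arrived during fn, so run once more.
		c.state.Store(running)
	}
}

// demoCoalesce fires five extra triggers while the first run is blocked and
// returns how many times fn actually ran.
func demoCoalesce() int {
	var runs atomic.Int32
	started := make(chan struct{})
	release := make(chan struct{})
	c := &coalescer{fn: func() {
		if runs.Add(1) == 1 {
			close(started)
			<-release
		}
	}}
	c.trigger()
	<-started
	for i := 0; i < 5; i++ {
		c.trigger()
	}
	close(release)
	c.wg.Wait()
	return int(runs.Load())
}

func main() {
	fmt.Println(demoCoalesce())
}
```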
```go
// can only receive messages for channels on the shard it is connected to.
// ShardedPubSub transparently manages one PubSub per shard and multiplexes
// all messages into a single Go channel.
type ShardedPubSub struct {
```
There was a problem hiding this comment.
The existing PubSub supports Ping, ReceiveMessage and ChannelWithSubscriptions. ShardedPubSub only exposes Channel(). Can you either add them for feature parity or document that they are intentionally not supported? I would like to have a PubSub interface in the future (for v10 for example), where we can return the same Interface for the pubsub and sharded pubsub.
There was a problem hiding this comment.
that's a fair point for v10 interface parity. I'll add Ping and ReceiveMessage methods on ShardedPubSub — Ping iterates over shards and pings each, ReceiveMessage reads from the merged channel. ChannelWithSubscriptions I can stub for now with a TODO since the subscription tracking is a bit different with sharded channels. Will document what's supported vs not.
```go
if s.cluster.opt.ReadOnly {
	state, err := s.cluster.state.Get(ctx)
	if err != nil {
		return "", err
	}
	node, err := s.cluster.slotReadOnlyNode(state, slot)
	if err != nil {
		return "", err
	}
	return node.Client.opt.Addr, nil
}
```
There was a problem hiding this comment.
While theoretically we should connect to the replica on read only, I am wondering if we want to? The master will receive the message first. I will validate if this is needed at all or should be changed, but for now let's keep it as it is.
There was a problem hiding this comment.
sounds good, I'll leave it as-is for now then. makes sense to validate whether replica reads are actually useful for pubsub before changing it.
```go
if ps, ok := s.shards[oldAddr]; ok {
	_ = ps.SUnsubscribe(ctx, channels...)
}

// Subscribe on new shard.
ps := s.getOrCreateShard(newAddr)
_ = ps.SSubscribe(ctx, channels...)

// Update channel->shard mapping.
for _, ch := range channels {
	s.chanShard[ch] = newAddr
}
```
There was a problem hiding this comment.
If SSubscribe fails (error is swallowed with _), the channel is now:
- Unsubscribed from the old shard
- NOT subscribed on the new shard
- BUT chanShard is updated to point to the new shard
The channel is effectively lost - no messages will be received, and the code thinks everything is fine.
Can we update the mapping ONLY on successful SSubscribe here?
There was a problem hiding this comment.
you're right, that's a nasty silent failure. I'll restructure so we only update chanShard[ch] = newAddr after SSubscribe succeeds. if it fails, we keep the old mapping so at least the channel isn't orphaned — and we can log or return the error so it's visible.
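A sketch of the restructuring agreed on here: subscribe on the new shard first, update the mapping only if that succeeds, and unsubscribe from the old shard last. The `migrate` function, its callback parameters and `errSubscribe` are hypothetical and only illustrate the ordering; they are not the PR's actual signatures.

```go
package main

import (
	"errors"
	"fmt"
)

var errSubscribe = errors.New("subscribe failed")

// migrate moves channels from oldAddr to newAddr. The mapping is changed only
// after subscribe succeeds, so a failure leaves the channels on oldAddr.
func migrate(chanShard map[string]string, channels []string, oldAddr, newAddr string,
	subscribe, unsubscribe func(addr string, chs []string) error) error {
	if err := subscribe(newAddr, channels); err != nil {
		return fmt.Errorf("migrate to %s: %w", newAddr, err) // mapping untouched
	}
	for _, ch := range channels {
		// Skip channels that were unsubscribed in the meantime.
		if _, exists := chanShard[ch]; exists {
			chanShard[ch] = newAddr
		}
	}
	// The new subscription is live, so dropping the old one leaves no gap.
	return unsubscribe(oldAddr, channels)
}

// demoMigrate runs one failing and one successful migration and returns the
// address ch1 maps to after each.
func demoMigrate() (afterFailure, afterSuccess string) {
	m := map[string]string{"ch1": "old"}
	ok := func(string, []string) error { return nil }
	fail := func(string, []string) error { return errSubscribe }

	_ = migrate(m, []string{"ch1"}, "old", "new", fail, ok)
	afterFailure = m["ch1"]

	_ = migrate(m, []string{"ch1"}, "old", "new", ok, ok)
	afterSuccess = m["ch1"]
	return afterFailure, afterSuccess
}

func main() {
	fmt.Println(demoMigrate())
}
```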
…rface parity
- resubscribeShard: release s.mu before nodeAddrForChannel to avoid deadlock (state.Get -> Reload -> onReload -> onTopologyChange -> s.mu)
- onTopologyChange migration: only update chanShard on successful SSubscribe, check channel still exists before updating mapping
- Close: deregister from ClusterClient before acquiring s.mu
- notifyShardedPubSubs: make async with CAS guard to avoid blocking the reload path, similar to LazyReload's dedup mechanism
- Add Ping, ReceiveMessage, ReceiveTimeout methods for interface parity with PubSub (prep for v10 PubSub interface)
|
pushed all the fixes from the review:
all tests pass, go vet clean. let me know if there's anything else! |
|
@veeceey Hey, do you expect to continue the work on this one or we should take over? |
# Conflicts: # pubsub.go
…mapping, partial resubscribe orphans, coalesced topology notifications)
…l lifecycle, and notify coalescing
There was a problem hiding this comment.
Pull request overview
Adds first-class support for sharded pub/sub subscriptions that span multiple hash slots/nodes in Redis Cluster by introducing a ShardedPubSub multiplexer and wiring it into ClusterClient topology reloads, while keeping existing SSubscribe behavior intact for backward compatibility.
Changes:
- Introduces `ShardedPubSub` to manage one underlying `PubSub` per shard node and merge all messages into a single `Channel()`.
- Adds `ClusterClient.SSubscribeSharded` plus a coalesced async notifier triggered after successful cluster state reloads to migrate subscriptions on topology changes.
- Adds unit tests covering lifecycle, buffering options, registration/deregistration, and notifier coalescing behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| sharded_pubsub.go | New ShardedPubSub implementation: per-node shard connections, message multiplexing, and topology-change migration logic. |
| sharded_pubsub_test.go | New unit tests for ShardedPubSub lifecycle, options, bookkeeping, and notifier coalescing. |
| osscluster.go | Adds cluster reload hook + sharded-pubsub registration/notification plumbing; documents SSubscribe limitation and introduces SSubscribeSharded. |
```go
func (s *ShardedPubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return pool.ErrClosed
	}

	// Group channels by their known shard. The mapping is intentionally left
	// in place here and only removed after a successful unsubscribe below, so
	// that a failed SUnsubscribe does not desync our bookkeeping from the
	// server (where the subscription and its forwarder may still be active).
	groups := make(map[string][]string)
	for _, ch := range channels {
		if addr, ok := s.chanShard[ch]; ok {
			groups[addr] = append(groups[addr], ch)
		}
	}
```
```go
s.mu.Lock()
defer s.mu.Unlock()

// Determine the channel buffer size. We default to 100 but respect
// WithChannelSize if provided. We apply all options to a single probe
// (matching newChannel in pubsub.go) so later options don't reset the
// values set by earlier ones.
size := 100
```
```go
// Now that the new shard is subscribed, unsubscribe from the old
// one so we never have a window with no active subscription.
if oldPS, ok := s.shards[oldAddr]; ok {
	_ = oldPS.SUnsubscribe(ctx, channels...)
}
```
```go
	// Drop the mapping only after the unsubscribe succeeded (or when there
	// is no live shard connection to unsubscribe from).
	for _, ch := range chs {
		delete(s.chanShard, ch)
	}
}
```
```go
// SSubscribeSharded subscribes the client to the specified shard channels,
// automatically managing connections to all relevant cluster nodes.
// Unlike SSubscribe, this correctly handles channels that hash to different
// slots by maintaining one connection per shard node.
func (c *ClusterClient) SSubscribeSharded(ctx context.Context, channels ...string) *ShardedPubSub {
```
…fecycle
- Group SSUBSCRIBE/SUNSUBSCRIBE by hash slot to avoid CROSSSLOT (fixes redis#3133)
- Tolerate per-channel resolution failure in SSubscribe
- Fix Close()/Channel() double-close race and ReceiveMessage msgCh data race
- Move Ping network I/O off the mutex
- Add cross-slot cluster integration test; document health/reconnect + initial-error semantics
resubscribeShard aborted on the first nodeAddrForChannel error, abandoning every other channel that was on the failed shard (the SSubscribe caller then dropped all their mappings). Mirror the tolerance pattern already used in SSubscribe: record the first resolution error, continue, and re-subscribe every channel that did resolve, returning the first error to the caller.
Addresses unresolved review comments on redis#3717:
- PubSub.SSubscribe/SUnsubscribe now only update schannels for the slot-groups whose SSUBSCRIBE/SUNSUBSCRIBE command actually succeeded. subscribeSharded reports the failed channels so a partial multi-slot batch no longer desyncs local state from the server (Cursor: partial slot schannels mismatch).
- ShardedPubSub.SUnsubscribe attempts every shard group instead of returning on the first failure, so a partial multi-shard unsubscribe no longer leaves later groups subscribed-and-tracked while earlier groups are gone, and reports the first error (Cursor: partial multi-shard unsubscribe failure).
- resubscribeShard drops the mappings of channels it could not re-subscribe (failed resolution or failed SSUBSCRIBE) instead of leaving them pointing at the removed shard with no live connection, fixing orphaned channels from earlier SSubscribe calls (Cursor: orphan channels after resubscribe failure).
- onTopologyChange rebuilds the old shard when its SUnsubscribe fails instead of leaving duplicate active subscriptions on old+new nodes (Copilot: onTopologyChange ignores old SUnsubscribe errors).

Adds unit tests covering partial-failure SUnsubscribe across shards and schannels tracking on subscribe/unsubscribe success and failure.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
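The "attempt every shard group, report the first error" behaviour described for `ShardedPubSub.SUnsubscribe` can be sketched as follows. `unsubscribeAll`, its callback parameter and `errUnsub` are hypothetical; the sketch only shows the bookkeeping rule that mappings are dropped solely for groups whose unsubscribe succeeded.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnsub = errors.New("unsubscribe failed")

// unsubscribeAll groups channels by their tracked shard address, tries every
// group even if an earlier one fails, and returns the first error seen.
func unsubscribeAll(chanShard map[string]string, channels []string,
	unsubscribe func(addr string, chs []string) error) error {
	groups := make(map[string][]string)
	for _, ch := range channels {
		if addr, ok := chanShard[ch]; ok {
			groups[addr] = append(groups[addr], ch)
		}
	}

	var firstErr error
	for addr, chs := range groups {
		if err := unsubscribe(addr, chs); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue // keep the mapping: the server may still be subscribed
		}
		for _, ch := range chs {
			delete(chanShard, ch)
		}
	}
	return firstErr
}

// demoUnsubscribe fails the unsubscribe on node-2 only and returns the
// resulting error together with the remaining bookkeeping.
func demoUnsubscribe() (error, map[string]string) {
	m := map[string]string{"a": "node-1", "b": "node-2", "c": "node-3"}
	err := unsubscribeAll(m, []string{"a", "b", "c"}, func(addr string, _ []string) error {
		if addr == "node-2" {
			return errUnsub
		}
		return nil
	})
	return err, m
}

func main() {
	err, remaining := demoUnsubscribe()
	fmt.Println(err, remaining)
}
```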
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 0913edd. Configure here.
The cleanup sweep after a failed resubscribeShard deleted every chanShard entry pointing at failedAddr. Because resubscribeShard releases s.mu while resolving node addresses, a concurrent onTopologyChange/SSubscribe can re-add or migrate channels (or reuse failedAddr for brand-new channels) during that window; the blanket sweep would silently orphan those. Scope the sweep to the channels this call collected, skip the ones it recovered, and only drop entries that still point at failedAddr.
0913edd to 55f8a03
```go
		newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) {
			return cn, nil
		},
		closeConn: func(c *pool.Conn) error { return clientConn.Close() },
	}

func TestPubSubSUnsubscribeDropsSchannelsOnSuccess(t *testing.T) {
	ps := newNoopPubSub()
```

Fixes #3133
When using sharded pub/sub in a Redis cluster, channels hash to different slots served by different nodes. The current `SSubscribe` creates a single `PubSub` connection that only connects to the shard of the first channel, so messages from channels on other shards are never received.

This adds a `ShardedPubSub` type and a `ClusterClient.SSubscribeSharded` method that properly handles this. It:
- Groups channels by slot
- Maintains one `PubSub` connection per shard node
- Multiplexes messages into a single `Channel()` for consumption

The existing `SSubscribe` returning `*PubSub` is unchanged for backward compatibility. Added a doc comment noting the single-shard limitation and pointing to `SSubscribeSharded`.

Tests confirm ch1-ch9 hash to 9 different slots (matching the bug report).
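To make the slot grouping concrete, here is a standalone sketch of the Redis Cluster key-to-slot rule (CRC16/XMODEM modulo 16384, with hash-tag handling) and a grouping helper. `crc16`, `hashSlot` and `groupBySlot` are names invented for this example; go-redis has its own internal slot calculation, which this sketch does not use.

```go
package main

import (
	"fmt"
	"strings"
)

const slotCount = 16384

// crc16 is a bitwise CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the variant named in the Redis Cluster specification.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot for key. If the key contains a non-empty
// {hash tag}, only the tag is hashed, so related channels share a slot.
func hashSlot(key string) int {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return int(crc16(key)) % slotCount
}

// groupBySlot buckets channel names by their hash slot.
func groupBySlot(channels []string) map[int][]string {
	groups := make(map[int][]string)
	for _, ch := range channels {
		slot := hashSlot(ch)
		groups[slot] = append(groups[slot], ch)
	}
	return groups
}

func main() {
	groups := groupBySlot([]string{"ch1", "ch2", "{news}.eu", "{news}.us"})
	for slot, chs := range groups {
		fmt.Println(slot, chs)
	}
}
```

Channels that must always land on the same shard can share a hash tag, as `{news}.eu` and `{news}.us` do in the example.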
Note
Medium Risk
New concurrent cluster pub/sub paths and topology migration logic can affect subscription correctness during resharding; existing SSubscribe API behavior is preserved but sharded command batching changes failure/partial-success semantics.
Overview
Adds `ShardedPubSub` and `ClusterClient.SSubscribeSharded` so cluster sharded pub/sub can subscribe to channels on multiple hash slots/nodes at once. The type keeps one `PubSub` per shard, routes channels by slot, merges delivery through a single `Channel()`, and migrates subscriptions on cluster topology reload (wired via `clusterStateHolder.onReload` and coalesced `notifyShardedPubSubs`).

`PubSub` sharded subscribe/unsubscribe now batches channels per slot (avoids CROSSSLOT) and only updates local `schannels` for slot groups that succeeded on the server. `SSubscribe` on `ClusterClient` is unchanged but documented as single-shard; callers needing cross-shard coverage should use `SSubscribeSharded`.

Also adds an `example/sharded-pubsub` cluster demo (publish/receive stats, topology monitoring) plus unit and integration tests for multi-shard delivery and error/bookkeeping paths.

Reviewed by Cursor Bugbot for commit e57a51a.