feat: add ShardedPubSub for multi-shard channel subscriptions #3717

Open
veeceey wants to merge 15 commits into redis:master from veeceey:fix/issue-3133-sharded-pubsub-multi-shard

Conversation

@veeceey

veeceey commented Feb 23, 2026

Contributor

Fixes #3133

When using sharded pub/sub in a Redis cluster, channels hash to different slots served by different nodes. The current SSubscribe creates a single PubSub connection that only connects to the shard of the first channel, so messages from channels on other shards are never received.

This adds a ShardedPubSub type and a ClusterClient.SSubscribeSharded method that properly handles this. It:

  • Groups channels by their hash slot to determine which cluster node serves each
  • Maintains one PubSub connection per shard node
  • Multiplexes messages from all shards into a single Channel() for consumption

sps := rdb.SSubscribeSharded(ctx, "ch1", "ch2", "ch3")
defer sps.Close()

// Add more channels later - routed to correct shard automatically
sps.SSubscribe(ctx, "ch4", "ch5")

for msg := range sps.Channel() {
    fmt.Println(msg.Channel, msg.Payload)
}

The existing SSubscribe returning *PubSub is unchanged for backward compatibility. Added a doc comment noting the single-shard limitation and pointing to SSubscribeSharded.

Tests confirm ch1-ch9 hash to 9 different slots (matching the bug report).
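
For readers who want to check the slot numbers themselves, here is a minimal sketch of the Redis Cluster key-to-slot rule (CRC16/XMODEM modulo 16384, with hash-tag handling). The names `crc16` and `hashSlot` are illustrative and are not taken from the PR or from go-redis.

```go
package main

import (
	"fmt"
	"strings"
)

const slotCount = 16384

// crc16 computes CRC16-CCITT in its XMODEM form (polynomial 0x1021,
// initial value 0), the checksum Redis Cluster uses for key hashing.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot maps a key or shard channel name to a cluster slot. If the name
// contains a non-empty {hash tag}, only the tag is hashed.
func hashSlot(key string) int {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return int(crc16(key)) % slotCount
}

func main() {
	for _, ch := range []string{"ch1", "ch2", "ch3"} {
		fmt.Printf("%s -> slot %d\n", ch, hashSlot(ch))
	}
}
```

Channels that share a hash tag, such as `{orders}.created` and `{orders}.paid`, land in the same slot and can therefore share one shard connection.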


Note

Medium Risk
New concurrent cluster pub/sub paths and topology migration logic can affect subscription correctness during resharding; existing SSubscribe API behavior is preserved but sharded command batching changes failure/partial-success semantics.

Overview
Adds ShardedPubSub and ClusterClient.SSubscribeSharded so cluster sharded pub/sub can subscribe to channels on multiple hash slots/nodes at once. The type keeps one PubSub per shard, routes channels by slot, merges delivery through a single Channel(), and migrates subscriptions on cluster topology reload (wired via clusterStateHolder.onReload and coalesced notifyShardedPubSubs).

PubSub sharded subscribe/unsubscribe now batches channels per slot (avoids CROSSSLOT) and only updates local schannels for slot groups that succeeded on the server. SSubscribe on ClusterClient is unchanged but documented as single-shard; callers needing cross-shard coverage should use SSubscribeSharded.
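
The per-slot batching and partial-success bookkeeping described above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `subscribeBySlot`, the `slotOf` and `send` callbacks, and `errSend` are hypothetical names, and the demo uses a toy slot function.

```go
package main

import (
	"errors"
	"fmt"
)

// errSend is a stand-in failure used by the demo below.
var errSend = errors.New("connection reset")

// subscribeBySlot groups channels by slot, issues one command per group via
// send, and records in tracked only the channels whose group succeeded.
// Every group is attempted; the first error (if any) is returned.
func subscribeBySlot(
	channels []string,
	slotOf func(string) int,
	send func(slot int, channels []string) error,
	tracked map[string]struct{},
) error {
	groups := make(map[int][]string)
	var order []int // preserve first-seen slot order for predictable behaviour
	for _, ch := range channels {
		slot := slotOf(ch)
		if _, seen := groups[slot]; !seen {
			order = append(order, slot)
		}
		groups[slot] = append(groups[slot], ch)
	}

	var firstErr error
	for _, slot := range order {
		if err := send(slot, groups[slot]); err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("slot %d: %w", slot, err)
			}
			continue // failed group: leave local state untouched
		}
		for _, ch := range groups[slot] {
			tracked[ch] = struct{}{}
		}
	}
	return firstErr
}

func main() {
	tracked := make(map[string]struct{})
	slotOf := func(ch string) int { return len(ch) } // toy slot function, demo only
	send := func(slot int, _ []string) error {
		if slot == 2 {
			return errSend
		}
		return nil
	}
	err := subscribeBySlot([]string{"a", "bb", "c"}, slotOf, send, tracked)
	fmt.Println(len(tracked), err)
}
```

Because each command carries channels from a single slot, the server has no reason to reject it as cross-slot, and a failure in one group does not mark the other groups as unsubscribed.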

Also adds an example/sharded-pubsub cluster demo (publish/receive stats, topology monitoring) plus unit and integration tests for multi-shard delivery and error/bookkeeping paths.

Reviewed by Cursor Bugbot for commit e57a51a.

@veeceey

veeceey commented Feb 23, 2026

Contributor Author

Test results:

$ go build ./...
(clean, no errors)

$ go vet ./...
(clean)

$ go test -run "TestShardedPubSub" -count=1 -v .
=== RUN   TestShardedPubSubChannelGrouping
    sharded_pubsub_test.go:25: channels hash to 9 different slots:
    map[1548:[ch7] 1672:[ch3] 5677:[ch6] 5801:[ch2] 9806:[ch5] 9930:[ch1] 10178:[ch9] 13935:[ch4] 14307:[ch8]]
--- PASS: TestShardedPubSubChannelGrouping (0.00s)
=== RUN   TestShardedPubSubNewAndClose
--- PASS: TestShardedPubSubNewAndClose (0.00s)
=== RUN   TestShardedPubSubSSubscribeWhenClosed
--- PASS: TestShardedPubSubSSubscribeWhenClosed (0.00s)
PASS

$ go test -run "TestUnit" -count=1 ./...
(all existing tests pass)

The slot grouping test confirms that ch1-ch9 all hash to different slots, matching the original bug report. With ShardedPubSub, each gets its own connection to the correct node.

@ndyakov

ndyakov commented Feb 23, 2026

Member

Thank you @veeceey, we planned to work on the pubsub soon and this comes at just the right time. I will review your PR later this week!

@ndyakov ndyakov left a comment

Member

@veeceey I did leave a short review and would like for us to discuss and improve this implementation a bit. Let me know if you need any help in doing so.

Comment thread sharded_pubsub.go
Comment on lines +106 to +108
if err := ps.SSubscribe(ctx, chs...); err != nil {
    return err
}

Member

This error can be related to a network issue. Do you think it would be better to find a way to reinitialise the pubsub in that case?

Contributor Author

Good point — you're right that a network error here could be transient and we shouldn't just surface it and give up.

I think the approach would be to catch network-level errors (connection refused, timeouts, EOF, etc.) during SSubscribe on the underlying PubSub, and when that happens:

  1. Close the failed PubSub for that shard address
  2. Remove it from the shards map so the next attempt gets a fresh connection
  3. Re-resolve the node address for the affected channels (since the cluster state may have changed)
  4. Retry the subscribe with a backoff, reusing the cluster's retryBackoff settings for consistency

This way transient network blips get retried transparently, and if the node is genuinely down the cluster state refresh in step 3 should route us to the new primary.
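
A rough sketch of the four steps above, assuming a simple doubling backoff. `isNetworkError`, `retrySubscribe`, and the `attempt`/`reset` callbacks are hypothetical names; the real change would reuse the cluster's own backoff settings, which are not shown here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"time"
)

// isNetworkError reports whether err looks like a transient connection problem.
func isNetworkError(err error) bool {
	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
		return true
	}
	var netErr net.Error
	return errors.As(err, &netErr)
}

// retrySubscribe calls attempt up to maxAttempts times. After a network
// error it calls reset (close the stale shard connection, re-resolve the
// node), waits with a doubling delay, and tries again. Other errors are
// returned immediately.
func retrySubscribe(ctx context.Context, maxAttempts int, base time.Duration,
	attempt func(context.Context) error, reset func()) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	delay := base
	var err error
	for i := 1; i <= maxAttempts; i++ {
		err = attempt(ctx)
		if err == nil || !isNetworkError(err) || i == maxAttempts {
			return err
		}
		reset()
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

// demo simulates a shard that drops the connection twice before accepting.
func demo() (attempts, resets int, err error) {
	attempt := func(context.Context) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("ssubscribe: %w", io.EOF)
		}
		return nil
	}
	err = retrySubscribe(context.Background(), 5, time.Millisecond, attempt, func() { resets++ })
	return attempts, resets, err
}

func main() {
	fmt.Println(demo())
}
```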

For the forwardMessages goroutine, I'd add similar logic — if the channel read returns an error/closes unexpectedly, attempt to re-establish that shard's PubSub and re-subscribe to the channels that were on it, rather than silently dropping messages.

Happy to implement this — I'll model the retry logic after how the existing PubSub.reconnect works internally so it stays consistent with the rest of the library.

Comment thread sharded_pubsub.go
func (s *ShardedPubSub) getOrCreateShard(addr string) *PubSub {
    if ps, ok := s.shards[addr]; ok {
        return ps
    }

Member

Without any type of health check here, we cannot be sure that this pubsub connection is still open.

Contributor Author

That's a valid concern. Right now we create the PubSub via s.cluster.pubSub() and hand it off, but there's no ongoing verification that the connection is still alive.

I think the right approach is to add a Ping method on ShardedPubSub that iterates over all shard connections and pings each one. Something like:

func (s *ShardedPubSub) Ping(ctx context.Context) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    for addr, ps := range s.shards {
        if err := ps.Ping(ctx); err != nil {
            // connection is dead — trigger reconnect for this shard
            return fmt.Errorf("shard %s health check failed: %w", addr, err)
        }
    }
    return nil
}

On top of that, I could add an optional background health check loop (controlled by an option, disabled by default to avoid surprises) that periodically pings each shard and triggers reinitialization for any that have gone stale. This would pair well with the reconnect logic from the error handling comment — if a health check fails, we close the dead PubSub, re-resolve the node, and re-subscribe.

Makes sense to have at least the manual Ping method in this PR. Want me to add both the manual and the optional background health check, or keep the background one for a follow-up?

Member

I think we can skip the health check for now and fail (change the underlying connection) if SSUBSCRIBE fails. In practice, this makes the "healthcheck" reactive rather than proactive.

Contributor Author

makes sense -- went with the reactive approach. if SSubscribe fails, we close the broken shard, re-resolve the channels, and retry on a fresh connection. no background pinging.

Comment thread sharded_pubsub.go
Comment on lines +11 to +17
// ShardedPubSub wraps multiple PubSub connections to support sharded pub/sub
// across different cluster nodes. In a Redis cluster, channels hash to
// different slots which may be served by different nodes. A single connection
// can only receive messages for channels on the shard it is connected to.
// ShardedPubSub transparently manages one PubSub per shard and multiplexes
// all messages into a single Go channel.
type ShardedPubSub struct {

Member

Currently I don't see how this ShardedPubSub will react to topology changes (e.g. nodes are moved => shards are on a different address / new nodes are added => shards are now split between nodes). Any plans or ideas on how to address that with the current implementation?

Contributor Author

Great question — this is the piece I knew would come up and it's definitely the trickiest part.

Currently the shards map is keyed by the node address resolved at subscribe time, and that mapping is completely static after that. If a slot migrates to a different node, we'd keep reading from the old node (which would stop receiving messages for those channels) and never know about it.

Here's my plan for handling this:

1. Hook into cluster state refresh
The ClusterClient already periodically reloads cluster state via state.Get(). After each topology refresh, ShardedPubSub should re-resolve every channel's node address and compare against the current mapping. For any channel whose node has changed:

  • Subscribe to the channel on the new node's PubSub
  • Unsubscribe from the old node's PubSub
  • If an old node's PubSub has no remaining channels, close it and remove it from the map

2. Listen for MOVED responses
If any operation returns a MOVED error, that's an immediate signal that topology has shifted. This could trigger an on-demand re-resolve for the affected shard rather than waiting for the next periodic refresh.

3. Notification channel for resharding events
This might be useful for consumers who want to know when migrations happened, but that feels like it could be a follow-up.

I think items 1 and 2 are important to get right in this PR. The implementation would look something like adding a refreshTopology(ctx) method that gets called either periodically (piggy-backing on the cluster's reload interval) or reactively on errors. I'll work on this — would appreciate your input on whether you'd prefer it as a periodic goroutine or event-driven off the cluster state reload.
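
To make step 1 concrete, here is a small sketch of the "re-resolve and compare" part only. `move` and `planMigrations` are hypothetical names, and the `resolve` callback stands in for whatever the implementation uses to look up a channel's node after a reload.

```go
package main

import "fmt"

// move identifies a migration from one node address to another.
type move struct{ from, to string }

// planMigrations compares the recorded channel->address mapping with freshly
// resolved addresses and groups the channels that changed by (from, to).
// Channels that fail to resolve are skipped and keep their current mapping.
func planMigrations(current map[string]string, resolve func(string) (string, error)) map[move][]string {
	plan := make(map[move][]string)
	for ch, oldAddr := range current {
		newAddr, err := resolve(ch)
		if err != nil || newAddr == oldAddr {
			continue
		}
		m := move{from: oldAddr, to: newAddr}
		plan[m] = append(plan[m], ch)
	}
	return plan
}

func main() {
	current := map[string]string{
		"ch1": "10.0.0.1:6379",
		"ch2": "10.0.0.2:6379",
	}
	// Pretend the slot that owns ch2 moved to a third node.
	resolve := func(ch string) (string, error) {
		if ch == "ch2" {
			return "10.0.0.3:6379", nil
		}
		return current[ch], nil
	}
	for m, chs := range planMigrations(current, resolve) {
		fmt.Printf("%v: %s -> %s\n", chs, m.from, m.to)
	}
}
```

Keeping the plan as pure data makes it possible to compute it without holding any lock and to apply it afterwards in one short critical section.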

@ndyakov ndyakov Feb 28, 2026

Member

@veeceey I do think that we should have the ClusterClient as the source of truth for the topology and that ShardedPubSub should update (or just recalculate, if an update is not needed) its map when ClusterClient updates its topology. Hence, something like an event driven approach would fit best here.

Since the ShardedPubSub is a specific tool used only in the ClusterClient, feel free to have a set (slice) of all ShardedPubSubs somehow registered on the ClusterClient and trigger their topology changes when the ClusterClient updates its topology. It would be way simpler and somewhat limiting (covering just this case instead of a generic event channel), but a good balance in my mind.

Let me know if you find any problems with this approach or think another one fits better; I am willing to review alternatives as well.

Contributor Author

that makes a lot of sense — having the ClusterClient own the topology and push updates to registered ShardedPubSubs is way cleaner than each pubsub polling independently. I like the simplicity of a slice of registered instances on the ClusterClient.

I'm thinking something like:

  • add a shardedPubSubs []*ShardedPubSub field on ClusterClient (protected by the existing mutex)
  • ShardedPubSub registers itself on creation, deregisters on Close
  • when ClusterClient refreshes topology (in reloadState or wherever that happens), it calls an onTopologyChange() method on each registered ShardedPubSub
  • that method re-resolves channels to slots/nodes and migrates subscriptions as needed

I don't see any issues with this approach — it keeps things scoped to exactly this use case without over-engineering a generic event system. I'll start working on it and push an update. appreciate the guidance!

@veeceey

veeceey commented Mar 12, 2026

Contributor Author

friendly ping on this PR

@ndyakov

ndyakov commented Mar 17, 2026

Member

@veeceey Thank you for pinging me. I do think the discussed approaches sound good. Would you like to implement them in this PR? Maybe I am missing something, I am not seeing them committed here?

@veeceey

veeceey commented Mar 18, 2026

Contributor Author

Hey @ndyakov, just pushed the implementation! Here's what's in:

Topology change handling:

  • ShardedPubSub registers itself with the ClusterClient on creation, deregisters on Close
  • Added an onReload callback to clusterStateHolder — after each successful state reload, ClusterClient calls onTopologyChange() on all registered ShardedPubSubs
  • onTopologyChange() re-resolves every channel's node address and migrates subscriptions when slots have moved (unsubscribe from old node, subscribe on new, close empty shards)

Reactive reconnect (no proactive health check, as you suggested):

  • If SSubscribe fails on a shard, it closes the failed connection, re-resolves the channels, and retries on a fresh connection
  • No background pinging — we only reconnect when something actually fails

Tests:

  • Added TestShardedPubSubRegistration — verifies register on create, deregister on close, and correct bookkeeping

All existing tests pass, go vet clean. Let me know if you'd like anything adjusted!

@veeceey

veeceey commented Mar 18, 2026

Contributor Author

Good catches from the bugbot review — just pushed fixes for all three issues:

Close race (send-on-closed-channel): Added a sync.WaitGroup to track forwarder goroutines. Close() now waits for all forwarders to exit before closing msgCh, preventing the panic.

Channel opts ignored: Channel(opts...) now extracts the configured chanSize from the options and forwards them to each underlying PubSub shard's Channel() call. Options are also stored so newly created shards get the same config.

Deadlock in SSubscribe/onTopologyChange: Moved nodeAddrForChannel calls outside the mutex in both SSubscribe and onTopologyChange. Address resolution can trigger state.Get -> Reload -> onReload -> notifyShardedPubSubs -> onTopologyChange, which needs s.mu — holding it during resolution would deadlock. Both methods now resolve addresses first, then acquire the lock for state mutations.
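
The Close race fix described above follows a common fan-in pattern. The sketch below shows it in isolation, with a hypothetical `merger` type carrying strings rather than the PR's actual message type.

```go
package main

import (
	"fmt"
	"sync"
)

// merger fans several source channels into one output channel.
type merger struct {
	out  chan string
	done chan struct{} // closed by Close to stop the forwarders
	wg   sync.WaitGroup
	once sync.Once
}

func newMerger(size int) *merger {
	return &merger{out: make(chan string, size), done: make(chan struct{})}
}

// forward copies src into out until src is closed or Close is called.
// It must not be called concurrently with or after Close; a real
// implementation would guard that with a mutex and a closed flag.
func (m *merger) forward(src <-chan string) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for {
			select {
			case <-m.done:
				return
			case msg, ok := <-src:
				if !ok {
					return
				}
				select {
				case m.out <- msg:
				case <-m.done:
					return
				}
			}
		}
	}()
}

// Close stops the forwarders, waits for them to exit, and only then closes
// out, so no forwarder can send on a closed channel. It is idempotent.
func (m *merger) Close() {
	m.once.Do(func() {
		close(m.done)
		m.wg.Wait()
		close(m.out)
	})
}

func main() {
	a, b := make(chan string, 1), make(chan string, 1)
	a <- "from shard A"
	b <- "from shard B"
	m := newMerger(2)
	m.forward(a)
	m.forward(b)
	for i := 0; i < 2; i++ {
		fmt.Println(<-m.out)
	}
	m.Close() // forwarders are still waiting on a and b; Close stops them
	_, open := <-m.out
	fmt.Println("open:", open)
}
```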

All existing tests pass, go vet clean.

ndyakov
ndyakov previously approved these changes Mar 18, 2026

@ndyakov ndyakov left a comment

Member

@veeceey Thank you! Overall this PR looks good. I will do a more in-depth review in the following days.

Comment thread osscluster.go
Comment on lines +1199 to +1215
func (c *ClusterClient) registerShardedPubSub(sps *ShardedPubSub) {
    c.spsMu.Lock()
    defer c.spsMu.Unlock()
    c.shardedPubSubs = append(c.shardedPubSubs, sps)
}

// deregisterShardedPubSub removes a ShardedPubSub from the notification list.
func (c *ClusterClient) deregisterShardedPubSub(sps *ShardedPubSub) {
    c.spsMu.Lock()
    defer c.spsMu.Unlock()
    for i, s := range c.shardedPubSubs {
        if s == sps {
            c.shardedPubSubs = append(c.shardedPubSubs[:i], c.shardedPubSubs[i+1:]...)
            return
        }
    }
}

Member

A slight optimization with a map (or even sync.Map) can be done here, but I think this is fine for now. I don't think we can expect such a number of sharded pubsubs that looping over them would be a problem.
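
For reference, the map-based variant mentioned here could look like the sketch below. `registry` and `subscriber` are hypothetical stand-ins for the ClusterClient fields and the ShardedPubSub type; this is not what the PR implements.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber stands in for the type being registered.
type subscriber struct{ name string }

// registry tracks registered subscribers in a set keyed by pointer, so
// deregistration is a single map delete instead of a slice scan.
type registry struct {
	mu   sync.Mutex
	subs map[*subscriber]struct{}
}

func newRegistry() *registry {
	return &registry{subs: make(map[*subscriber]struct{})}
}

func (r *registry) register(s *subscriber) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[s] = struct{}{}
}

// deregister removes s; removing an unknown subscriber is a no-op.
func (r *registry) deregister(s *subscriber) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs, s)
}

// snapshot copies the current set so callers can notify subscribers
// without holding the registry lock.
func (r *registry) snapshot() []*subscriber {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]*subscriber, 0, len(r.subs))
	for s := range r.subs {
		out = append(out, s)
	}
	return out
}

func main() {
	r := newRegistry()
	a, b := &subscriber{name: "a"}, &subscriber{name: "b"}
	r.register(a)
	r.register(b)
	r.deregister(a)
	for _, s := range r.snapshot() {
		fmt.Println("still registered:", s.name)
	}
}
```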

veeceey added 3 commits March 24, 2026 21:34
Add ShardedPubSub type that manages multiple PubSub connections to
support subscribing to shard channels across different cluster nodes.

In Redis cluster, SSUBSCRIBE channels hash to different slots which may
be served by different nodes. The existing PubSub uses a single
connection and only receives messages from the first channel's shard.
ShardedPubSub groups channels by slot, maintains one PubSub per node,
and multiplexes messages into a single Go channel.

Usage:
  sps := rdb.SSubscribeSharded(ctx, "ch1", "ch2", "ch3")
  defer sps.Close()
  for msg := range sps.Channel() { ... }

The existing SSubscribe method is unchanged for backward compatibility.

Fixes redis#3133
…ubSub

- ShardedPubSub instances register with ClusterClient and get notified
  on topology changes (via clusterStateHolder.onReload callback)
- onTopologyChange re-resolves all channel->node mappings and migrates
  subscriptions when slots move to different nodes
- Reactive reconnect on SSubscribe failure: closes failed shard, re-resolves
  channels, and retries on fresh connection (skipping proactive health checks)
- ShardedPubSub deregisters from ClusterClient on Close
- Fix send-on-closed-channel panic in Close(): use sync.WaitGroup to
  wait for all forwarder goroutines to exit before closing msgCh
- Fix deadlock: move nodeAddrForChannel calls outside mutex in both
  SSubscribe and onTopologyChange, since address resolution can trigger
  cluster state reload -> onReload -> onTopologyChange -> s.mu.Lock
- Fix Channel() ignoring ChannelOption parameters: forward opts to
  underlying PubSub shards and use configured chanSize for buffer
@veeceey veeceey force-pushed the fix/issue-3133-sharded-pubsub-multi-shard branch from cca6caa to c5cde5d Compare March 25, 2026 04:34
Comment thread sharded_pubsub.go

@ndyakov ndyakov left a comment

Member

@veeceey Thank you for working on this and being patient about my review. It took some time to get my head around what we would like to do for v10 and how this will fit. I did leave some comments (a few of which I do consider blockers). Let me know what you think and if you would like to continue working on it.

Comment thread sharded_pubsub.go
ps := s.getOrCreateShard(addr)
if err := ps.SSubscribe(ctx, chs...); err != nil {
    // Reactive reconnect: close failed shard, re-resolve, and retry once.
    if reconnErr := s.resubscribeShard(ctx, addr); reconnErr != nil {

Member

resubscribeShard is called while holding s.mu, but internally it calls nodeAddrForChannel, which calls slotMasterNode, which calls state.Get.

  • If the cluster state is nil, Get calls Reload which will eventually call notifyShardedPubSubs, which calls onTopologyChange trying to acquire s.mu.

Release the lock before the retry, similar to how the main SSubscribe flow already does it for address resolution.

Contributor Author

good catch — you're right, resubscribeShard holding s.mu while calling nodeAddrForChannel creates the same deadlock path as the original SSubscribe issue. I'll release the lock before the retry and re-acquire after, same pattern as the main SSubscribe flow.

Comment thread sharded_pubsub.go
Comment on lines +258 to +260
// Apply migrations under the lock.
s.mu.Lock()
defer s.mu.Unlock()

Member

since we will hold the lock for applying the migration, does it make sense to verify the channels still exist when subscribing?

    if _, exists := s.chanShard[ch]; exists {
        s.chanShard[ch] = newAddr
    }

Contributor Author

yep, makes total sense — if someone unsubscribed while we were resolving, we shouldn't silently re-add it. will add the existence check before updating chanShard.

Comment thread sharded_pubsub.go Outdated
Comment on lines +349 to +350
// Deregister from the cluster client.
s.cluster.deregisterShardedPubSub(s)

Member

I think we can deregister before acquiring the s.mu.Lock()

Contributor Author

agreed — deregistering doesn't need the lock and doing it first avoids holding s.mu longer than necessary. will move it up.

Comment thread osscluster.go
Comment on lines +1064 to +1066
if c.onReload != nil {
    c.onReload()
}

Member

Can this be async? maybe with validating that it is not triggered multiple times. If you look at LazyReload it has a mechanism for executing the latest reload, maybe something similar here would help.

Contributor Author

yeah, making it async would avoid blocking the reload path. I'll look at LazyReload's dedup mechanism and model it similarly — signal a goroutine to do the migration instead of doing it inline, with a check to skip if one's already in-flight.
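
One way to get "async, but never more than one in flight" is a pair of atomic flags. The sketch below is an assumption about how such a guard could look, not the code from LazyReload or from this PR; `coalescer`, `Notify`, and `drain` are hypothetical names, and `atomic.Bool` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// coalescer runs fn in a background goroutine. Notifications that arrive
// while fn is running are collapsed into a single follow-up run.
type coalescer struct {
	running atomic.Bool // true while a worker goroutine owns the loop
	pending atomic.Bool // true when at least one run has been requested
	fn      func()
}

// Notify requests a run and returns immediately.
func (c *coalescer) Notify() {
	c.pending.Store(true)
	if c.running.CompareAndSwap(false, true) {
		go c.drain()
	}
}

func (c *coalescer) drain() {
	for {
		for c.pending.CompareAndSwap(true, false) {
			c.fn()
		}
		c.running.Store(false)
		// A Notify may have set pending between the last check and the Store
		// above. If so, try to take the worker role again; otherwise exit.
		if !c.pending.Load() || !c.running.CompareAndSwap(false, true) {
			return
		}
	}
}

func main() {
	ran := make(chan struct{}, 1)
	c := &coalescer{fn: func() { ran <- struct{}{} }}
	c.Notify() // does not block the caller (for example, the reload path)
	<-ran
	fmt.Println("migration pass ran")
}
```

With this shape, a burst of reloads while a migration pass is running results in exactly one more pass afterwards, which sees the latest topology.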

Comment thread sharded_pubsub.go
// can only receive messages for channels on the shard it is connected to.
// ShardedPubSub transparently manages one PubSub per shard and multiplexes
// all messages into a single Go channel.
type ShardedPubSub struct {

Member

The existing PubSub supports Ping, ReceiveMessage and ChannelWithSubscriptions. ShardedPubSub only exposes Channel(). Can you either add them for feature parity or document that they are intentionally not supported? I would like to have a PubSub interface in the future (for v10 for example), where we can return the same Interface for the pubsub and sharded pubsub.

Contributor Author

that's a fair point for v10 interface parity. I'll add Ping and ReceiveMessage methods on ShardedPubSub — Ping iterates over shards and pings each, ReceiveMessage reads from the merged channel. ChannelWithSubscriptions I can stub for now with a TODO since the subscription tracking is a bit different with sharded channels. Will document what's supported vs not.

Comment thread sharded_pubsub.go
Comment on lines +48 to +58
if s.cluster.opt.ReadOnly {
    state, err := s.cluster.state.Get(ctx)
    if err != nil {
        return "", err
    }
    node, err := s.cluster.slotReadOnlyNode(state, slot)
    if err != nil {
        return "", err
    }
    return node.Client.opt.Addr, nil
}

Member

While theoretically we should connect to the replica on read only, I am wondering if we want to? The master will receive the message first. I will validate if this is needed at all or should be changed, but for now let's keep it as it is.

Contributor Author

sounds good, I'll leave it as-is for now then. makes sense to validate whether replica reads are actually useful for pubsub before changing it.

Comment thread sharded_pubsub.go Outdated
Comment on lines +269 to +280
if ps, ok := s.shards[oldAddr]; ok {
    _ = ps.SUnsubscribe(ctx, channels...)
}

// Subscribe on new shard.
ps := s.getOrCreateShard(newAddr)
_ = ps.SSubscribe(ctx, channels...)

// Update channel->shard mapping.
for _, ch := range channels {
    s.chanShard[ch] = newAddr
}

Member

If SSubscribe fails (error is swallowed with _), the channel is now:

  • Unsubscribed from the old shard
  • NOT subscribed on the new shard
  • BUT chanShard is updated to point to the new shard

The channel is effectively lost: no messages will be received, and the code thinks everything is fine.

Can we update the mapping ONLY on successful SSubscribe here?

Contributor Author

you're right, that's a nasty silent failure. I'll restructure so we only update chanShard[ch] = newAddr after SSubscribe succeeds. if it fails, we keep the old mapping so at least the channel isn't orphaned — and we can log or return the error so it's visible.
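
A sketch of the ordering agreed on in this thread: subscribe on the new node first, update the mapping only if that succeeded, and unsubscribe from the old node last. `migrate`, its callbacks, and `errSubscribe` are hypothetical; the real code works on PubSub connections rather than function values.

```go
package main

import (
	"errors"
	"fmt"
)

// errSubscribe is a stand-in failure used by the demo below.
var errSubscribe = errors.New("subscribe failed")

// migrate moves channels from oldAddr to newAddr in chanShard.
// On a subscribe error the mapping is left untouched, so the channels stay
// attached to the old node instead of being silently orphaned.
func migrate(
	chanShard map[string]string,
	channels []string,
	oldAddr, newAddr string,
	subscribe func(addr string, chs []string) error,
	unsubscribe func(addr string, chs []string) error,
) error {
	// Only migrate channels that are still tracked on oldAddr.
	var live []string
	for _, ch := range channels {
		if addr, ok := chanShard[ch]; ok && addr == oldAddr {
			live = append(live, ch)
		}
	}
	if len(live) == 0 {
		return nil
	}
	if err := subscribe(newAddr, live); err != nil {
		return fmt.Errorf("migrate %v to %s: %w", live, newAddr, err)
	}
	for _, ch := range live {
		chanShard[ch] = newAddr
	}
	_ = unsubscribe(oldAddr, live) // best effort: the new subscription is already live
	return nil
}

func main() {
	mapping := map[string]string{"ch1": "old:6379"}
	ok := func(string, []string) error { return nil }
	fail := func(string, []string) error { return errSubscribe }

	fmt.Println(migrate(mapping, []string{"ch1"}, "old:6379", "new:6379", fail, ok), mapping["ch1"])
	fmt.Println(migrate(mapping, []string{"ch1"}, "old:6379", "new:6379", ok, ok), mapping["ch1"])
}
```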

…rface parity

- resubscribeShard: release s.mu before nodeAddrForChannel to avoid
  deadlock (state.Get -> Reload -> onReload -> onTopologyChange -> s.mu)
- onTopologyChange migration: only update chanShard on successful
  SSubscribe, check channel still exists before updating mapping
- Close: deregister from ClusterClient before acquiring s.mu
- notifyShardedPubSubs: make async with CAS guard to avoid blocking
  the reload path, similar to LazyReload's dedup mechanism
- Add Ping, ReceiveMessage, ReceiveTimeout methods for interface
  parity with PubSub (prep for v10 PubSub interface)
@veeceey

veeceey commented Apr 9, 2026

Contributor Author

pushed all the fixes from the review:

  • resubscribeShard deadlock: releases s.mu before nodeAddrForChannel, re-acquires after — same pattern as main SSubscribe flow
  • chanShard race in migration: added existence check before updating, and only update on successful SSubscribe
  • Close ordering: deregister from ClusterClient before acquiring s.mu
  • async topology notification: notifyShardedPubSubs now runs in a goroutine with a CAS guard so it doesn't block the reload path
  • interface parity: added Ping, ReceiveMessage, and ReceiveTimeout methods on ShardedPubSub

all tests pass, go vet clean. let me know if there's anything else!

@ndyakov

ndyakov commented May 7, 2026

Member

@veeceey Hey, do you expect to continue the work on this one, or should we take over?

@ndyakov ndyakov self-assigned this Jun 8, 2026
…mapping, partial resubscribe orphans, coalesced topology notifications)

Copilot AI left a comment

Contributor

Pull request overview

Adds first-class support for sharded pub/sub subscriptions that span multiple hash slots/nodes in Redis Cluster by introducing a ShardedPubSub multiplexer and wiring it into ClusterClient topology reloads, while keeping existing SSubscribe behavior intact for backward compatibility.

Changes:

  • Introduces ShardedPubSub to manage one underlying PubSub per shard node and merge all messages into a single Channel().
  • Adds ClusterClient.SSubscribeSharded plus a coalesced async notifier triggered after successful cluster state reloads to migrate subscriptions on topology changes.
  • Adds unit tests covering lifecycle, buffering options, registration/deregistration, and notifier coalescing behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| sharded_pubsub.go | New ShardedPubSub implementation: per-node shard connections, message multiplexing, and topology-change migration logic. |
| sharded_pubsub_test.go | New unit tests for ShardedPubSub lifecycle, options, bookkeeping, and notifier coalescing. |
| osscluster.go | Adds cluster reload hook + sharded-pubsub registration/notification plumbing; documents SSubscribe limitation and introduces SSubscribeSharded. |

Comment thread sharded_pubsub.go
Comment on lines +186 to +203
func (s *ShardedPubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.closed {
        return pool.ErrClosed
    }

    // Group channels by their known shard. The mapping is intentionally left
    // in place here and only removed after a successful unsubscribe below, so
    // that a failed SUnsubscribe does not desync our bookkeeping from the
    // server (where the subscription and its forwarder may still be active).
    groups := make(map[string][]string)
    for _, ch := range channels {
        if addr, ok := s.chanShard[ch]; ok {
            groups[addr] = append(groups[addr], ch)
        }
    }
Comment thread sharded_pubsub.go
Comment on lines +227 to +234
s.mu.Lock()
defer s.mu.Unlock()

// Determine the channel buffer size. We default to 100 but respect
// WithChannelSize if provided. We apply all options to a single probe
// (matching newChannel in pubsub.go) so later options don't reset the
// values set by earlier ones.
size := 100
Comment thread sharded_pubsub.go
Comment on lines +334 to +338
// Now that the new shard is subscribed, unsubscribe from the old
// one so we never have a window with no active subscription.
if oldPS, ok := s.shards[oldAddr]; ok {
    _ = oldPS.SUnsubscribe(ctx, channels...)
}
Comment thread sharded_pubsub.go
Comment on lines +211 to +216
    // Drop the mapping only after the unsubscribe succeeded (or when there
    // is no live shard connection to unsubscribe from).
    for _, ch := range chs {
        delete(s.chanShard, ch)
    }
}
Comment thread osscluster.go
Comment on lines +2360 to +2364
// SSubscribeSharded subscribes the client to the specified shard channels,
// automatically managing connections to all relevant cluster nodes.
// Unlike SSubscribe, this correctly handles channels that hash to different
// slots by maintaining one connection per shard node.
func (c *ClusterClient) SSubscribeSharded(ctx context.Context, channels ...string) *ShardedPubSub {
ndyakov added 2 commits June 8, 2026 18:28
…fecycle

- Group SSUBSCRIBE/SUNSUBSCRIBE by hash slot to avoid CROSSSLOT (fixes redis#3133)
- Tolerate per-channel resolution failure in SSubscribe
- Fix Close()/Channel() double-close race and ReceiveMessage msgCh data race
- Move Ping network I/O off the mutex
- Add cross-slot cluster integration test; document health/reconnect + initial-error semantics
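For readers unfamiliar with why grouping avoids CROSSSLOT, here is a minimal sketch of slot-based grouping. It follows the Redis Cluster key-slot rule (CRC16/XMODEM of the key or its hash tag, modulo 16384). The helper names `crc16`, `hashTag`, `keySlot`, and `groupBySlot` are hypothetical and are not the functions used in this PR.

```go
package main

import (
	"fmt"
	"strings"
)

const slotCount = 16384 // number of hash slots in a Redis Cluster

// crc16 computes CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashTag returns the text between the first '{' and the next '}' when
// that text is non-empty; otherwise it returns the whole key.
func hashTag(key string) string {
	if s := strings.IndexByte(key, '{'); s >= 0 {
		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
			return key[s+1 : s+1+e]
		}
	}
	return key
}

// keySlot maps a key or shard channel name to its hash slot.
func keySlot(key string) int {
	return int(crc16([]byte(hashTag(key)))) % slotCount
}

// groupBySlot (hypothetical helper) buckets channels by slot so each
// bucket can be sent as its own SSUBSCRIBE or SUNSUBSCRIBE command.
func groupBySlot(channels ...string) map[int][]string {
	groups := make(map[int][]string)
	for _, ch := range channels {
		slot := keySlot(ch)
		groups[slot] = append(groups[slot], ch)
	}
	return groups
}

func main() {
	for slot, chs := range groupBySlot("ch1", "ch2", "{orders}.new", "{orders}.paid") {
		fmt.Println(slot, chs)
	}
}
```

Channels that share a hash tag, such as `{orders}.new` and `{orders}.paid`, land in the same bucket and can travel in one command; everything else gets its own command per slot.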
Comment thread pubsub.go Outdated
Comment thread sharded_pubsub.go
ndyakov added 2 commits June 9, 2026 11:36
resubscribeShard aborted on the first nodeAddrForChannel error, abandoning
every other channel that was on the failed shard (the SSubscribe caller then
dropped all their mappings). Mirror the tolerance pattern already used in
SSubscribe: record the first resolution error, continue, and re-subscribe
every channel that did resolve, returning the first error to the caller.
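The tolerance pattern described here can be sketched roughly like this. It is a simplified illustration under assumptions: `resolveAll` and `demoResolve` are hypothetical names, and the real `resubscribeShard` works on the type's own state rather than on a plain function argument.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveAll (hypothetical helper) resolves every channel to a node
// address. A failed resolution is recorded but does not stop the loop,
// so channels that do resolve are still grouped for re-subscription.
// It returns the groups plus the first error seen, if any.
func resolveAll(channels []string, resolve func(string) (string, error)) (map[string][]string, error) {
	groups := make(map[string][]string)
	var firstErr error
	for _, ch := range channels {
		addr, err := resolve(ch)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue // skip this channel, keep processing the rest
		}
		groups[addr] = append(groups[addr], ch)
	}
	return groups, firstErr
}

// demoResolve is a stand-in resolver that fails for one channel name.
func demoResolve(ch string) (string, error) {
	if ch == "bad" {
		return "", errors.New("no node for channel " + ch)
	}
	return "node-a:6379", nil
}

func main() {
	groups, err := resolveAll([]string{"ch1", "bad", "ch2"}, demoResolve)
	fmt.Println(groups, err)
}
```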
Comment thread sharded_pubsub.go Outdated
Comment thread sharded_pubsub.go
Addresses unresolved review comments on redis#3717:

- PubSub.SSubscribe/SUnsubscribe now only update schannels for the
  slot-groups whose SSUBSCRIBE/SUNSUBSCRIBE command actually succeeded.
  subscribeSharded reports the failed channels so a partial multi-slot
  batch no longer desyncs local state from the server (Cursor: partial
  slot schannels mismatch).

- ShardedPubSub.SUnsubscribe attempts every shard group instead of
  returning on the first failure, so a partial multi-shard unsubscribe
  no longer leaves later groups subscribed-and-tracked while earlier
  groups are gone, and reports the first error (Cursor: partial
  multi-shard unsubscribe failure).

- resubscribeShard drops the mappings of channels it could not
  re-subscribe (failed resolution or failed SSUBSCRIBE) instead of
  leaving them pointing at the removed shard with no live connection,
  fixing orphaned channels from earlier SSubscribe calls (Cursor:
  orphan channels after resubscribe failure).

- onTopologyChange rebuilds the old shard when its SUnsubscribe fails
  instead of leaving duplicate active subscriptions on old+new nodes
  (Copilot: onTopologyChange ignores old SUnsubscribe errors).

Adds unit tests covering partial-failure SUnsubscribe across shards and
schannels tracking on subscribe/unsubscribe success and failure.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 0913edd.

Comment thread sharded_pubsub.go Outdated
The cleanup sweep after a failed resubscribeShard deleted every chanShard
entry pointing at failedAddr. Because resubscribeShard releases s.mu while
resolving node addresses, a concurrent onTopologyChange/SSubscribe can
re-add or migrate channels (or reuse failedAddr for brand-new channels)
during that window; the blanket sweep would silently orphan those.

Scope the sweep to the channels this call collected, skip the ones it
recovered, and only drop entries that still point at failedAddr.
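A rough sketch of the scoped sweep, under assumptions: `sweepFailed` is a hypothetical free function and the maps are plain Go maps, whereas the real code works on the type's fields under `s.mu`.

```go
package main

import "fmt"

// sweepFailed (hypothetical helper) cleans up after a failed resubscribe.
// It looks only at the channels this call collected, skips the ones it
// recovered, and deletes an entry only if it still points at failedAddr.
// Entries added or migrated concurrently are left alone.
func sweepFailed(chanShard map[string]string, collected []string, recovered map[string]bool, failedAddr string) {
	for _, ch := range collected {
		if recovered[ch] {
			continue // re-subscribed successfully, keep it
		}
		if chanShard[ch] == failedAddr {
			delete(chanShard, ch)
		}
	}
}

func main() {
	chanShard := map[string]string{
		"a":   "n1", // collected, not recovered, still on the failed shard
		"b":   "n2", // collected, but migrated elsewhere in the meantime
		"c":   "n1", // collected and recovered
		"new": "n1", // added concurrently, never collected by this call
	}
	sweepFailed(chanShard, []string{"a", "b", "c"}, map[string]bool{"c": true}, "n1")
	fmt.Println(chanShard)
}
```

A blanket sweep over every entry pointing at `n1` would also have removed `c` and `new`, which is the orphaning this commit avoids.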
@ndyakov ndyakov force-pushed the fix/issue-3133-sharded-pubsub-multi-shard branch from 0913edd to 55f8a03 Compare June 10, 2026 14:55

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 7 out of 8 changed files in this pull request and generated 2 comments.

Comment thread sharded_pubsub_test.go
newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) {
	return cn, nil
},
closeConn: func(c *pool.Conn) error { return clientConn.Close() },
Comment thread sharded_pubsub_test.go
}

func TestPubSubSUnsubscribeDropsSchannelsOnSuccess(t *testing.T) {
	ps := newNoopPubSub()
Development

Successfully merging this pull request may close these issues.

Sharded pubsub subscriber cannot get messages from all subscribed channels

3 participants