refactor(dispatcher): replace errgroup with sync.WaitGroup for event processing #188
Conversation
Walkthrough

The changes refactor the dispatcher's per-listener concurrent execution from an errgroup-based pattern to manual goroutine management using a WaitGroup and a buffered channel for error aggregation. The go.mod update reflects this by moving the sync dependency to the indirect block.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant Dispatcher
    participant Listeners
    participant ErrChan as Error Channel
    participant WaitGroup

    rect rgba(200, 220, 255, 0.3)
        Note over Dispatcher: New Manual Pattern
    end

    Caller->>Dispatcher: Call with listeners
    alt No listeners
        Dispatcher-->>Caller: Return nil
    else Has listeners
        Dispatcher->>WaitGroup: Add(count)
        loop For each listener
            Dispatcher->>Dispatcher: Spawn goroutine
            par Concurrent listeners
                Dispatcher->>Listeners: Execute listener
                Listeners-->>ErrChan: Send error (if any)
            and Waiting
                WaitGroup->>WaitGroup: Wait()
            end
        end
        Dispatcher->>ErrChan: Close after WaitGroup done
        Dispatcher->>Dispatcher: errors.Join(collected errors)
        Dispatcher-->>Caller: Return aggregated error
    end
```
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) · ✅ Passed checks (2 passed)
Summary of Changes

Hello @flc1125, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request undertakes a significant refactoring of the event dispatching mechanism: it transitions from errgroup-based concurrency to manual goroutine management with sync.WaitGroup.
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             v2.x     #188      +/-   ##
==========================================
- Coverage   94.73%   92.00%    -2.74%
==========================================
  Files           1        1
  Lines         190      200       +10
==========================================
+ Hits          180      184        +4
- Misses          8       11        +3
- Partials        2        5        +3
==========================================
```

☔ View full report in Codecov by Sentry.
Code Review
This pull request refactors the event processing logic to replace errgroup with a sync.WaitGroup. The goal is to ensure all event listeners are executed, even if some of them fail, and to collect all errors. While this is a good improvement in behavior, the new implementation introduces a critical concurrency bug by incorrectly capturing the loop variable within the goroutine closure. This will cause most, if not all, listeners to process the event for the last item in the listener slice, leading to incorrect behavior. I've provided a comment with a suggested fix.
```diff
 	for _, listener := range listeners {
-		eg.Go(func() error {
-			return handler(listener, ctx, event)
-		})
+		go func() {
+			defer wg.Done()
+			if err := handler(listener, ctx, event); err != nil {
+				errCh <- err
+			}
+		}()
 	}
```
This loop introduces a common concurrency bug in Go. The listener variable is being captured by the goroutine's closure. Because the goroutines are scheduled and may not run until after the loop has completed, it's highly likely that most or all of them will see the listener variable with the value of the last element from the listeners slice.
To fix this, you should pass the listener as an argument to the goroutine's function literal. This creates a new copy of the variable for each goroutine.
```diff
 	for _, listener := range listeners {
-		go func() {
+		go func(l L) {
 			defer wg.Done()
-			if err := handler(listener, ctx, event); err != nil {
+			if err := handler(l, ctx, event); err != nil {
 				errCh <- err
 			}
-		}()
+		}(listener)
 	}
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
```diff
 func processEvent[E any, L any](ctx context.Context, listeners []L, handler func(L, context.Context, E) error, event E) error {
-	eg, ctx := errgroup.WithContext(ctx)
 	if len(listeners) == 0 {
 		return nil
 	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(len(listeners))
+
+	errCh := make(chan error, len(listeners))
 	for _, listener := range listeners {
-		eg.Go(func() error {
-			return handler(listener, ctx, event)
-		})
+		go func() {
+			defer wg.Done()
+			if err := handler(listener, ctx, event); err != nil {
+				errCh <- err
+			}
+		}()
 	}
+	wg.Wait()
+
+	close(errCh)
```
Preserve context cancellation when any listener fails
Replacing errgroup.WithContext with a plain sync.WaitGroup drops the derived context that errgroup used to cancel the remaining goroutines as soon as one handler returns an error. Handlers that wait on <-ctx.Done() to abort early will now wait forever because the context is never cancelled, leaving wg.Wait() blocked and the dispatcher stuck whenever any listener fails before the others complete. Consider retaining a cancellable context and cancelling it on the first error to keep the previous behaviour.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2): dispatcher.go (2 hunks), go.mod (1 hunk)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go test (1.25.x, windows-latest)
🔇 Additional comments (1)

go.mod (1)

214-214: Dependency declaration looks correct. Moving golang.org/x/sync to the indirect block matches the code change that stops importing it directly. 👍
```diff
 	if len(listeners) == 0 {
 		return nil
 	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(len(listeners))
+
+	errCh := make(chan error, len(listeners))
 	for _, listener := range listeners {
-		eg.Go(func() error {
-			return handler(listener, ctx, event)
-		})
+		go func() {
+			defer wg.Done()
+			if err := handler(listener, ctx, event); err != nil {
+				errCh <- err
+			}
+		}()
 	}
+	wg.Wait()
+
+	close(errCh)
+	var err error
+	for e := range errCh {
+		err = errors.Join(err, e)
```
Restore context cancellation semantics lost in the refactor.
The old errgroup.WithContext path cancelled the derived context as soon as any listener returned a non-nil error (or once the group completed). After this refactor every handler receives the original ctx, so listeners that honour ctx.Done() will now run to completion even after a sibling fails. That’s a behavioural regression with real reliability impact, especially for long-running or expensive listeners. Please derive a cancellable child context and cancel it on the first error while still aggregating all failures.
```diff
 func processEvent[E any, L any](ctx context.Context, listeners []L, handler func(L, context.Context, E) error, event E) error {
 	if len(listeners) == 0 {
 		return nil
 	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	wg := sync.WaitGroup{}
 	wg.Add(len(listeners))
 	errCh := make(chan error, len(listeners))
 	for _, listener := range listeners {
-		go func() {
+		listener := listener
+		go func(l L) {
 			defer wg.Done()
-			if err := handler(listener, ctx, event); err != nil {
+			if err := handler(l, ctx, event); err != nil {
 				errCh <- err
+				cancel()
 			}
-		}()
+		}(listener)
 	}
 	wg.Wait()
 	close(errCh)
 	var err error
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test and benchmark the code to ensure it meets the requirements.
```diff
 	if len(listeners) == 0 {
 		return nil
 	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	wg := sync.WaitGroup{}
 	wg.Add(len(listeners))
 	errCh := make(chan error, len(listeners))
 	for _, listener := range listeners {
-		eg.Go(func() error {
-			return handler(listener, ctx, event)
-		})
-		go func() {
+		listener := listener
+		go func(l L) {
 			defer wg.Done()
-			if err := handler(listener, ctx, event); err != nil {
+			if err := handler(l, ctx, event); err != nil {
 				errCh <- err
+				cancel()
 			}
-		}()
+		}(listener)
 	}
 	wg.Wait()
 	close(errCh)
 	var err error
 	for e := range errCh {
 		err = errors.Join(err, e)
```
Summary by CodeRabbit

- Refactor
- Chores