Skip to content

refactor(dispatcher): replace errgroup with sync.WaitGroup for event processing#188

Merged
flc1125 merged 1 commit intov2.xfrom
concurrent-optimization
Nov 12, 2025
Merged

refactor(dispatcher): replace errgroup with sync.WaitGroup for event processing#188
flc1125 merged 1 commit intov2.xfrom
concurrent-optimization

Conversation

@flc1125
Copy link
Copy Markdown
Owner

@flc1125 flc1125 commented Nov 12, 2025

Summary by CodeRabbit

  • Refactor

    • Enhanced error aggregation and concurrency handling in dispatcher operations.
  • Chores

    • Refined dependency management to reflect actual usage patterns.

@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Nov 12, 2025
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Nov 12, 2025

Walkthrough

The changes refactor the dispatcher's per-listener concurrent execution from an errgroup-based pattern to manual goroutine management using WaitGroup and buffered channels for error aggregation. The go.mod update reflects this by moving the sync dependency to the indirect block.

Changes

Cohort / File(s) Summary
Concurrency refactoring
dispatcher.go
Replaced errgroup concurrency with manual goroutines, WaitGroup, and buffered channel for error collection. Added sync import; removed errgroup dependency and context-wrapped error handling.
Dependency adjustment
go.mod
Moved golang.org/x/sync v0.18.0 from direct require to indirect require block.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Dispatcher
    participant Listeners
    participant ErrChan as Error Channel
    participant WaitGroup

    rect rgba(200, 220, 255, 0.3)
    Note over Dispatcher: New Manual Pattern
    end

    Caller->>Dispatcher: Call with listeners
    alt No listeners
        Dispatcher-->>Caller: Return nil
    else Has listeners
        Dispatcher->>WaitGroup: Add(count)
        loop For each listener
            Dispatcher->>Dispatcher: Spawn goroutine
            par Concurrent listeners
                Dispatcher->>Listeners: Execute listener
                Listeners-->>ErrChan: Send error (if any)
            and Waiting
                WaitGroup->>WaitGroup: Wait()
            end
        end
        Dispatcher->>ErrChan: Close after WaitGroup done
        Dispatcher->>Dispatcher: errors.Join(collected errors)
        Dispatcher-->>Caller: Return aggregated error
    end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

  • Concurrency logic verification: Ensure WaitGroup, goroutines, and buffered channel pattern correctly handles edge cases (empty listeners, error propagation, goroutine leaks).
  • Error aggregation: Verify that errors.Join properly combines all collected errors and maintains order/completeness.
  • Dependency consistency: Confirm sync import addition and that errgroup removal doesn't affect other parts of the codebase.

Possibly related PRs

  • PR #183: Similar concurrency refactoring in dispatcher.go; one PR introduces errgroup-based generic processEvent while this PR removes errgroup in favor of manual goroutines and WaitGroup—directly conflicting/related changes to the same component.

Suggested labels

go, size:L

Poem

🐰 No more groups, just goroutines that play,
With WaitGroup and channels leading the way,
Errors join together in harmony's dance,
The dispatcher now orchestrates with its own chance! 🎵

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main refactoring: replacing errgroup with sync.WaitGroup in the dispatcher for event processing, which aligns with the actual code changes.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch concurrent-optimization

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @flc1125, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request undertakes a significant refactoring of the event dispatching mechanism. It transitions from using errgroup for concurrent event processing to a more explicit sync.WaitGroup and channel-based approach. This change aims to provide finer control over concurrency and error aggregation during event handling, while also streamlining the project's direct dependencies.

Highlights

  • Concurrency Mechanism Refactor: The processEvent function has been refactored to replace the use of golang.org/x/sync/errgroup with sync.WaitGroup for managing concurrent execution of event listeners.
  • Custom Error Aggregation: A custom error collection mechanism using a buffered channel and errors.Join has been implemented to gather all errors from concurrently executed event handlers, providing a comprehensive error report.
  • Dependency Management: The direct dependency on golang.org/x/sync has been removed from go.mod, reflecting the change in concurrency primitive usage. It now remains as an indirect dependency.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codecov
Copy link
Copy Markdown

codecov bot commented Nov 12, 2025

Codecov Report

❌ Patch coverage is 60.00000% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.00%. Comparing base (4528705) to head (f9082d6).
⚠️ Report is 1 commits behind head on v2.x.

Files with missing lines Patch % Lines
dispatcher.go 60.00% 3 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             v2.x     #188      +/-   ##
==========================================
- Coverage   94.73%   92.00%   -2.74%     
==========================================
  Files           1        1              
  Lines         190      200      +10     
==========================================
+ Hits          180      184       +4     
- Misses          8       11       +3     
- Partials        2        5       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the event processing logic to replace errgroup with a sync.WaitGroup. The goal is to ensure all event listeners are executed, even if some of them fail, and to collect all errors. While this is a good improvement in behavior, the new implementation introduces a critical concurrency bug by incorrectly capturing the loop variable within the goroutine closure. This will cause most, if not all, listeners to process the event for the last item in the listener slice, leading to incorrect behavior. I've provided a comment with a suggested fix.

Comment on lines 373 to +380
for _, listener := range listeners {
eg.Go(func() error {
return handler(listener, ctx, event)
})
go func() {
defer wg.Done()
if err := handler(listener, ctx, event); err != nil {
errCh <- err
}
}()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This loop introduces a common concurrency bug in Go. The listener variable is being captured by the goroutine's closure. Because the goroutines are scheduled and may not run until after the loop has completed, it's highly likely that most or all of them will see the listener variable with the value of the last element from the listeners slice.

To fix this, you should pass the listener as an argument to the goroutine's function literal. This creates a new copy of the variable for each goroutine.

Suggested change
for _, listener := range listeners {
eg.Go(func() error {
return handler(listener, ctx, event)
})
go func() {
defer wg.Done()
if err := handler(listener, ctx, event); err != nil {
errCh <- err
}
}()
}
for _, listener := range listeners {
go func(l L) {
defer wg.Done()
if err := handler(l, ctx, event); err != nil {
errCh <- err
}
}(listener)
}

@dosubot dosubot bot added the go Pull requests that update Go code label Nov 12, 2025
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 364 to +383
func processEvent[E any, L any](ctx context.Context, listeners []L, handler func(L, context.Context, E) error, event E) error {
eg, ctx := errgroup.WithContext(ctx)
if len(listeners) == 0 {
return nil
}

wg := sync.WaitGroup{}
wg.Add(len(listeners))

errCh := make(chan error, len(listeners))
for _, listener := range listeners {
eg.Go(func() error {
return handler(listener, ctx, event)
})
go func() {
defer wg.Done()
if err := handler(listener, ctx, event); err != nil {
errCh <- err
}
}()
}
wg.Wait()

close(errCh)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve context cancellation when any listener fails

Replacing errgroup.WithContext with a plain sync.WaitGroup drops the derived context that errgroup used to cancel the remaining goroutines as soon as one handler returns an error. Handlers that wait on <-ctx.Done() to abort early will now wait forever because the context is never cancelled, leaving wg.Wait() blocked and the dispatcher stuck whenever any listener fails before the others complete. Consider retaining a cancellable context and cancelling it on the first error to keep the previous behaviour.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4528705 and f9082d6.

📒 Files selected for processing (2)
  • dispatcher.go (2 hunks)
  • go.mod (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: go test (1.25.x, windows-latest)
🔇 Additional comments (1)
go.mod (1)

214-214: Dependency declaration looks correct.

Moving golang.org/x/sync to the indirect block matches the code change that stops importing it directly. 👍

Comment on lines +365 to +386
if len(listeners) == 0 {
return nil
}

wg := sync.WaitGroup{}
wg.Add(len(listeners))

errCh := make(chan error, len(listeners))
for _, listener := range listeners {
eg.Go(func() error {
return handler(listener, ctx, event)
})
go func() {
defer wg.Done()
if err := handler(listener, ctx, event); err != nil {
errCh <- err
}
}()
}
wg.Wait()

close(errCh)
var err error
for e := range errCh {
err = errors.Join(err, e)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Restore context cancellation semantics lost in the refactor.

The old errgroup.WithContext path cancelled the derived context as soon as any listener returned a non-nil error (or once the group completed). After this refactor every handler receives the original ctx, so listeners that honour ctx.Done() will now run to completion even after a sibling fails. That’s a behavioural regression with real reliability impact, especially for long-running or expensive listeners. Please derive a cancellable child context and cancel it on the first error while still aggregating all failures.

 func processEvent[E any, L any](ctx context.Context, listeners []L, handler func(L, context.Context, E) error, event E) error {
 	if len(listeners) == 0 {
 		return nil
 	}
 
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	wg := sync.WaitGroup{}
 	wg.Add(len(listeners))
 
 	errCh := make(chan error, len(listeners))
-	for _, listener := range listeners {
-		go func() {
+	for _, listener := range listeners {
+		listener := listener
+		go func(l L) {
 			defer wg.Done()
-			if err := handler(listener, ctx, event); err != nil {
+			if err := handler(l, ctx, event); err != nil {
 				errCh <- err
+				cancel()
 			}
-		}()
+		}(listener)
 	}
 	wg.Wait()
 
 	close(errCh)
 	var err error
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if len(listeners) == 0 {
return nil
}
wg := sync.WaitGroup{}
wg.Add(len(listeners))
errCh := make(chan error, len(listeners))
for _, listener := range listeners {
eg.Go(func() error {
return handler(listener, ctx, event)
})
go func() {
defer wg.Done()
if err := handler(listener, ctx, event); err != nil {
errCh <- err
}
}()
}
wg.Wait()
close(errCh)
var err error
for e := range errCh {
err = errors.Join(err, e)
if len(listeners) == 0 {
return nil
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(len(listeners))
errCh := make(chan error, len(listeners))
for _, listener := range listeners {
listener := listener
go func(l L) {
defer wg.Done()
if err := handler(l, ctx, event); err != nil {
errCh <- err
cancel()
}
}(listener)
}
wg.Wait()
close(errCh)
var err error
for e := range errCh {
err = errors.Join(err, e)

@flc1125 flc1125 merged commit 1dabb20 into v2.x Nov 12, 2025
10 of 12 checks passed
@flc1125 flc1125 deleted the concurrent-optimization branch November 12, 2025 06:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

go Pull requests that update Go code size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant