Skip to content

feat(router): provide an agnostic way of creating events in hooks (Cosmo Streams)#2306

Merged
dkorittki merged 22 commits into
topic/streams-v1from
dominik/eng-8360-provide-an-agnostic-way-of-creating-events-in-hooks
Oct 30, 2025
Merged

feat(router): provide an agnostic way of creating events in hooks (Cosmo Streams)#2306
dkorittki merged 22 commits into
topic/streams-v1from
dominik/eng-8360-provide-an-agnostic-way-of-creating-events-in-hooks

Conversation

@dkorittki

@dkorittki dkorittki commented Oct 27, 2025

Copy link
Copy Markdown
Contributor

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

Note

First code design idea as a basis for discussion. Once the design is settled I will edit and extend tests.

Motivation

Currently it is relatively "uncomfortable" to create new events in Cosmo Streams hooks.
You have to know what type of message broker is being at play when a hook is called on order to then create a broker-specific event like kafka.Event, nats.Event or redis.Event.
While it's certainly possible to do this, it is not very user-friendly.
It would be great to have a way for hook users to create new events without needing to know what provider the hook is associated with.

Implementation Design

This PR extends the handler contexts with a new method: NewEvent(data []byte) datasource.StreamEvent.
It allows a hook user to create new events independently of providers:

func (m *CosmoStreamsModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
	newEvent := ctx.NewEvent([]byte(`{"__typename": "Employee", "id": 3}`))
	events = append(events, newEvent)
	return events, nil
}

func (m *CosmoStreamsModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events []datasource.StreamEvent) ([]datasource.StreamEvent, error) {
	newEvent := ctx.NewEvent([]byte(`{"__typename": "Employee", "id": 3}`))
	events = append(events, newEvent)
	return events, nil
}

func (m *CosmoStreamsModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error {
	firstEvent := ctx.NewEvent([]byte(`{"__typename": "Employee", "id": 3}`))
	ctx.WriteEvent(firstEvent)
	return nil
}

The above code works for all providers. Depending on the provider being at play when the hook is called, ctx.NewEvent returns either a concrete kafka.Event, nats.Event or redis.Event, hidden behind the datasource.StreamEvent interface.

Summary by CodeRabbit

Release Notes

  • Refactor
    • Updated event handling pipeline in subscription and publishing workflows with improved event construction capabilities.
    • Refined hook function signatures across multiple pub/sub implementations to support enhanced event creation.
    • Extended context interfaces with new methods for raw event construction.

@dkorittki dkorittki changed the base branch from main to topic/streams-v1 October 27, 2025 15:54
@coderabbitai

coderabbitai Bot commented Oct 27, 2025

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR introduces an EventBuilderFn type and threads it through the pubsub datasource system. Hook signatures are updated to accept event builders as parameters. Handler contexts gain NewRawEvent methods. Event builders are propagated through providers, datasources, and all implementations (Kafka, NATS, Redis) to enable custom event construction from raw bytes.

Changes

Cohort / File(s) Change Summary
Hook type signatures
router/pkg/pubsub/datasource/hooks.go
Updated SubscriptionOnStartFn, OnPublishEventsFn, and OnReceiveEventsFn type signatures to accept additional eventBuilder EventBuilderFn parameter; order varies per hook (e.g., OnReceiveEventsFn places it between config and events).
Event builder type definition
router/pkg/pubsub/datasource/subscription_datasource.go
Added new public type EventBuilderFn func(data []byte) MutableStreamEvent; introduced eventBuilder field to PubSubSubscriptionDataSource; updated constructor and SubscriptionOnStart flow to accept and propagate builder.
Subscription datasource tests
router/pkg/pubsub/datasource/subscription_datasource_test.go
Added test helper testSubscriptionDataSourceEventBuilder; updated NewPubSubSubscriptionDataSource constructor calls and hook signatures to include eventBuilder parameter.
Subscription event updater
router/pkg/pubsub/datasource/subscription_event_updater.go
Extended NewSubscriptionEventUpdater signature with eventBuilder parameter; updated hook invocation to pass builder to each hook.
Subscription event updater tests
router/pkg/pubsub/datasource/subscription_event_updater_test.go
Introduced testEventBuilder helper; updated all test hooks and NewSubscriptionEventUpdater calls to accept and pass eventBuilder parameter.
PubSub provider
router/pkg/pubsub/datasource/pubsubprovider.go
Added eventBuilder field to PubSubProvider; updated NewPubSubProvider signature and applyPublishEventHooks to accept and propagate builder; wired builder through Publish path.
PubSub provider tests
router/pkg/pubsub/datasource/pubsubprovider_test.go
Introduced testPubSubEventBuilder helper; updated NewPubSubProvider and applyPublishEventHooks calls throughout tests to pass eventBuilder parameter.
Router core integration
router/core/subscriptions_modules.go
Added NewRawEvent([]byte) datasource.MutableStreamEvent method to handler contexts (pubSubSubscriptionOnStartHookContext, pubSubPublishEventHookContext, pubSubStreamReceiveEventHookContext, engineSubscriptionOnStartHookContext); added eventBuilder field to context structs; updated hook factory signatures to accept and inject builder.
Kafka implementation
router/pkg/pubsub/kafka/engine_datasource_factory.go, provider_builder.go
Extracted uniqueRequestIdFn and eventCreateFn functions; updated NewPubSubSubscriptionDataSource call with new parameters; introduced eventBuilder in NewPubSubProvider call.
NATS implementation
router/pkg/pubsub/nats/engine_datasource_factory.go, engine_datasource_test.go, provider_builder.go, engine_datasource_factory_test.go
Extracted uniqueRequestIdFn and createEventFn functions in factory; introduced testNatsEventBuilder in tests; updated NewPubSubSubscriptionDataSource and NewPubSubProvider calls to include eventBuilder.
Redis implementation
router/pkg/pubsub/redis/engine_datasource_factory.go, provider_builder.go
Refactored subscription data source instantiation with separate uniqueRequestIdFn and eventCreateFn; introduced eventBuilder in NewPubSubProvider call.
Subscription test
router-tests/modules/start_subscription_test.go
Replaced MutableEvent/MutableEngineEvent construction with new raw event flow using ctx.NewRawEvent([]byte(...)) and ctx.WriteEvent(evt).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20–30 minutes

  • Complexity drivers: Threading of eventBuilder parameter across multiple hook types with varying parameter positions; consistent pattern application across 15+ files reduces cognitive load; wiring through multiple provider/datasource constructor chains.
  • Attention areas:
    • Parameter order inconsistency: OnReceiveEventsFn places eventBuilder between config and events, while others place it at the end—verify this is intentional.
    • Verify NewRawEvent implementations correctly delegate to the injected eventBuilder across all context types.
    • Ensure test helpers (testPubSubEventBuilder, testSubscriptionDataSourceEventBuilder, testNatsEventBuilder, etc.) are properly wired in all test scenarios.
    • Cross-implementation consistency in Kafka, NATS, and Redis datasource factories regarding uniqueRequestIdFn and event creation functions.

Possibly related PRs

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "provide an agnostic way of creating events in hooks (Cosmo Streams)" directly and clearly describes the main objective of the changeset. The implementation introduces a NewRawEvent([]byte) method across handler contexts that returns broker-specific events hidden behind a common interface, enabling hook authors to create events without knowledge of the underlying message broker. This is precisely what the title conveys, and it is concise, specific, and free of vague terminology. A developer scanning commit history would immediately understand that this PR adds a broker-agnostic event creation mechanism for hook authors.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment thread router/core/subscriptions_modules.go Outdated
Comment thread router/pkg/pubsub/datasource/subscription_datasource.go Outdated
@github-actions

github-actions Bot commented Oct 30, 2025

Copy link
Copy Markdown

Router image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-68210d0490226a01d5a9268268edabdd717704dd

…d-readwrite-events' into dominik/eng-8360-provide-an-agnostic-way-of-creating-events-in-hooks

Already pull in the changes from another feature branch which will land on the topic branch shortly. The merge has to be done
regardless.
@dkorittki

Copy link
Copy Markdown
Contributor Author

fyi merged #2304 into this and after that one got merged into the topic branch, I also merged that in here as well. This allowed me to work on this branch while the other was still in review. The merge into this branch causes some new commits to appear here because #2304 got squash-merged into the topic branch but when comparing the code diff from this branch against the topic base branch, it only contains the new additions from this PR.

@dkorittki

Copy link
Copy Markdown
Contributor Author

Based on @StarpTech ´s suggestion I renamed NewEvent to NewRawEvent. The idea is to have a NewEvent method in the future, which accepts some sort of type instead of raw byte data to create an event.

@dkorittki dkorittki marked this pull request as ready for review October 30, 2025 09:42
@dkorittki

Copy link
Copy Markdown
Contributor Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Oct 30, 2025

Copy link
Copy Markdown
Contributor
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
router/core/subscriptions_modules.go (1)

112-114: Fix the SetData implementation to handle variable-length data correctly.

The current copy(e, data) only copies min(len(e), len(data)) bytes. If data is larger than the current slice capacity, it will be silently truncated. If data is smaller, old data remains in the tail.

Apply this diff to properly handle variable-length data:

-func (e MutableEngineEvent) SetData(data []byte) {
-	copy(e, data)
-}
+func (e *MutableEngineEvent) SetData(data []byte) {
+	*e = data
+}

Alternatively, if you need to preserve the underlying buffer:

 func (e MutableEngineEvent) SetData(data []byte) {
+	if len(data) > cap(e) {
+		// Cannot fit in current capacity, would need reallocation
+		panic("SetData: data exceeds capacity")
+	}
+	e = e[:len(data)]
 	copy(e, data)
 }

Note: The first approach is simpler and more intuitive for a mutable event type.

🧹 Nitpick comments (2)
router/pkg/pubsub/datasource/subscription_datasource_test.go (1)

217-224: Exercise the injected eventBuilder in the hook test.

Right now the hook only flips a flag and ignores eventBuilder, so the test would still pass if the datasource forgot to pass a builder (or passed nil). Please invoke eventBuilder (and assert on the returned event) so the test fails when the wiring regresses. That gives us coverage for the main feature this PR is adding.

router/pkg/pubsub/datasource/pubsubprovider_test.go (1)

233-238: Validate that hooks actually receive the builder.

All publish-hook tests ignore the new eventBuilder parameter, meaning they won’t catch a regression where the provider stops supplying it. Please update at least one hook here to call eventBuilder (and assert on the resulting event) so we genuinely exercise the new contract.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4c7687d and 411b3d0.

📒 Files selected for processing (17)
  • router-tests/modules/start_subscription_test.go (2 hunks)
  • router/core/subscriptions_modules.go (12 hunks)
  • router/pkg/pubsub/datasource/hooks.go (1 hunks)
  • router/pkg/pubsub/datasource/pubsubprovider.go (3 hunks)
  • router/pkg/pubsub/datasource/pubsubprovider_test.go (16 hunks)
  • router/pkg/pubsub/datasource/subscription_datasource.go (4 hunks)
  • router/pkg/pubsub/datasource/subscription_datasource_test.go (14 hunks)
  • router/pkg/pubsub/datasource/subscription_event_updater.go (3 hunks)
  • router/pkg/pubsub/datasource/subscription_event_updater_test.go (20 hunks)
  • router/pkg/pubsub/kafka/engine_datasource_factory.go (1 hunks)
  • router/pkg/pubsub/kafka/provider_builder.go (1 hunks)
  • router/pkg/pubsub/nats/engine_datasource_factory.go (1 hunks)
  • router/pkg/pubsub/nats/engine_datasource_factory_test.go (2 hunks)
  • router/pkg/pubsub/nats/engine_datasource_test.go (1 hunks)
  • router/pkg/pubsub/nats/provider_builder.go (1 hunks)
  • router/pkg/pubsub/redis/engine_datasource_factory.go (1 hunks)
  • router/pkg/pubsub/redis/provider_builder.go (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
PR: wundergraph/cosmo#2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/start_subscription_test.go
  • router/pkg/pubsub/datasource/subscription_event_updater_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
PR: wundergraph/cosmo#2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/start_subscription_test.go
🔇 Additional comments (6)
router/core/subscriptions_modules.go (2)

167-169: Consider validating the data length when creating MutableEngineEvent.

The type conversion MutableEngineEvent(data) directly aliases the input byte slice. This is efficient but means the caller retains a reference to the underlying data. Ensure this is the intended behavior and that callers understand mutations will affect the original slice.

If independent ownership is required, consider:

 func (c *engineSubscriptionOnStartHookContext) NewRawEvent(data []byte) datasource.MutableStreamEvent {
-	return MutableEngineEvent(data)
+	return MutableEngineEvent(slices.Clone(data))
 }

29-30: Well-designed abstraction for event creation.

The addition of NewRawEvent to hook contexts and the consistent threading of eventBuilder through all hook signatures provides a clean abstraction. Hook authors can now create broker-specific events without knowing the underlying implementation details. The pattern is applied consistently across all hook types (subscription start, publish, receive).

Also applies to: 62-64, 102-104, 187-187, 196-196, 234-235, 261-262, 279-279, 287-287, 325-327, 334-334, 342-342

router/pkg/pubsub/nats/engine_datasource_factory.go (1)

79-105: Clean separation of ID generation and event creation.

The refactoring to separate uniqueRequestIdFn (lines 79-97) and createEventFn (lines 99-101) improves code clarity and aligns with the broader design pattern. Error handling in the ID generation function is appropriate.

router/pkg/pubsub/kafka/provider_builder.go (1)

162-166: Consistent eventBuilder integration.

The eventBuilder function and its integration into NewPubSubProvider follows the same pattern as the NATS and Redis providers. The code duplication across providers is acceptable since each works with provider-specific event types.

router/pkg/pubsub/redis/provider_builder.go (1)

74-78: Consistent eventBuilder integration across all providers.

The Redis provider builder follows the same pattern as Kafka and NATS, ensuring uniformity in how events are constructed across different message brokers. This consistency makes the codebase easier to maintain.

router/pkg/pubsub/nats/engine_datasource_test.go (1)

196-196: testNatsEventBuilder is properly defined and accessible.

The helper testNatsEventBuilder is defined in engine_datasource_factory_test.go as a package-level function. Since both test files are in the same nats package, the usage at line 196 is valid. No issues found.

Comment thread router/pkg/pubsub/datasource/subscription_event_updater_test.go
@dkorittki

Copy link
Copy Markdown
Contributor Author

Adressed coderrabbits points, which all are valid with the exception of his hallucination.

@dkorittki dkorittki requested review from alepane21 and removed request for Noroth, StarpTech, devsergiy, endigma and jensneuse October 30, 2025 11:37
Comment thread router/core/subscriptions_modules.go Outdated
@dkorittki dkorittki merged commit d61e56b into topic/streams-v1 Oct 30, 2025
34 of 35 checks passed
@dkorittki dkorittki deleted the dominik/eng-8360-provide-an-agnostic-way-of-creating-events-in-hooks branch October 30, 2025 17:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants