feat(router): provide an agnostic way of creating events in hooks (Cosmo Streams)#2306
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis PR introduces an Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20–30 minutes
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Router image scan passed✅ No security vulnerabilities found in image: |
…d-readwrite-events' into dominik/eng-8360-provide-an-agnostic-way-of-creating-events-in-hooks Already pull in the changes from another feature branch which will land on the topic branch shortly. The merge has to be done regardless.
…stic-way-of-creating-events-in-hooks
|
fyi merged #2304 into this and after that one got merged into the topic branch, I also merged that in here as well. This allowed me to work on this branch while the other was still in review. The merge into this branch causes some new commits to appear here because #2304 got squash-merged into the topic branch but when comparing the code diff from this branch against the topic base branch, it only contains the new additions from this PR. |
|
Based on @StarpTech ´s suggestion I renamed |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
router/core/subscriptions_modules.go (1)
112-114: Fix theSetDataimplementation to handle variable-length data correctly.The current
copy(e, data)only copiesmin(len(e), len(data))bytes. Ifdatais larger than the current slice capacity, it will be silently truncated. Ifdatais smaller, old data remains in the tail.Apply this diff to properly handle variable-length data:
-func (e MutableEngineEvent) SetData(data []byte) { - copy(e, data) -} +func (e *MutableEngineEvent) SetData(data []byte) { + *e = data +}Alternatively, if you need to preserve the underlying buffer:
func (e MutableEngineEvent) SetData(data []byte) { + if len(data) > cap(e) { + // Cannot fit in current capacity, would need reallocation + panic("SetData: data exceeds capacity") + } + e = e[:len(data)] copy(e, data) }Note: The first approach is simpler and more intuitive for a mutable event type.
🧹 Nitpick comments (2)
router/pkg/pubsub/datasource/subscription_datasource_test.go (1)
217-224: Exercise the injectedeventBuilderin the hook test.Right now the hook only flips a flag and ignores
eventBuilder, so the test would still pass if the datasource forgot to pass a builder (or passednil). Please invokeeventBuilder(and assert on the returned event) so the test fails when the wiring regresses. That gives us coverage for the main feature this PR is adding.router/pkg/pubsub/datasource/pubsubprovider_test.go (1)
233-238: Validate that hooks actually receive the builder.All publish-hook tests ignore the new
eventBuilderparameter, meaning they won’t catch a regression where the provider stops supplying it. Please update at least one hook here to calleventBuilder(and assert on the resulting event) so we genuinely exercise the new contract.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
router-tests/modules/start_subscription_test.go(2 hunks)router/core/subscriptions_modules.go(12 hunks)router/pkg/pubsub/datasource/hooks.go(1 hunks)router/pkg/pubsub/datasource/pubsubprovider.go(3 hunks)router/pkg/pubsub/datasource/pubsubprovider_test.go(16 hunks)router/pkg/pubsub/datasource/subscription_datasource.go(4 hunks)router/pkg/pubsub/datasource/subscription_datasource_test.go(14 hunks)router/pkg/pubsub/datasource/subscription_event_updater.go(3 hunks)router/pkg/pubsub/datasource/subscription_event_updater_test.go(20 hunks)router/pkg/pubsub/kafka/engine_datasource_factory.go(1 hunks)router/pkg/pubsub/kafka/provider_builder.go(1 hunks)router/pkg/pubsub/nats/engine_datasource_factory.go(1 hunks)router/pkg/pubsub/nats/engine_datasource_factory_test.go(2 hunks)router/pkg/pubsub/nats/engine_datasource_test.go(1 hunks)router/pkg/pubsub/nats/provider_builder.go(1 hunks)router/pkg/pubsub/redis/engine_datasource_factory.go(1 hunks)router/pkg/pubsub/redis/provider_builder.go(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
PR: wundergraph/cosmo#2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/start_subscription_test.gorouter/pkg/pubsub/datasource/subscription_event_updater_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
PR: wundergraph/cosmo#2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/start_subscription_test.go
🔇 Additional comments (6)
router/core/subscriptions_modules.go (2)
167-169: Consider validating the data length when creating MutableEngineEvent.The type conversion
MutableEngineEvent(data)directly aliases the input byte slice. This is efficient but means the caller retains a reference to the underlying data. Ensure this is the intended behavior and that callers understand mutations will affect the original slice.If independent ownership is required, consider:
func (c *engineSubscriptionOnStartHookContext) NewRawEvent(data []byte) datasource.MutableStreamEvent { - return MutableEngineEvent(data) + return MutableEngineEvent(slices.Clone(data)) }
29-30: Well-designed abstraction for event creation.The addition of
NewRawEventto hook contexts and the consistent threading ofeventBuilderthrough all hook signatures provides a clean abstraction. Hook authors can now create broker-specific events without knowing the underlying implementation details. The pattern is applied consistently across all hook types (subscription start, publish, receive).Also applies to: 62-64, 102-104, 187-187, 196-196, 234-235, 261-262, 279-279, 287-287, 325-327, 334-334, 342-342
router/pkg/pubsub/nats/engine_datasource_factory.go (1)
79-105: Clean separation of ID generation and event creation.The refactoring to separate
uniqueRequestIdFn(lines 79-97) andcreateEventFn(lines 99-101) improves code clarity and aligns with the broader design pattern. Error handling in the ID generation function is appropriate.router/pkg/pubsub/kafka/provider_builder.go (1)
162-166: Consistent eventBuilder integration.The eventBuilder function and its integration into
NewPubSubProviderfollows the same pattern as the NATS and Redis providers. The code duplication across providers is acceptable since each works with provider-specific event types.router/pkg/pubsub/redis/provider_builder.go (1)
74-78: Consistent eventBuilder integration across all providers.The Redis provider builder follows the same pattern as Kafka and NATS, ensuring uniformity in how events are constructed across different message brokers. This consistency makes the codebase easier to maintain.
router/pkg/pubsub/nats/engine_datasource_test.go (1)
196-196: testNatsEventBuilder is properly defined and accessible.The helper
testNatsEventBuilderis defined inengine_datasource_factory_test.goas a package-level function. Since both test files are in the samenatspackage, the usage at line 196 is valid. No issues found.
|
Adressed coderrabbits points, which all are valid with the exception of his hallucination. |
Checklist
Note
First code design idea as a basis for discussion. Once the design is settled I will edit and extend tests.
Motivation
Currently it is relatively "uncomfortable" to create new events in Cosmo Streams hooks.
You have to know what type of message broker is being at play when a hook is called on order to then create a broker-specific event like
kafka.Event,nats.Eventorredis.Event.While it's certainly possible to do this, it is not very user-friendly.
It would be great to have a way for hook users to create new events without needing to know what provider the hook is associated with.
Implementation Design
This PR extends the handler contexts with a new method:
NewEvent(data []byte) datasource.StreamEvent.It allows a hook user to create new events independently of providers:
The above code works for all providers. Depending on the provider being at play when the hook is called,
ctx.NewEventreturns either a concretekafka.Event,nats.Eventorredis.Event, hidden behind thedatasource.StreamEventinterface.Summary by CodeRabbit
Release Notes