add kafka topic/partition/offset to the extension of event#896
add kafka topic/partition/offset to the extension of event#896lionelvillard merged 7 commits intocloudevents:mainfrom
Conversation
Signed-off-by: myan <myan@redhat.com>
|
Can you please update the PR to include topic? nit: you could slightly optimize this existing call to avoid additional allocations if current headers is headers := make(map[string][]byte, 3+len(cm.Headers)) |
Signed-off-by: myan <myan@redhat.com>
Thanks @embano1 ! The kafka topic has been added the in this pr and also optimization of the header initialization is done. |
|
@yanmxa can you look at the test failures? |
|
it might also be good to add some new ones |
Signed-off-by: myan <myan@redhat.com>
Error: Received unexpected error:
kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writesProbably not related to this PR, but worth taking a look? |
|
I'm going to kick off the tests again, but here's a link to the current test run for reference: https://github.com/cloudevents/sdk-go/actions/runs/4986878143/jobs/8928440318?pr=896 |
|
Still seeming failures due to the new fields: |
Signed-off-by: myan <myan@redhat.com>
|
From the error message, it is caused by the Sarama binding integration test, and I add some code to fix the error. |
|
@embano1 @lionelvillard for another round of reviews. |
|
Thanks @duglin @embano1 @lionelvillard! PTAL~ |
| eventIn.SetExtension(KAFKA_OFFSET, got.Extensions()[KAFKA_OFFSET]) | ||
| eventIn.SetExtension(KAFKA_PARTITION, got.Extensions()[KAFKA_PARTITION]) | ||
| eventIn.SetExtension(KAFKA_TOPIC, got.Extensions()[KAFKA_TOPIC]) |
There was a problem hiding this comment.
Haven't looked deeper at the different test.<> functions, but was wondering what this is supposed to do here? You're setting extensions on the input event based on got and then asserting. Is this correctly exercising and asserting the changes here?
There was a problem hiding this comment.
There is another way we can remove the KAFKA_OFFSET , KAFKA_PARTITION, and KAFKA_TOPIC from got. Is this way better?
There was a problem hiding this comment.
I'm curious, how do we remove extension attributes from a CE? I didn't see that option in here: https://github.com/cloudevents/sdk-go/blob/main/v2/event/event_writer.go
I'm ok with copying things from got into eventIn so that the Assert works (or removing them from got if that's possible), but it seems to me that there should be a check of the values in got to make sure they're not blank - even if we need to do those manually instead via a compare with eventIn.
There was a problem hiding this comment.
@duglin Thanks for your advice! I have added the empty check for the event extensions.
Signed-off-by: myan <myan@redhat.com>
| extensions := got.Extensions() | ||
| require.NotEmpty(t, extensions[KAFKA_OFFSET]) | ||
| require.NotEmpty(t, extensions[KAFKA_PARTITION]) | ||
| require.NotEmpty(t, extensions[KAFKA_TOPIC]) |
There was a problem hiding this comment.
can we check their values? Should they match the constants defined above?
There was a problem hiding this comment.
Sure! But we can only verify the topic defined above. And the partition/offset isn't defined here.
Signed-off-by: myan <myan@redhat.com>
| func TestSendEvent(t *testing.T) { | ||
| test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) { | ||
| eventIn = test.ConvertEventExtensionsToString(t, eventIn) | ||
| clienttest.SendReceive(t, func() interface{} { | ||
| return protocolFactory(t) | ||
| }, eventIn, func(e event.Event) { | ||
| test.AssertEventEquals(t, eventIn, test.ConvertEventExtensionsToString(t, e)) | ||
| got := test.ConvertEventExtensionsToString(t, e) | ||
|
|
||
| require.Equal(t, TopicName, got.Extensions()[KAFKA_TOPIC]) | ||
| require.NotNil(t, got.Extensions()[KAFKA_PARTITION]) | ||
| require.NotNil(t, got.Extensions()[KAFKA_OFFSET]) | ||
|
|
||
| eventIn.SetExtension(KAFKA_OFFSET, got.Extensions()[KAFKA_OFFSET]) | ||
| eventIn.SetExtension(KAFKA_PARTITION, got.Extensions()[KAFKA_PARTITION]) | ||
| eventIn.SetExtension(KAFKA_TOPIC, TopicName) | ||
| test.AssertEventEquals(t, eventIn, got) | ||
| }) | ||
| }) | ||
| } |
There was a problem hiding this comment.
Suggesting the following changes for more clarity of the test (also for the other integration test):
func TestSendEvent(t *testing.T) {
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) {
eventIn = test.ConvertEventExtensionsToString(t, eventIn)
clienttest.SendReceive(t, func() interface{} {
return protocolFactory(t)
}, eventIn, func(eventOut event.Event) {
eventOut = test.ConvertEventExtensionsToString(t, eventOut)
require.Equal(t, TopicName, eventOut.Extensions()[KAFKA_TOPIC])
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])
test.AllOf(
test.HasExactlyAttributesEqualTo(eventIn.Context),
test.HasData(eventIn.Data()),
)
})
})
}There was a problem hiding this comment.
Renamed the variable names for more clarity and dropped extension comparison since we're not setting specific extensions here and have require... to assert on the specific extensions we want. Ideally we'd have a custom test.EventMatcher to assert on extensions but skip specific ones which get added at a later stage. I'll create tracking issue.
Signed-off-by: myan <myan@redhat.com>
|
@embano1 you good with the latest? |
|
LGTM, thank you! |
|
@yanmxa thanks for your patience. |
|
LGTM |
Signed-off-by: myan myan@redhat.com
Refers: #846