Skip to content

Commit b7a65db

Browse files
authored
add kafka topic/partition/offset to the extension of event (#896)
* add kafka partition/offset to the extension of event Signed-off-by: myan <myan@redhat.com> * initialize header with kafka information Signed-off-by: myan <myan@redhat.com> * fixed the integration test Signed-off-by: myan <myan@redhat.com> * fixed the sarama binding test issue Signed-off-by: myan <myan@redhat.com> * add check empty in the integration test Signed-off-by: myan <myan@redhat.com> * verfiy the kafka topic value Signed-off-by: myan <myan@redhat.com> * clarify the test code Signed-off-by: myan <myan@redhat.com> --------- Signed-off-by: myan <myan@redhat.com>
1 parent bc9170f commit b7a65db

File tree

3 files changed

+46
-15
lines changed

3 files changed

+46
-15
lines changed

protocol/kafka_sarama/v2/message.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kafka_sarama
88
import (
99
"bytes"
1010
"context"
11+
"strconv"
1112
"strings"
1213

1314
"github.com/cloudevents/sdk-go/v2/binding"
@@ -35,22 +36,27 @@ type Message struct {
3536
}
3637

3738
// Check if http.Message implements binding.Message
38-
var _ binding.Message = (*Message)(nil)
39-
var _ binding.MessageMetadataReader = (*Message)(nil)
39+
var (
40+
_ binding.Message = (*Message)(nil)
41+
_ binding.MessageMetadataReader = (*Message)(nil)
42+
)
4043

4144
// NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage.
4245
// The returned binding.Message *can* be read several times safely
4346
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
4447
func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message {
4548
var contentType string
46-
headers := make(map[string][]byte, len(cm.Headers))
49+
headers := make(map[string][]byte, len(cm.Headers)+3)
4750
for _, r := range cm.Headers {
4851
k := strings.ToLower(string(r.Key))
4952
if k == contentTypeHeader {
5053
contentType = string(r.Value)
5154
}
5255
headers[k] = r.Value
5356
}
57+
headers[prefix+"kafkaoffset"] = []byte(strconv.FormatInt(cm.Offset, 10))
58+
headers[prefix+"kafkapartition"] = []byte(strconv.FormatInt(int64(cm.Partition), 10))
59+
headers[prefix+"kafkatopic"] = []byte(cm.Topic)
5460
return NewMessage(cm.Value, contentType, headers)
5561
}
5662

test/integration/kafka_sarama/kafka_test.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,30 @@ import (
2121
)
2222

2323
const (
24-
TEST_GROUP_ID = "test_group_id"
24+
TEST_GROUP_ID = "test_group_id"
25+
KAFKA_OFFSET = "kafkaoffset"
26+
KAFKA_PARTITION = "kafkapartition"
27+
KAFKA_TOPIC = "kafkatopic"
2528
)
2629

30+
var TopicName = "test-ce-client-" + uuid.New().String()
31+
2732
func TestSendEvent(t *testing.T) {
2833
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) {
2934
eventIn = test.ConvertEventExtensionsToString(t, eventIn)
3035
clienttest.SendReceive(t, func() interface{} {
3136
return protocolFactory(t)
32-
}, eventIn, func(e event.Event) {
33-
test.AssertEventEquals(t, eventIn, test.ConvertEventExtensionsToString(t, e))
37+
}, eventIn, func(eventOut event.Event) {
38+
eventOut = test.ConvertEventExtensionsToString(t, eventOut)
39+
40+
require.Equal(t, TopicName, eventOut.Extensions()[KAFKA_TOPIC])
41+
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
42+
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])
43+
44+
test.AllOf(
45+
test.HasExactlyAttributesEqualTo(eventIn.Context),
46+
test.HasData(eventIn.Data()),
47+
)
3448
})
3549
})
3650
}
@@ -60,11 +74,10 @@ func testClient(t testing.TB) sarama.Client {
6074
func protocolFactory(t testing.TB) *kafka_sarama.Protocol {
6175
client := testClient(t)
6276

63-
topicName := "test-ce-client-" + uuid.New().String()
6477
options := []kafka_sarama.ProtocolOptionFunc{
6578
kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID),
6679
}
67-
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, options...)
80+
p, err := kafka_sarama.NewProtocolFromClient(client, TopicName, TopicName, options...)
6881
require.NoError(t, err)
6982

7083
return p

test/integration/kafka_sarama_binding/kafka_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ import (
2727
)
2828

2929
const (
30-
TEST_GROUP_ID = "test_group_id"
30+
TEST_GROUP_ID = "test_group_id"
31+
KAFKA_OFFSET = "kafkaoffset"
32+
KAFKA_PARTITION = "kafkapartition"
33+
KAFKA_TOPIC = "kafkatopic"
3134
)
3235

3336
func TestSendStructuredMessageToStructured(t *testing.T) {
34-
close, s, r := testSenderReceiver(t)
37+
close, s, r, _ := testSenderReceiver(t)
3538
defer close()
3639
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
3740
eventIn = ConvertEventExtensionsToString(t, eventIn)
@@ -46,7 +49,7 @@ func TestSendStructuredMessageToStructured(t *testing.T) {
4649
}
4750

4851
func TestSendBinaryMessageToBinary(t *testing.T) {
49-
close, s, r := testSenderReceiver(t)
52+
close, s, r, topicName := testSenderReceiver(t)
5053
defer close()
5154
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
5255
eventIn = ConvertEventExtensionsToString(t, eventIn)
@@ -55,7 +58,16 @@ func TestSendBinaryMessageToBinary(t *testing.T) {
5558
test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), binding.EncodingBinary), in, s, r, func(out binding.Message) {
5659
eventOut := MustToEvent(t, context.Background(), out)
5760
assert.Equal(t, binding.EncodingBinary, out.ReadEncoding())
58-
AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut))
61+
eventOut = ConvertEventExtensionsToString(t, eventOut)
62+
63+
require.Equal(t, topicName, eventOut.Extensions()[KAFKA_TOPIC])
64+
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
65+
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])
66+
67+
AllOf(
68+
HasExactlyAttributesEqualTo(eventIn.Context),
69+
HasData(eventIn.Data()),
70+
)
5971
})
6072
})
6173
}
@@ -82,7 +94,7 @@ func testClient(t testing.TB) sarama.Client {
8294
return client
8395
}
8496

85-
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver) {
97+
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
8698
client := testClient(t)
8799

88100
topicName := "test-ce-client-" + uuid.New().String()
@@ -97,11 +109,11 @@ func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receive
97109
return func() {
98110
require.NoError(t, p.Close(context.TODO()))
99111
require.NoError(t, client.Close())
100-
}, p, p
112+
}, p, p, topicName
101113
}
102114

103115
func BenchmarkSendReceive(b *testing.B) {
104-
c, s, r := testSenderReceiver(b)
116+
c, s, r, _ := testSenderReceiver(b)
105117
defer c() // Cleanup
106118
test.BenchmarkSendReceive(b, s, r)
107119
}

0 commit comments

Comments
 (0)