@@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor
1313import io.sentry.test.initForTest
1414import java.nio.ByteBuffer
1515import java.nio.charset.StandardCharsets
16+ import java.util.Optional
1617import kotlin.test.AfterTest
1718import kotlin.test.BeforeTest
1819import kotlin.test.Test
@@ -22,6 +23,7 @@ import kotlin.test.assertTrue
2223import org.apache.kafka.clients.consumer.Consumer
2324import org.apache.kafka.clients.consumer.ConsumerRecord
2425import org.apache.kafka.common.header.internals.RecordHeaders
26+ import org.apache.kafka.common.record.TimestampType
2527import org.mockito.kotlin.any
2628import org.mockito.kotlin.mock
2729import org.mockito.kotlin.never
@@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest {
7274 private fun createRecord (
7375 topic : String = "my-topic",
7476 headers : RecordHeaders = RecordHeaders (),
77+ serializedValueSize : Int = -1,
7578 ): ConsumerRecord <String , String > {
76- val record = ConsumerRecord <String , String >(topic, 0 , 0L , " key" , " value" )
77- headers.forEach { record.headers().add(it) }
78- return record
79+ return ConsumerRecord (
80+ topic,
81+ 0 ,
82+ 0L ,
83+ System .currentTimeMillis(),
84+ TimestampType .CREATE_TIME ,
85+ 3 ,
86+ serializedValueSize,
87+ " key" ,
88+ " value" ,
89+ headers,
90+ Optional .empty(),
91+ )
7992 }
8093
8194 private fun createRecordWithHeaders (
@@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest {
164177 )
165178 }
166179
180+ @Test
181+ fun `sets body size from serializedValueSize` () {
182+ val interceptor = SentryKafkaRecordInterceptor <String , String >(scopes)
183+ val record = createRecord(serializedValueSize = 42 )
184+
185+ interceptor.intercept(record, consumer)
186+
187+ assertEquals(42 , transaction.data?.get(SpanDataConvention .MESSAGING_MESSAGE_BODY_SIZE ))
188+ }
189+
190+ @Test
191+ fun `does not set body size when serializedValueSize is negative` () {
192+ val interceptor = SentryKafkaRecordInterceptor <String , String >(scopes)
193+ val record = createRecord(serializedValueSize = - 1 )
194+
195+ interceptor.intercept(record, consumer)
196+
197+ assertNull(transaction.data?.get(SpanDataConvention .MESSAGING_MESSAGE_BODY_SIZE ))
198+ }
199+
167200 @Test
168201 fun `sets retry count from delivery attempt header` () {
169202 val interceptor = SentryKafkaRecordInterceptor <String , String >(scopes)
0 commit comments