Skip to content

Commit f4d8e76

Browse files
authored
Merge 4a48e54 into e0bb87f
2 parents e0bb87f + 4a48e54 commit f4d8e76

2 files changed

Lines changed: 41 additions & 3 deletions

File tree

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ private boolean isIgnored() {
177177
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
178178
}
179179

180+
final int bodySize = record.serializedValueSize();
181+
if (bodySize >= 0) {
182+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize);
183+
}
184+
180185
final @Nullable Integer retryCount = retryCount(record);
181186
if (retryCount != null) {
182187
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor
1313
import io.sentry.test.initForTest
1414
import java.nio.ByteBuffer
1515
import java.nio.charset.StandardCharsets
16+
import java.util.Optional
1617
import kotlin.test.AfterTest
1718
import kotlin.test.BeforeTest
1819
import kotlin.test.Test
@@ -22,6 +23,7 @@ import kotlin.test.assertTrue
2223
import org.apache.kafka.clients.consumer.Consumer
2324
import org.apache.kafka.clients.consumer.ConsumerRecord
2425
import org.apache.kafka.common.header.internals.RecordHeaders
26+
import org.apache.kafka.common.record.TimestampType
2527
import org.mockito.kotlin.any
2628
import org.mockito.kotlin.mock
2729
import org.mockito.kotlin.never
@@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest {
7274
private fun createRecord(
7375
topic: String = "my-topic",
7476
headers: RecordHeaders = RecordHeaders(),
77+
serializedValueSize: Int = -1,
7578
): ConsumerRecord<String, String> {
76-
val record = ConsumerRecord<String, String>(topic, 0, 0L, "key", "value")
77-
headers.forEach { record.headers().add(it) }
78-
return record
79+
return ConsumerRecord(
80+
topic,
81+
0,
82+
0L,
83+
System.currentTimeMillis(),
84+
TimestampType.CREATE_TIME,
85+
3,
86+
serializedValueSize,
87+
"key",
88+
"value",
89+
headers,
90+
Optional.empty(),
91+
)
7992
}
8093

8194
private fun createRecordWithHeaders(
@@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest {
164177
)
165178
}
166179

180+
@Test
181+
fun `sets body size from serializedValueSize`() {
182+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
183+
val record = createRecord(serializedValueSize = 42)
184+
185+
interceptor.intercept(record, consumer)
186+
187+
assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
188+
}
189+
190+
@Test
191+
fun `does not set body size when serializedValueSize is negative`() {
192+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
193+
val record = createRecord(serializedValueSize = -1)
194+
195+
interceptor.intercept(record, consumer)
196+
197+
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
198+
}
199+
167200
@Test
168201
fun `sets retry count from delivery attempt header`() {
169202
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

0 commit comments

Comments
 (0)