@@ -1050,6 +1050,12 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10501050
10511051 def " test producer DSM transaction tracking extracts transaction id from headers" () {
10521052 setup :
1053+ if (! isDataStreamsEnabled()) {
1054+ return
1055+ }
1056+
1057+ injectEnvConfig(" DD_DATA_STREAMS_ENABLED" , " true" )
1058+
10531059 // Configure a DSM transaction extractor for KAFKA_PRODUCE_HEADERS
10541060 def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING . getClass(). getDeclaredField(" extractorsByType" )
10551061 extractorsByTypeField. setAccessible(true )
@@ -1087,12 +1093,18 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10871093 producedSpan. getTag(Tags . DSM_TRANSACTION_CHECKPOINT ) == " kafka-produce-test"
10881094
10891095 cleanup :
1090- extractorsByTypeField. set(TEST_DATA_STREAMS_MONITORING , oldExtractorsByType)
1096+ extractorsByTypeField? . set(TEST_DATA_STREAMS_MONITORING , oldExtractorsByType)
10911097 producer?. close()
10921098 }
10931099
10941100 def " test consumer DSM transaction tracking extracts transaction id from headers" () {
10951101 setup :
1102+ if (! isDataStreamsEnabled()) {
1103+ return
1104+ }
1105+
1106+ injectEnvConfig(" DD_DATA_STREAMS_ENABLED" , " true" )
1107+
10961108 // Configure a DSM transaction extractor for KAFKA_CONSUME_HEADERS
10971109 def extractorsByTypeField = TEST_DATA_STREAMS_MONITORING . getClass(). getDeclaredField(" extractorsByType" )
10981110 extractorsByTypeField. setAccessible(true )
@@ -1151,7 +1163,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
11511163 consumerSpan. getTag(Tags . DSM_TRANSACTION_CHECKPOINT ) == " kafka-consume-test"
11521164
11531165 cleanup :
1154- extractorsByTypeField. set(TEST_DATA_STREAMS_MONITORING , oldExtractorsByType)
1166+ extractorsByTypeField? . set(TEST_DATA_STREAMS_MONITORING , oldExtractorsByType)
11551167 consumer?. close()
11561168 producer?. close()
11571169 }
0 commit comments