Skip to content

Commit 7746033

Browse files
committed
Fix sending multiple LLM Obs traces per request.
Previously, llm-obs-events-processor only recognized the first trace per request and ignored the rest. Fix sizeInBytes calculation for metrics and logging. Previously, it was the size of the entire buffer rather than of the actual number of bytes for the request
1 parent 69155bf commit 7746033

2 files changed

Lines changed: 50 additions & 22 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,21 @@ public void accept(int messageCount, ByteBuffer buffer) {
112112
if (response.success()) {
113113
if (log.isDebugEnabled()) {
114114
log.debug(
115-
"Successfully sent {} traces {} bytes to the API {}",
115+
"Successfully sent {} traces of size {} bytes to the API {}",
116116
messageCount,
117-
buffer.position(),
117+
sizeInBytes,
118118
mapper.endpoint());
119119
}
120120
healthMetrics.onSend(messageCount, sizeInBytes, response);
121121
} else {
122122
if (log.isDebugEnabled()) {
123123
log.debug(
124-
"Failed to send {} traces of size {} bytes to the API {}",
124+
"Failed to send {} traces of size {} bytes to the API {} status {} response {}",
125125
messageCount,
126126
sizeInBytes,
127-
mapper.endpoint());
127+
mapper.endpoint(),
128+
response.status(),
129+
response.response());
128130
}
129131
healthMetrics.onFailedSend(messageCount, sizeInBytes, response);
130132
}

dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import static datadog.communication.http.OkHttpUtils.gzippedMsgpackRequestBodyOf;
44

5+
import datadog.communication.serialization.GrowableBuffer;
56
import datadog.communication.serialization.Writable;
7+
import datadog.communication.serialization.msgpack.MsgPackWriter;
68
import datadog.trace.api.DDTags;
79
import datadog.trace.api.intake.TrackType;
810
import datadog.trace.api.llmobs.LLMObs;
@@ -77,15 +79,30 @@ public class LLMObsSpanMapper implements RemoteMapper {
7779

7880
private static final String PARENT_ID_TAG_INTERNAL_FULL = LLMOBS_TAG_PREFIX + "parent_id";
7981

80-
private final LLMObsSpanMapper.MetaWriter metaWriter = new MetaWriter();
82+
private final MetaWriter metaWriter = new MetaWriter();
8183
private final int size;
8284

85+
private final ByteBuffer header;
86+
private int spansWritten;
87+
8388
public LLMObsSpanMapper() {
8489
this(5 << 20);
8590
}
8691

8792
private LLMObsSpanMapper(int size) {
8893
this.size = size;
94+
95+
GrowableBuffer header = new GrowableBuffer(64);
96+
MsgPackWriter headerWriter = new MsgPackWriter(header);
97+
98+
headerWriter.startMap(3);
99+
headerWriter.writeUTF8(EVENT_TYPE);
100+
headerWriter.writeString("span", null);
101+
headerWriter.writeUTF8(STAGE);
102+
headerWriter.writeString("raw", null);
103+
headerWriter.writeUTF8(SPANS);
104+
105+
this.header = header.slice();
89106
}
90107

91108
@Override
@@ -98,16 +115,6 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
98115
return;
99116
}
100117

101-
writable.startMap(3);
102-
103-
writable.writeUTF8(EVENT_TYPE);
104-
writable.writeString("span", null);
105-
106-
writable.writeUTF8(STAGE);
107-
writable.writeString("raw", null);
108-
109-
writable.writeUTF8(SPANS);
110-
writable.startArray(llmobsSpans.size());
111118
for (CoreSpan<?> span : llmobsSpans) {
112119
writable.startMap(11);
113120
// 1
@@ -148,6 +155,10 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
148155
/* 9 (metrics), 10 (tags), 11 meta */
149156
span.processTagsAndBaggage(metaWriter.withWritable(writable, getErrorsMap(span)));
150157
}
158+
159+
// Increase only after all spans have been written. This way, if it rolls back because of a
160+
// buffer overflow, the counter won't be skewed.
161+
spansWritten += llmobsSpans.size();
151162
}
152163

153164
private static boolean isLLMObsSpan(CoreSpan<?> span) {
@@ -157,7 +168,7 @@ private static boolean isLLMObsSpan(CoreSpan<?> span) {
157168

158169
@Override
159170
public Payload newPayload() {
160-
return new PayloadV1();
171+
return new PayloadV1(header, spansWritten);
161172
}
162173

163174
@Override
@@ -166,7 +177,10 @@ public int messageBufferSize() {
166177
}
167178

168179
@Override
169-
public void reset() {}
180+
public void reset() {
181+
// Reset the number of spans per message with each flush.
182+
spansWritten = 0;
183+
}
170184

171185
@Override
172186
public String endpoint() {
@@ -206,7 +220,7 @@ private static final class MetaWriter implements MetadataConsumer {
206220
LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_VERSION,
207221
LLMOBS_TAG_PREFIX + LLMObsTags.METADATA)));
208222

209-
LLMObsSpanMapper.MetaWriter withWritable(Writable writable, Map<String, String> errorInfo) {
223+
MetaWriter withWritable(Writable writable, Map<String, String> errorInfo) {
210224
this.writable = writable;
211225
this.errorInfo = errorInfo;
212226
return this;
@@ -348,14 +362,20 @@ public void accept(Metadata metadata) {
348362
}
349363

350364
private static class PayloadV1 extends Payload {
365+
private final ByteBuffer header;
366+
private final int spansWritten;
367+
368+
public PayloadV1(ByteBuffer header, int spansWritten) {
369+
this.spansWritten = spansWritten;
370+
this.header = header;
371+
}
351372

352373
@Override
353374
public int sizeInBytes() {
354375
if (traceCount() == 0) {
355376
return msgpackMapHeaderSize(0);
356377
}
357-
358-
return body.array().length;
378+
return header.remaining() + msgpackArrayHeaderSize(spansWritten) + body.remaining();
359379
}
360380

361381
@Override
@@ -368,6 +388,8 @@ public void writeTo(WritableByteChannel channel) throws IOException {
368388
}
369389
} else {
370390
while (body.hasRemaining()) {
391+
channel.write(header.slice());
392+
channel.write(msgpackArrayHeader(spansWritten));
371393
channel.write(body);
372394
}
373395
}
@@ -379,9 +401,13 @@ public RequestBody toRequest() {
379401
if (traceCount() == 0) {
380402
buffers = Collections.singletonList(msgpackMapHeader(0));
381403
} else {
382-
buffers = Collections.singletonList(body);
404+
buffers =
405+
Arrays.asList(
406+
header.slice(),
407+
// Third Value: is an array of spans serialized into the body
408+
msgpackArrayHeader(spansWritten),
409+
body);
383410
}
384-
385411
return gzippedMsgpackRequestBodyOf(buffers);
386412
}
387413
}

0 commit comments

Comments
 (0)