Skip to content

Commit c49d17b

Browse files
committed
debugger/symdb: add upload metadata fields to upload event message and attachment
Add the following fields to the SymDB upload event message that accompanies each multipart upload (camelCase, matching the rest of the EvP event schema): - "version" (top-level): the service version - "language" (top-level): "java" - "uploadId" (top-level): a UUID generated once per SymbolSink instance, shared by all batches uploaded by the sink - "batchNum" (top-level): 1-indexed counter incremented per upload - "final" (top-level): always false; the Java tracer continuously uploads new code as classes get loaded, so there is no defined end-of-upload point - "attachmentSize" (top-level): size in bytes of the (compressed) attachment payload Also add the same metadata to the gzipped attachment body via the ServiceVersion wrapper (snake_case to match the rest of the attachment scope schema): - "upload_id" - "batch_num" - "final" uploadId/batchNum are computed once per batch in serializeAndUpload so both the attachment and the event JSON carry the same values. Some of these fields are new, to be used by the backend in the future. Others duplicate info that was already included in the attachment; by duplicating some metadata out of the SymDB attachment body into the EvP event body, the backend can populate per-attachment bookkeeping without downloading the attachment.
1 parent 0015e8c commit c49d17b

3 files changed

Lines changed: 99 additions & 19 deletions

File tree

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.datadog.debugger.util.MoshiHelper;
1111
import com.squareup.moshi.JsonAdapter;
1212
import datadog.trace.api.Config;
13+
import datadog.trace.util.RandomUtils;
1314
import datadog.trace.util.TagsHelper;
1415
import java.io.ByteArrayOutputStream;
1516
import java.io.IOException;
@@ -21,6 +22,7 @@
2122
import java.util.List;
2223
import java.util.concurrent.ArrayBlockingQueue;
2324
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.atomic.AtomicLong;
2426
import java.util.zip.GZIPOutputStream;
2527
import okhttp3.HttpUrl;
2628
import okhttp3.MediaType;
@@ -36,21 +38,34 @@ public class SymbolSink {
3638
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10);
3739
private static final JsonAdapter<ServiceVersion> SERVICE_VERSION_ADAPTER =
3840
MoshiHelper.createMoshiSymbol().adapter(ServiceVersion.class);
41+
// The upload event message JSON. The "final" field is hard-coded to false:
42+
// the Java tracer continuously uploads new code as classes get loaded, so
43+
// there is no defined end-of-upload point.
3944
private static final String EVENT_FORMAT =
4045
"{%n"
4146
+ "\"ddsource\": \"dd_debugger\",%n"
4247
+ "\"service\": \"%s\",%n"
48+
+ "\"version\": \"%s\",%n"
49+
+ "\"language\": \"java\",%n"
4350
+ "\"runtimeId\": \"%s\",%n"
44-
+ "\"type\": \"symdb\"%n"
51+
+ "\"type\": \"symdb\",%n"
52+
+ "\"uploadId\": \"%s\",%n"
53+
+ "\"batchNum\": %d,%n"
54+
+ "\"final\": false,%n"
55+
+ "\"attachmentSize\": %d%n"
4556
+ "}";
4657
static final int MAX_SYMDB_UPLOAD_SIZE = 50 * 1024 * 1024;
4758

4859
private final String serviceName;
4960
private final String env;
5061
private final String version;
62+
private final String runtimeId;
5163
private final BatchUploader symbolUploader;
5264
private final int maxPayloadSize;
53-
private final BatchUploader.MultiPartContent event;
65+
// uploadId is shared by all batches uploaded by this sink. The backend uses
66+
// it to group batches belonging to the same logical upload.
67+
private final String uploadId = RandomUtils.randomUUID().toString();
68+
private final AtomicLong batchNum = new AtomicLong(0);
5469
private final BlockingQueue<Scope> scopes = new ArrayBlockingQueue<>(CAPACITY);
5570
private final Stats stats = new Stats();
5671
private final boolean isCompressed;
@@ -66,15 +81,10 @@ public SymbolSink(Config config) {
6681
this.serviceName = TagsHelper.sanitize(config.getServiceName());
6782
this.env = TagsHelper.sanitize(config.getEnv());
6883
this.version = TagsHelper.sanitize(config.getVersion());
84+
this.runtimeId = config.getRuntimeId();
6985
this.symbolUploader = symbolUploader;
7086
this.maxPayloadSize = maxPayloadSize;
7187
this.isCompressed = config.isSymbolDatabaseCompressed();
72-
byte[] eventContent =
73-
String.format(
74-
EVENT_FORMAT, TagsHelper.sanitize(config.getServiceName()), config.getRuntimeId())
75-
.getBytes(StandardCharsets.UTF_8);
76-
this.event =
77-
new BatchUploader.MultiPartContent(eventContent, "event", "event.json", APPLICATION_JSON);
7888
}
7989

8090
public void stop() {
@@ -111,22 +121,35 @@ public void flush() {
111121
}
112122

113123
private void serializeAndUpload(List<Scope> scopesToSerialize) {
124+
// Determine the batch number once so the attachment body and the EvP event
125+
// message agree on it.
126+
long currentBatch = batchNum.incrementAndGet();
114127
try {
115128
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024);
116129
try (OutputStream outputStream =
117130
isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) {
118131
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
119132
SERVICE_VERSION_ADAPTER.toJson(
120-
sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
133+
sink,
134+
new ServiceVersion(
135+
serviceName,
136+
env,
137+
version,
138+
"JAVA",
139+
scopesToSerialize,
140+
uploadId,
141+
currentBatch,
142+
false /* isFinal */));
121143
sink.flush();
122144
}
123-
doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed);
145+
doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed, currentBatch);
124146
} catch (IOException e) {
125147
LOGGER.debug("Error serializing scopes", e);
126148
}
127149
}
128150

129-
private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
151+
private void doUpload(
152+
List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed, long currentBatch) {
130153
if (payload.length > maxPayloadSize) {
131154
LOGGER.warn(
132155
"Payload is too big: {}/{} isCompressed={}",
@@ -138,20 +161,38 @@ private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isC
138161
}
139162
updateStats(scopesToSerialize, payload.length);
140163
LOGGER.debug(
141-
"Sending {} jar scopes size={} isCompressed={}",
164+
"Sending {} jar scopes size={} isCompressed={} uploadId={} batchNum={}",
142165
scopesToSerialize.size(),
143166
payload.length,
144-
isCompressed);
167+
isCompressed,
168+
uploadId,
169+
currentBatch);
145170
String fileName = "file.json";
146171
MediaType mediaType = APPLICATION_JSON;
147172
if (isCompressed) {
148173
fileName = "file.gz";
149174
mediaType = APPLICATION_GZIP;
150175
}
176+
BatchUploader.MultiPartContent event = buildEvent(currentBatch, payload.length);
151177
symbolUploader.uploadAsMultipart(
152178
"", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
153179
}
154180

181+
private BatchUploader.MultiPartContent buildEvent(long currentBatch, int attachmentSize) {
182+
byte[] eventContent =
183+
String.format(
184+
EVENT_FORMAT,
185+
serviceName,
186+
version,
187+
runtimeId,
188+
uploadId.toString(),
189+
currentBatch,
190+
attachmentSize)
191+
.getBytes(StandardCharsets.UTF_8);
192+
return new BatchUploader.MultiPartContent(
193+
eventContent, "event", "event.json", APPLICATION_JSON);
194+
}
195+
155196
private static byte[] compressPayload(byte[] jsonBytes) {
156197
// usual compression factor 40:1 for those json payload, so we are preallocating 1/40
157198
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(jsonBytes.length / 40);

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.datadog.debugger.symbol;
22

3+
import com.squareup.moshi.Json;
34
import java.util.List;
45

56
public class ServiceVersion {
@@ -10,13 +11,32 @@ public class ServiceVersion {
1011
private final String language;
1112
private final List<Scope> scopes;
1213

14+
@Json(name = "upload_id")
15+
private final String uploadId;
16+
17+
@Json(name = "batch_num")
18+
private final long batchNum;
19+
20+
@Json(name = "final")
21+
private final boolean isFinal;
22+
1323
public ServiceVersion(
14-
String service, String env, String version, String language, List<Scope> scopes) {
24+
String service,
25+
String env,
26+
String version,
27+
String language,
28+
List<Scope> scopes,
29+
String uploadId,
30+
long batchNum,
31+
boolean isFinal) {
1532
this.service = service;
1633
this.env = env;
1734
this.version = version;
1835
this.language = language;
1936
this.scopes = scopes;
37+
this.uploadId = uploadId;
38+
this.batchNum = batchNum;
39+
this.isFinal = isFinal;
2040
}
2141

2242
public List<Scope> getScopes() {

dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public void testSimpleFlush() {
2424
SymbolUploaderMock symbolUploaderMock = new SymbolUploaderMock();
2525
Config config = mock(Config.class);
2626
when(config.getServiceName()).thenReturn("service1");
27+
when(config.getVersion()).thenReturn("1.0.0");
28+
when(config.getRuntimeId()).thenReturn("test-runtime");
2729
when(config.isSymbolDatabaseCompressed()).thenReturn(false);
2830
SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE);
2931
symbolSink.addScope(Scope.builder(ScopeType.JAR, null, 0, 0).build());
@@ -35,13 +37,25 @@ public void testSimpleFlush() {
3537
String strEventContent = new String(eventContent.getContent());
3638
assertTrue(strEventContent.contains("\"ddsource\": \"dd_debugger\""));
3739
assertTrue(strEventContent.contains("\"service\": \"service1\""));
40+
assertTrue(strEventContent.contains("\"version\": \"1.0.0\""));
41+
assertTrue(strEventContent.contains("\"language\": \"java\""));
42+
assertTrue(strEventContent.contains("\"runtimeId\": \"test-runtime\""));
3843
assertTrue(strEventContent.contains("\"type\": \"symdb\""));
44+
assertTrue(strEventContent.contains("\"uploadId\":"));
45+
assertTrue(strEventContent.contains("\"batchNum\": 1"));
46+
assertTrue(strEventContent.contains("\"final\": false"));
47+
assertTrue(strEventContent.contains("\"attachmentSize\":"));
3948
BatchUploader.MultiPartContent symbolContent = symbolUploaderMock.multiPartContents.get(1);
4049
assertEquals("file", symbolContent.getPartName());
4150
assertEquals("file.json", symbolContent.getFileName());
42-
assertEquals(
43-
"{\"language\":\"JAVA\",\"scopes\":[{\"end_line\":0,\"has_injectible_lines\":false,\"scope_type\":\"JAR\",\"start_line\":0}],\"service\":\"service1\"}",
44-
new String(symbolContent.getContent()));
51+
String fileContent = new String(symbolContent.getContent());
52+
assertTrue(fileContent.contains("\"language\":\"JAVA\""));
53+
assertTrue(fileContent.contains("\"scopes\":["));
54+
assertTrue(fileContent.contains("\"service\":\"service1\""));
55+
assertTrue(fileContent.contains("\"version\":\"1.0.0\""));
56+
assertTrue(fileContent.contains("\"upload_id\":"));
57+
assertTrue(fileContent.contains("\"batch_num\":1"));
58+
assertTrue(fileContent.contains("\"final\":false"));
4559
}
4660

4761
@Test
@@ -219,8 +233,13 @@ public void maxCompressedAndSplit() {
219233
.build());
220234
}
221235
symbolSink.flush();
222-
assertEquals(4, symbolUploaderMock.multiPartContents.size());
223-
for (int i = 0; i < 4; i += 2) {
236+
int total = symbolUploaderMock.multiPartContents.size();
237+
assertTrue(
238+
total >= 4, "expected at least 4 multipart entries (2+ event/file pairs), got " + total);
239+
assertTrue(
240+
total % 2 == 0,
241+
"expected an even number of multipart entries (event/file pairs), got " + total);
242+
for (int i = 0; i < total; i += 2) {
224243
BatchUploader.MultiPartContent eventContent = symbolUploaderMock.multiPartContents.get(i);
225244
assertEquals("event", eventContent.getPartName());
226245
BatchUploader.MultiPartContent symbolContent =

0 commit comments

Comments
 (0)