Skip to content

Commit be3189d

Browse files
committed
Address review comments: always fire both body and filenames callbacks
- Fire both bodyCallback and filenamesCallback regardless of whether one triggers blocking, matching the canonical Tomcat/Liberty pattern. Previously, executeCallback threw BlockingException immediately, preventing filenamesCallback from running if body was blocked. - Avoid creating the filenames ArrayList when filenamesCallback is null. - Extract tryBlock() helper to deduplicate blocking logic shared between handleMultipartStrictFormData, handleStrictFormData, and executeCallback.
1 parent 99060f6 commit be3189d

1 file changed

Lines changed: 71 additions & 54 deletions

File tree

  • dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec

dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/UnmarshallerHelpers.java

Lines changed: 71 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,10 @@ private static void executeCallback(
125125
Flow<Void> flow = callback.apply(reqCtx, conv);
126126
Flow.Action action = flow.getAction();
127127
if (action instanceof Flow.Action.RequestBlockingAction) {
128-
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
129-
BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction();
130-
if (blockResponseFunction != null) {
131-
boolean success =
132-
blockResponseFunction.tryCommitBlockingResponse(reqCtx.getTraceSegment(), rba);
133-
if (success) {
134-
if (blockResponseFunction instanceof AkkaBlockResponseFunction) {
135-
AkkaBlockResponseFunction abrf = (AkkaBlockResponseFunction) blockResponseFunction;
136-
abrf.setUnmarshallBlock(true);
137-
}
138-
throw new BlockingException("Blocked request (for " + details + ")");
139-
}
128+
BlockingException e =
129+
tryBlock(reqCtx, (Flow.Action.RequestBlockingAction) action, "for " + details);
130+
if (e != null) {
131+
throw e;
140132
}
141133
}
142134
}
@@ -211,10 +203,10 @@ private static void handleMultipartStrictFormData(
211203
java.lang.Iterable<akka.http.javadsl.model.Multipart.FormData.BodyPart.Strict> strictParts =
212204
st.getStrictParts();
213205
Map<String, List<String>> conv = new HashMap<>();
214-
List<String> filenames = new ArrayList<>();
206+
List<String> filenames = filenamesCallback != null ? new ArrayList<>() : null;
215207
for (akka.http.javadsl.model.Multipart.FormData.BodyPart.Strict part : strictParts) {
216208
Optional<String> filenameOpt = part.getFilename();
217-
if (filenameOpt.isPresent() && !filenameOpt.get().isEmpty()) {
209+
if (filenames != null && filenameOpt.isPresent() && !filenameOpt.get().isEmpty()) {
218210
filenames.add(filenameOpt.get());
219211
}
220212

@@ -244,27 +236,33 @@ private static void handleMultipartStrictFormData(
244236
curStrings.add(s);
245237
}
246238

239+
BlockingException pendingBlock = null;
247240
if (bodyCallback != null) {
248-
executeCallback(reqCtx, bodyCallback, conv, "multipartFormDataUnmarshaller");
241+
Flow<Void> flow = bodyCallback.apply(reqCtx, conv);
242+
Flow.Action action = flow.getAction();
243+
if (action instanceof Flow.Action.RequestBlockingAction) {
244+
pendingBlock =
245+
tryBlock(
246+
reqCtx,
247+
(Flow.Action.RequestBlockingAction) action,
248+
"multipartFormDataUnmarshaller");
249+
}
249250
}
250251

251-
if (filenamesCallback != null && !filenames.isEmpty()) {
252-
Flow<Void> filenamesFlow = filenamesCallback.apply(reqCtx, filenames);
253-
Flow.Action filenamesAction = filenamesFlow.getAction();
254-
if (filenamesAction instanceof Flow.Action.RequestBlockingAction) {
255-
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) filenamesAction;
256-
BlockResponseFunction brf = reqCtx.getBlockResponseFunction();
257-
if (brf != null) {
258-
boolean success = brf.tryCommitBlockingResponse(reqCtx.getTraceSegment(), rba);
259-
if (success) {
260-
if (brf instanceof AkkaBlockResponseFunction) {
261-
((AkkaBlockResponseFunction) brf).setUnmarshallBlock(true);
262-
}
263-
throw new BlockingException("Blocked request (multipart file upload)");
264-
}
252+
if (filenamesCallback != null && filenames != null && !filenames.isEmpty()) {
253+
Flow<Void> flow = filenamesCallback.apply(reqCtx, filenames);
254+
if (pendingBlock == null) {
255+
Flow.Action action = flow.getAction();
256+
if (action instanceof Flow.Action.RequestBlockingAction) {
257+
pendingBlock =
258+
tryBlock(reqCtx, (Flow.Action.RequestBlockingAction) action, "multipart file upload");
265259
}
266260
}
267261
}
262+
263+
if (pendingBlock != null) {
264+
throw pendingBlock;
265+
}
268266
}
269267

270268
public static Unmarshaller<HttpEntity, String> transformStringUnmarshaller(
@@ -418,9 +416,13 @@ public static Unmarshaller<HttpEntity, StrictForm> transformStrictFormUnmarshall
418416
}
419417

420418
private static void handleStrictFormData(StrictForm sf) {
419+
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
420+
BiFunction<RequestContext, List<String>, Flow<Void>> filenamesCb =
421+
cbp.getCallback(EVENTS.requestFilesFilenames());
422+
421423
Iterator<Tuple2<String, StrictForm.Field>> iterator = sf.fields().iterator();
422424
Map<String, List<String>> conv = new HashMap<>();
423-
List<String> filenames = new ArrayList<>();
425+
List<String> filenames = filenamesCb != null ? new ArrayList<>() : null;
424426
while (iterator.hasNext()) {
425427
Tuple2<String, StrictForm.Field> next = iterator.next();
426428
String fieldName = next._1();
@@ -447,9 +449,11 @@ private static void handleStrictFormData(StrictForm sf) {
447449
instanceof akka.http.scaladsl.model.Multipart$FormData$BodyPart$Strict) {
448450
akka.http.scaladsl.model.Multipart$FormData$BodyPart$Strict bodyPart =
449451
(akka.http.scaladsl.model.Multipart$FormData$BodyPart$Strict) strictFieldValue;
450-
Optional<String> filenameOpt = bodyPart.getFilename();
451-
if (filenameOpt.isPresent() && !filenameOpt.get().isEmpty()) {
452-
filenames.add(filenameOpt.get());
452+
if (filenames != null) {
453+
Optional<String> filenameOpt = bodyPart.getFilename();
454+
if (filenameOpt.isPresent() && !filenameOpt.get().isEmpty()) {
455+
filenames.add(filenameOpt.get());
456+
}
453457
}
454458
HttpEntity.Strict sentity = bodyPart.entity();
455459
String s =
@@ -461,37 +465,34 @@ private static void handleStrictFormData(StrictForm sf) {
461465
}
462466
}
463467

464-
handleArbitraryPostData(conv, "HttpEntity -> StrictForm unmarshaller");
468+
BlockingException pendingBlock = null;
469+
try {
470+
handleArbitraryPostData(conv, "HttpEntity -> StrictForm unmarshaller");
471+
} catch (BlockingException e) {
472+
pendingBlock = e;
473+
}
465474

466-
if (!filenames.isEmpty()) {
475+
if (filenamesCb != null && filenames != null && !filenames.isEmpty()) {
467476
AgentSpan span = activeSpan();
468477
RequestContext reqCtx;
469478
if (span != null
470479
&& (reqCtx = span.getRequestContext()) != null
471480
&& reqCtx.getData(RequestContextSlot.APPSEC) != null) {
472-
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
473-
BiFunction<RequestContext, List<String>, Flow<Void>> filenamesCb =
474-
cbp.getCallback(EVENTS.requestFilesFilenames());
475-
if (filenamesCb != null) {
476-
Flow<Void> filenamesFlow = filenamesCb.apply(reqCtx, filenames);
477-
Flow.Action filenamesAction = filenamesFlow.getAction();
478-
if (filenamesAction instanceof Flow.Action.RequestBlockingAction) {
479-
Flow.Action.RequestBlockingAction rba =
480-
(Flow.Action.RequestBlockingAction) filenamesAction;
481-
BlockResponseFunction brf = reqCtx.getBlockResponseFunction();
482-
if (brf != null) {
483-
boolean success = brf.tryCommitBlockingResponse(reqCtx.getTraceSegment(), rba);
484-
if (success) {
485-
if (brf instanceof AkkaBlockResponseFunction) {
486-
((AkkaBlockResponseFunction) brf).setUnmarshallBlock(true);
487-
}
488-
throw new BlockingException("Blocked request (multipart file upload)");
489-
}
490-
}
481+
Flow<Void> flow = filenamesCb.apply(reqCtx, filenames);
482+
if (pendingBlock == null) {
483+
Flow.Action action = flow.getAction();
484+
if (action instanceof Flow.Action.RequestBlockingAction) {
485+
pendingBlock =
486+
tryBlock(
487+
reqCtx, (Flow.Action.RequestBlockingAction) action, "multipart file upload");
491488
}
492489
}
493490
}
494491
}
492+
493+
if (pendingBlock != null) {
494+
throw pendingBlock;
495+
}
495496
}
496497

497498
private static Object tryConvertingScalaContainers(Object obj, int depth) {
@@ -539,6 +540,22 @@ private static void handleArbitraryPostData(Object o, String source) {
539540
executeCallback(reqCtx, callback, o, source);
540541
}
541542

543+
private static BlockingException tryBlock(
544+
RequestContext reqCtx, Flow.Action.RequestBlockingAction rba, String details) {
545+
BlockResponseFunction brf = reqCtx.getBlockResponseFunction();
546+
if (brf == null) {
547+
return null;
548+
}
549+
boolean success = brf.tryCommitBlockingResponse(reqCtx.getTraceSegment(), rba);
550+
if (!success) {
551+
return null;
552+
}
553+
if (brf instanceof AkkaBlockResponseFunction) {
554+
((AkkaBlockResponseFunction) brf).setUnmarshallBlock(true);
555+
}
556+
return new BlockingException("Blocked request (" + details + ")");
557+
}
558+
542559
private static void handleException(Exception e, String logMessage) {
543560
if (e instanceof BlockingException) {
544561
throw (BlockingException) e;

0 commit comments

Comments
 (0)