Skip to content

Commit 0cb0687

Browse files
committed
Fix EMR Spark support by using Seq instead of ArrayBuffer for externalAccums
1 parent 1abe140 commit 0cb0687

2 files changed

Lines changed: 8 additions & 2 deletions

File tree

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.spark.util.AccumulatorV2;
1515
import scala.Function1;
1616
import scala.collection.JavaConverters;
17+
import scala.collection.Seq;
1718
import scala.collection.mutable.ArrayBuffer;
1819

1920
/**
@@ -91,7 +92,9 @@ protected List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics) {
9192
}
9293

9394
// withExternalAccums didn't work, try the legacy method
94-
ArrayBuffer<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
95+
// Use Seq (not ArrayBuffer) since some Spark distributions (e.g. Amazon EMR) return a
96+
// JListWrapper which implements Seq but cannot be cast to ArrayBuffer
97+
Seq<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
9598
if (accumulators != null && !accumulators.isEmpty()) {
9699
return JavaConverters.seqAsJavaList(accumulators);
97100
}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
1414
import org.apache.spark.util.AccumulatorV2;
1515
import scala.Function1;
16+
import scala.collection.Seq;
1617
import scala.collection.mutable.ArrayBuffer;
1718
import scala.jdk.javaapi.CollectionConverters;
1819

@@ -91,7 +92,9 @@ protected List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics) {
9192
}
9293

9394
// withExternalAccums didn't work, try the legacy method
94-
ArrayBuffer<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
95+
// Use Seq (not ArrayBuffer) since some Spark distributions (e.g. Amazon EMR) return a
96+
// JListWrapper which implements Seq but cannot be cast to ArrayBuffer
97+
Seq<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
9598
if (accumulators != null && !accumulators.isEmpty()) {
9699
return CollectionConverters.asJava(accumulators);
97100
}

0 commit comments

Comments
 (0)