Skip to content

Commit 0dc330e

Browse files
Abacn and arvindram03 authored
Revert "Implementing lull reporting at bundle level processing (#29882)" (#30648) (#30664)
This reverts commit ffe2dba. Co-authored-by: Arvind Ram <arvindram03@gmail.com>
1 parent d9897b0 commit 0dc330e

6 files changed

Lines changed: 155 additions & 353 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java

Lines changed: 0 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
4646
new ConcurrentHashMap<>();
4747

4848
private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
49-
private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
5049
private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
5150
AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
5251

@@ -140,17 +139,8 @@ public String getDescription() {
140139
*/
141140
private volatile long millisSinceLastTransition = 0;
142141

143-
/**
144-
* The number of milliseconds since the {@link ExecutionStateTracker} initial state.
145-
*
146-
* <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
147-
* thus it being marked volatile.
148-
*/
149-
private volatile long millisSinceBundleStart = 0;
150-
151142
private long transitionsAtLastSample = 0;
152143
private long nextLullReportMs = LULL_REPORT_MS;
153-
private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
154144

155145
public ExecutionStateTracker(ExecutionStateSampler sampler) {
156146
this.sampler = sampler;
@@ -165,10 +155,8 @@ public synchronized void reset() {
165155
currentState = null;
166156
numTransitions = 0;
167157
millisSinceLastTransition = 0;
168-
millisSinceBundleStart = 0;
169158
transitionsAtLastSample = 0;
170159
nextLullReportMs = LULL_REPORT_MS;
171-
nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
172160
}
173161

174162
@VisibleForTesting
@@ -347,19 +335,6 @@ protected void takeSampleOnce(long millisSinceLastSample) {
347335
transitionsAtLastSample = transitionsAtThisSample;
348336
}
349337
updateMillisSinceLastTransition(millisSinceLastSample, state);
350-
updateMillisSinceBundleStart(millisSinceLastSample);
351-
}
352-
353-
// Override this to implement bundle level lull reporting.
354-
protected void reportBundleLull(long millisSinceBundleStart) {}
355-
356-
@SuppressWarnings("NonAtomicVolatileUpdate")
357-
private void updateMillisSinceBundleStart(long millisSinceLastSample) {
358-
millisSinceBundleStart += millisSinceLastSample;
359-
if (millisSinceBundleStart > nextBundleLullReportMs) {
360-
reportBundleLull(millisSinceBundleStart);
361-
nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
362-
}
363338
}
364339

365340
@SuppressWarnings("NonAtomicVolatileUpdate")

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java

Lines changed: 3 additions & 114 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2121

22-
import com.google.api.client.util.Clock;
2322
import com.google.api.services.dataflow.model.SideInputInfo;
2423
import java.io.Closeable;
2524
import java.io.IOException;
@@ -30,8 +29,6 @@
3029
import java.util.LinkedHashMap;
3130
import java.util.Map;
3231
import java.util.Optional;
33-
import java.util.logging.Level;
34-
import java.util.logging.LogRecord;
3532
import java.util.stream.Collectors;
3633
import javax.annotation.concurrent.GuardedBy;
3734
import org.apache.beam.runners.core.NullSideInputReader;
@@ -40,30 +37,22 @@
4037
import org.apache.beam.runners.core.TimerInternals.TimerData;
4138
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
4239
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
43-
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
4440
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
4541
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
4642
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
4743
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
48-
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
49-
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
5044
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
5145
import org.apache.beam.sdk.coders.Coder;
5246
import org.apache.beam.sdk.metrics.MetricsContainer;
5347
import org.apache.beam.sdk.metrics.MetricsEnvironment;
5448
import org.apache.beam.sdk.options.PipelineOptions;
5549
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
5650
import org.apache.beam.sdk.values.PCollectionView;
57-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
5851
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
5952
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
6053
import org.checkerframework.checker.nullness.qual.Nullable;
61-
import org.joda.time.Duration;
54+
import org.joda.time.DateTimeUtils.MillisProvider;
6255
import org.joda.time.Instant;
63-
import org.joda.time.format.PeriodFormatter;
64-
import org.joda.time.format.PeriodFormatterBuilder;
65-
import org.slf4j.Logger;
66-
import org.slf4j.LoggerFactory;
6756

6857
/** Execution context for the Dataflow worker. */
6958
@SuppressWarnings({
@@ -271,59 +260,23 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
271260
@Nullable
272261
private ActiveMessageMetadata activeMessageMetadata = null;
273262

274-
/** Clock used to either provide real system time or mocked to virtualize time for testing. */
275-
private final Clock clock;
263+
private final MillisProvider clock = System::currentTimeMillis;
276264

277265
@GuardedBy("this")
278266
private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
279267

280-
/** Last milliseconds since epoch when a full thread dump was performed. */
281-
private long lastFullThreadDumpMillis = 0;
282-
283-
/** The minimum lull duration in milliseconds to perform a full thread dump. */
284-
private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;
285-
286-
private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
287-
288-
private static final PeriodFormatter DURATION_FORMATTER =
289-
new PeriodFormatterBuilder()
290-
.appendDays()
291-
.appendSuffix("d")
292-
.minimumPrintedDigits(2)
293-
.appendHours()
294-
.appendSuffix("h")
295-
.printZeroAlways()
296-
.appendMinutes()
297-
.appendSuffix("m")
298-
.appendSeconds()
299-
.appendSuffix("s")
300-
.toFormatter();
301-
302268
public DataflowExecutionStateTracker(
303269
ExecutionStateSampler sampler,
304270
DataflowOperationContext.DataflowExecutionState otherState,
305271
CounterFactory counterFactory,
306272
PipelineOptions options,
307273
String workItemId) {
308-
this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM);
309-
}
310-
311-
@VisibleForTesting
312-
public DataflowExecutionStateTracker(
313-
ExecutionStateSampler sampler,
314-
DataflowOperationContext.DataflowExecutionState otherState,
315-
CounterFactory counterFactory,
316-
PipelineOptions options,
317-
String workItemId,
318-
Clock clock) {
319274
super(sampler);
320275
this.elementExecutionTracker =
321276
DataflowElementExecutionTracker.create(counterFactory, options);
322277
this.otherState = otherState;
323278
this.workItemId = workItemId;
324279
this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
325-
this.clock = clock;
326-
DataflowWorkerLoggingInitializer.initialize();
327280
}
328281

329282
@Override
@@ -348,76 +301,12 @@ public Closeable activate() {
348301
}
349302
}
350303

351-
private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
352-
if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) {
353-
return false;
354-
}
355-
long now = clock.currentTimeMillis();
356-
if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now) {
357-
lastFullThreadDumpMillis = now;
358-
return true;
359-
}
360-
return false;
361-
}
362-
363-
private String getBundleLullMessage(Duration lullDuration) {
364-
StringBuilder message = new StringBuilder();
365-
message
366-
.append("Operation ongoing in bundle for at least ")
367-
.append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
368-
.append(" without completing")
369-
.append("\n");
370-
synchronized (this) {
371-
if (this.activeMessageMetadata != null) {
372-
message.append(
373-
"Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n");
374-
message.append(
375-
"Time spent in this step(millis): "
376-
+ (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime())
377-
+ "\n");
378-
}
379-
message.append("Processing times in each step(millis)\n");
380-
for (Map.Entry<String, IntSummaryStatistics> entry :
381-
this.processingTimesByStep.entrySet()) {
382-
message.append("Step name: " + entry.getKey() + "\n");
383-
message.append("Time spent in this step: " + entry.getValue().toString() + "\n");
384-
}
385-
}
386-
387-
return message.toString();
388-
}
389-
390304
@Override
391305
protected void takeSampleOnce(long millisSinceLastSample) {
392306
elementExecutionTracker.takeSample(millisSinceLastSample);
393307
super.takeSampleOnce(millisSinceLastSample);
394308
}
395309

396-
@Override
397-
protected void reportBundleLull(long millisElapsedSinceBundleStart) {
398-
// If we're not logging warnings, nothing to report.
399-
if (!LOG.isWarnEnabled()) {
400-
return;
401-
}
402-
403-
Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
404-
405-
// Since the lull reporting executes in the sampler thread, it won't automatically inherit the
406-
// context of the current step. To ensure things are logged correctly, we get the currently
407-
// registered DataflowWorkerLoggingHandler and log directly in the desired context.
408-
LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration));
409-
logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
410-
411-
// Publish directly in the context of this specific ExecutionState.
412-
DataflowWorkerLoggingHandler dataflowLoggingHandler =
413-
DataflowWorkerLoggingInitializer.getLoggingHandler();
414-
dataflowLoggingHandler.publish(logRecord);
415-
416-
if (shouldLogFullThreadDumpForBundle(lullDuration)) {
417-
StackTraceUtil.logAllStackTraces();
418-
}
419-
}
420-
421310
/**
422311
* Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
423312
* activeMessageMetadata with the start time of the new state.
@@ -434,7 +323,7 @@ public Closeable enterState(ExecutionState newState) {
434323
synchronized (this) {
435324
this.activeMessageMetadata =
436325
ActiveMessageMetadata.create(
437-
newDFState.getStepName().userName(), clock.currentTimeMillis());
326+
newDFState.getStepName().userName(), clock.getMillis());
438327
}
439328
}
440329
elementExecutionTracker.enter(newDFState.getStepName());

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java

Lines changed: 79 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,16 @@
1919

2020
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
2121

22+
import com.google.api.client.util.Clock;
2223
import com.google.api.services.dataflow.model.CounterMetadata;
2324
import com.google.api.services.dataflow.model.CounterStructuredName;
2425
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
2526
import com.google.api.services.dataflow.model.CounterUpdate;
2627
import java.io.Closeable;
28+
import java.util.Map;
2729
import java.util.logging.Level;
2830
import java.util.logging.LogRecord;
31+
import org.apache.beam.runners.core.SimpleDoFnRunner;
2932
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
3033
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
3134
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -39,6 +42,7 @@
3942
import org.apache.beam.sdk.metrics.MetricsContainer;
4043
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
4144
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
45+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
4246
import org.checkerframework.checker.nullness.qual.Nullable;
4347
import org.joda.time.Duration;
4448
import org.joda.time.format.PeriodFormatter;
@@ -181,19 +185,41 @@ public abstract static class DataflowExecutionState extends ExecutionState {
181185
private final ProfileScope profileScope;
182186
private final @Nullable MetricsContainer metricsContainer;
183187

188+
/** Clock used to either provide real system time or mocked to virtualize time for testing. */
189+
private final Clock clock;
190+
184191
public DataflowExecutionState(
185192
NameContext nameContext,
186193
String stateName,
187194
@Nullable String requestingStepName,
188195
@Nullable Integer inputIndex,
189196
@Nullable MetricsContainer metricsContainer,
190197
ProfileScope profileScope) {
198+
this(
199+
nameContext,
200+
stateName,
201+
requestingStepName,
202+
inputIndex,
203+
metricsContainer,
204+
profileScope,
205+
Clock.SYSTEM);
206+
}
207+
208+
public DataflowExecutionState(
209+
NameContext nameContext,
210+
String stateName,
211+
@Nullable String requestingStepName,
212+
@Nullable Integer inputIndex,
213+
@Nullable MetricsContainer metricsContainer,
214+
ProfileScope profileScope,
215+
Clock clock) {
191216
super(stateName);
192217
this.stepName = nameContext;
193218
this.requestingStepName = requestingStepName;
194219
this.inputIndex = inputIndex;
195220
this.profileScope = Preconditions.checkNotNull(profileScope);
196221
this.metricsContainer = metricsContainer;
222+
this.clock = clock;
197223
}
198224

199225
/**
@@ -225,6 +251,9 @@ public String getDescription() {
225251
return description.toString();
226252
}
227253

254+
private static final ImmutableSet<String> FRAMEWORK_CLASSES =
255+
ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
256+
228257
protected String getLullMessage(Thread trackedThread, Duration lullDuration) {
229258
StringBuilder message = new StringBuilder();
230259
message.append("Operation ongoing");
@@ -243,7 +272,7 @@ protected String getLullMessage(Thread trackedThread, Duration lullDuration) {
243272

244273
message.append("\n");
245274

246-
message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
275+
message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
247276
return message.toString();
248277
}
249278

@@ -267,6 +296,55 @@ public void reportLull(Thread trackedThread, long millis) {
267296
DataflowWorkerLoggingHandler dataflowLoggingHandler =
268297
DataflowWorkerLoggingInitializer.getLoggingHandler();
269298
dataflowLoggingHandler.publish(this, logRecord);
299+
300+
if (shouldLogFullThreadDump(lullDuration)) {
301+
Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
302+
for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
303+
Thread thread = entry.getKey();
304+
StackTraceElement[] stackTrace = entry.getValue();
305+
StringBuilder message = new StringBuilder();
306+
message.append(thread.toString()).append(":\n");
307+
message.append(getStackTraceForLullMessage(stackTrace));
308+
logRecord = new LogRecord(Level.INFO, message.toString());
309+
logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
310+
dataflowLoggingHandler.publish(this, logRecord);
311+
}
312+
}
313+
}
314+
315+
/**
316+
* The time interval between two full thread dumps. (A full thread dump is performed at most once
317+
* every 20 minutes.)
318+
*/
319+
private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60 * 1000;
320+
321+
/** The minimum lull duration to perform a full thread dump. */
322+
private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;
323+
324+
/** Last time when a full thread dump was performed. */
325+
private long lastFullThreadDumpMillis = 0;
326+
327+
private boolean shouldLogFullThreadDump(Duration lullDuration) {
328+
if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
329+
return false;
330+
}
331+
long now = clock.currentTimeMillis();
332+
if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS < now) {
333+
lastFullThreadDumpMillis = now;
334+
return true;
335+
}
336+
return false;
337+
}
338+
339+
private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
340+
StringBuilder message = new StringBuilder();
341+
for (StackTraceElement e : stackTrace) {
342+
if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
343+
break;
344+
}
345+
message.append(" at ").append(e).append("\n");
346+
}
347+
return message.toString();
270348
}
271349

272350
public @Nullable MetricsContainer getMetricsContainer() {

0 commit comments

Comments
 (0)