1919
2020import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkNotNull ;
2121
22- import com .google .api .client .util .Clock ;
2322import com .google .api .services .dataflow .model .SideInputInfo ;
2423import java .io .Closeable ;
2524import java .io .IOException ;
3029import java .util .LinkedHashMap ;
3130import java .util .Map ;
3231import java .util .Optional ;
33- import java .util .logging .Level ;
34- import java .util .logging .LogRecord ;
3532import java .util .stream .Collectors ;
3633import javax .annotation .concurrent .GuardedBy ;
3734import org .apache .beam .runners .core .NullSideInputReader ;
4037import org .apache .beam .runners .core .TimerInternals .TimerData ;
4138import org .apache .beam .runners .core .metrics .ExecutionStateSampler ;
4239import org .apache .beam .runners .core .metrics .ExecutionStateTracker ;
43- import org .apache .beam .runners .core .metrics .ExecutionStateTracker .ExecutionState ;
4440import org .apache .beam .runners .dataflow .worker .DataflowExecutionContext .DataflowStepContext ;
4541import org .apache .beam .runners .dataflow .worker .DataflowOperationContext .DataflowExecutionState ;
4642import org .apache .beam .runners .dataflow .worker .counters .CounterFactory ;
4743import org .apache .beam .runners .dataflow .worker .counters .NameContext ;
48- import org .apache .beam .runners .dataflow .worker .logging .DataflowWorkerLoggingHandler ;
49- import org .apache .beam .runners .dataflow .worker .logging .DataflowWorkerLoggingInitializer ;
5044import org .apache .beam .runners .dataflow .worker .util .common .worker .ElementExecutionTracker ;
5145import org .apache .beam .sdk .coders .Coder ;
5246import org .apache .beam .sdk .metrics .MetricsContainer ;
5347import org .apache .beam .sdk .metrics .MetricsEnvironment ;
5448import org .apache .beam .sdk .options .PipelineOptions ;
5549import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
5650import org .apache .beam .sdk .values .PCollectionView ;
57- import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
5851import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Iterables ;
5952import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .io .Closer ;
6053import org .checkerframework .checker .nullness .qual .Nullable ;
61- import org .joda .time .Duration ;
54+ import org .joda .time .DateTimeUtils . MillisProvider ;
6255import org .joda .time .Instant ;
63- import org .joda .time .format .PeriodFormatter ;
64- import org .joda .time .format .PeriodFormatterBuilder ;
65- import org .slf4j .Logger ;
66- import org .slf4j .LoggerFactory ;
6756
6857/** Execution context for the Dataflow worker. */
6958@ SuppressWarnings ({
@@ -271,59 +260,23 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
271260 @ Nullable
272261 private ActiveMessageMetadata activeMessageMetadata = null ;
273262
274- /** Clock used to either provide real system time or mocked to virtualize time for testing. */
275- private final Clock clock ;
263+ private final MillisProvider clock = System ::currentTimeMillis ;
276264
277265 @ GuardedBy ("this" )
278266 private final Map <String , IntSummaryStatistics > processingTimesByStep = new HashMap <>();
279267
280- /** Last milliseconds since epoch when a full thread dump was performed. */
281- private long lastFullThreadDumpMillis = 0 ;
282-
283- /** The minimum lull duration in milliseconds to perform a full thread dump. */
284- private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000 ;
285-
286- private static final Logger LOG = LoggerFactory .getLogger (DataflowExecutionStateTracker .class );
287-
288- private static final PeriodFormatter DURATION_FORMATTER =
289- new PeriodFormatterBuilder ()
290- .appendDays ()
291- .appendSuffix ("d" )
292- .minimumPrintedDigits (2 )
293- .appendHours ()
294- .appendSuffix ("h" )
295- .printZeroAlways ()
296- .appendMinutes ()
297- .appendSuffix ("m" )
298- .appendSeconds ()
299- .appendSuffix ("s" )
300- .toFormatter ();
301-
302268 public DataflowExecutionStateTracker (
303269 ExecutionStateSampler sampler ,
304270 DataflowOperationContext .DataflowExecutionState otherState ,
305271 CounterFactory counterFactory ,
306272 PipelineOptions options ,
307273 String workItemId ) {
308- this (sampler , otherState , counterFactory , options , workItemId , Clock .SYSTEM );
309- }
310-
311- @ VisibleForTesting
312- public DataflowExecutionStateTracker (
313- ExecutionStateSampler sampler ,
314- DataflowOperationContext .DataflowExecutionState otherState ,
315- CounterFactory counterFactory ,
316- PipelineOptions options ,
317- String workItemId ,
318- Clock clock ) {
319274 super (sampler );
320275 this .elementExecutionTracker =
321276 DataflowElementExecutionTracker .create (counterFactory , options );
322277 this .otherState = otherState ;
323278 this .workItemId = workItemId ;
324279 this .contextActivationObserverRegistry = ContextActivationObserverRegistry .createDefault ();
325- this .clock = clock ;
326- DataflowWorkerLoggingInitializer .initialize ();
327280 }
328281
329282 @ Override
@@ -348,76 +301,12 @@ public Closeable activate() {
348301 }
349302 }
350303
351- private boolean shouldLogFullThreadDumpForBundle (Duration lullDuration ) {
352- if (lullDuration .getMillis () < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS ) {
353- return false ;
354- }
355- long now = clock .currentTimeMillis ();
356- if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now ) {
357- lastFullThreadDumpMillis = now ;
358- return true ;
359- }
360- return false ;
361- }
362-
363- private String getBundleLullMessage (Duration lullDuration ) {
364- StringBuilder message = new StringBuilder ();
365- message
366- .append ("Operation ongoing in bundle for at least " )
367- .append (DURATION_FORMATTER .print (lullDuration .toPeriod ()))
368- .append (" without completing" )
369- .append ("\n " );
370- synchronized (this ) {
371- if (this .activeMessageMetadata != null ) {
372- message .append (
373- "Current user step name: " + getActiveMessageMetadata ().get ().userStepName () + "\n " );
374- message .append (
375- "Time spent in this step(millis): "
376- + (clock .currentTimeMillis () - getActiveMessageMetadata ().get ().startTime ())
377- + "\n " );
378- }
379- message .append ("Processing times in each step(millis)\n " );
380- for (Map .Entry <String , IntSummaryStatistics > entry :
381- this .processingTimesByStep .entrySet ()) {
382- message .append ("Step name: " + entry .getKey () + "\n " );
383- message .append ("Time spent in this step: " + entry .getValue ().toString () + "\n " );
384- }
385- }
386-
387- return message .toString ();
388- }
389-
390304 @ Override
391305 protected void takeSampleOnce (long millisSinceLastSample ) {
392306 elementExecutionTracker .takeSample (millisSinceLastSample );
393307 super .takeSampleOnce (millisSinceLastSample );
394308 }
395309
396- @ Override
397- protected void reportBundleLull (long millisElapsedSinceBundleStart ) {
398- // If we're not logging warnings, nothing to report.
399- if (!LOG .isWarnEnabled ()) {
400- return ;
401- }
402-
403- Duration lullDuration = Duration .millis (millisElapsedSinceBundleStart );
404-
405- // Since the lull reporting executes in the sampler thread, it won't automatically inherit the
406- // context of the current step. To ensure things are logged correctly, we get the currently
407- // registered DataflowWorkerLoggingHandler and log directly in the desired context.
408- LogRecord logRecord = new LogRecord (Level .WARNING , getBundleLullMessage (lullDuration ));
409- logRecord .setLoggerName (DataflowExecutionStateTracker .LOG .getName ());
410-
411- // Publish directly in the context of this specific ExecutionState.
412- DataflowWorkerLoggingHandler dataflowLoggingHandler =
413- DataflowWorkerLoggingInitializer .getLoggingHandler ();
414- dataflowLoggingHandler .publish (logRecord );
415-
416- if (shouldLogFullThreadDumpForBundle (lullDuration )) {
417- StackTraceUtil .logAllStackTraces ();
418- }
419- }
420-
421310 /**
422311 * Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
423312 * activeMessageMetadata with the start time of the new state.
@@ -434,7 +323,7 @@ public Closeable enterState(ExecutionState newState) {
434323 synchronized (this ) {
435324 this .activeMessageMetadata =
436325 ActiveMessageMetadata .create (
437- newDFState .getStepName ().userName (), clock .currentTimeMillis ());
326+ newDFState .getStepName ().userName (), clock .getMillis ());
438327 }
439328 }
440329 elementExecutionTracker .enter (newDFState .getStepName ());
0 commit comments