@@ -119,20 +119,21 @@ def _state_cache_stats(state_cache: StateCache) -> str:
119119 return '\n ' .join (cache_stats )
120120
121121
122- def _active_processing_bundles_state (bundle_process_cache ):
122+ def _active_processing_bundles_state (bundle_processor_cache ):
123123 """Gather information about the currently in-processing active bundles.
124124
125125 The result only keeps the longest lasting 10 bundles to avoid excessive
126126 spamming.
127127 """
128128 active_bundles = ['=' * 10 + ' ACTIVE PROCESSING BUNDLES ' + '=' * 10 ]
129- if not bundle_process_cache .active_bundle_processors :
129+ if (not bundle_processor_cache .active_bundle_processors and
130+ not bundle_processor_cache .processors_being_created ):
130131 active_bundles .append ("No active processing bundles." )
131132 else :
132133 cache = []
133134 for instruction in list (
134- bundle_process_cache .active_bundle_processors .keys ()):
135- processor = bundle_process_cache .lookup (instruction )
135+ bundle_processor_cache .active_bundle_processors .keys ()):
136+ processor = bundle_processor_cache .lookup (instruction )
136137 if processor :
137138 info = processor .state_sampler .get_info ()
138139 cache .append ((
@@ -149,6 +150,18 @@ def _active_processing_bundles_state(bundle_process_cache):
149150 state += "time since transition: %.2f seconds\n " % (s [3 ] / 1e9 )
150151 active_bundles .append (state )
151152
if bundle_processor_cache.processors_being_created:
  active_bundles.append("Processors being created:\n ")
  current_time = time.time()
  # Iterate over a snapshot of the entries, the same way the other loops in
  # this module wrap the cache's dicts in list(...). Iterating the live
  # .items() view would raise "RuntimeError: dictionary changed size during
  # iteration" if an entry is added or removed while the status page is
  # being rendered.
  # NOTE(review): presumably worker threads mutate processors_being_created
  # concurrently -- confirm against the cache implementation.
  for instruction, (bundle_id, thread, creation_time) in list(
      bundle_processor_cache.processors_being_created.items()):
    state = '--- instruction %s ---\n ' % instruction
    state += 'ProcessBundleDescriptorId: %s\n ' % bundle_id
    state += "tracked thread: %s\n " % thread
    # creation_time is subtracted from time.time(), so the difference is
    # reported in seconds.
    state += "time since creation started: %.2f seconds\n " % (
        current_time - creation_time)
    active_bundles.append(state)
164+
152165 active_bundles .append ('=' * 30 )
153166 return '\n ' .join (active_bundles )
154167
@@ -161,7 +174,7 @@ class FnApiWorkerStatusHandler(object):
161174 def __init__ (
162175 self ,
163176 status_address ,
164- bundle_process_cache = None ,
177+ bundle_processor_cache = None ,
165178 state_cache = None ,
166179 enable_heap_dump = False ,
167180 worker_id = None ,
@@ -171,11 +184,11 @@ def __init__(
171184
172185 Args:
173186 status_address: The URL Runner uses to host the WorkerStatus server.
174- bundle_process_cache : The BundleProcessor cache dict from sdk worker.
187+ bundle_processor_cache : The BundleProcessor cache dict from sdk worker.
175188 state_cache: The StateCache from sdk worker.
176189 """
177190 self ._alive = True
178- self ._bundle_process_cache = bundle_process_cache
191+ self ._bundle_processor_cache = bundle_processor_cache
179192 self ._state_cache = state_cache
180193 ch = GRPCChannelFactory .insecure_channel (status_address )
181194 grpc .channel_ready_future (ch ).result (timeout = 60 )
@@ -200,7 +213,7 @@ def __init__(
200213 self ._server .start ()
201214 self ._lull_logger = threading .Thread (
202215 target = lambda : self ._log_lull_in_bundle_processor (
203- self ._bundle_process_cache ),
216+ self ._bundle_processor_cache ),
204217 name = 'lull_operation_logger' )
205218 self ._lull_logger .daemon = True
206219 self ._lull_logger .start ()
@@ -234,9 +247,9 @@ def generate_status_response(self):
234247 if self ._state_cache :
235248 all_status_sections .append (_state_cache_stats (self ._state_cache ))
236249
237- if self ._bundle_process_cache :
250+ if self ._bundle_processor_cache :
238251 all_status_sections .append (
239- _active_processing_bundles_state (self ._bundle_process_cache ))
252+ _active_processing_bundles_state (self ._bundle_processor_cache ))
240253
241254 all_status_sections .append (thread_dump ())
242255 if self ._enable_heap_dump :
@@ -247,24 +260,64 @@ def generate_status_response(self):
def close(self):
  """Put the DONE marker on the response queue.

  The put is issued with ``timeout=5``.
  """
  shutdown_marker = DONE
  self._responses.put(shutdown_marker, timeout=5)
249262
def _log_lull_in_bundle_processor(self, bundle_processor_cache):
  """Periodically inspect the cache and report lulls.

  Loops forever, sleeping two minutes between passes. On each pass with a
  truthy cache it:
    * looks up every instruction in ``active_bundle_processors`` and hands
      the processor's state sampler info to ``_log_lull_sampler_info``;
    * hands every entry of ``processors_being_created`` to
      ``_log_lull_in_creating_bundle_descriptor``.

  Args:
    bundle_processor_cache: object exposing ``active_bundle_processors``,
      ``processors_being_created`` and ``lookup(instruction)``. A falsy
      value turns every pass into a no-op.
  """
  while True:
    time.sleep(2 * 60)
    if bundle_processor_cache:
      # Snapshot the keys so the loop does not iterate the live dict.
      active_ids = list(
          bundle_processor_cache.active_bundle_processors.keys())
      for instruction_id in active_ids:
        bundle_processor = bundle_processor_cache.lookup(instruction_id)
        if not bundle_processor:
          # lookup() returned nothing usable for this instruction.
          continue
        sampler_info = bundle_processor.state_sampler.get_info()
        self._log_lull_sampler_info(sampler_info, instruction_id)

      # Snapshot taken only after the active processors were handled.
      pending_creations = list(
          bundle_processor_cache.processors_being_created.items())
      for instruction_id, creation_record in pending_creations:
        descriptor_id, creating_thread, started_at = creation_record
        self._log_lull_in_creating_bundle_descriptor(
            instruction_id, descriptor_id, creating_thread, started_at)
280+
def _log_lull_in_creating_bundle_descriptor(
    self, instruction, bundle_id, thread, creation_time):
  """Report a bundle processor whose creation is taking a long time.

  Two independent checks are made against the time elapsed since
  ``creation_time``:
    * If ``self._element_processing_timeout_ns`` is set and exceeded, an
      error with the creating thread's stack trace is logged and
      ``terminate_sdk_harness()`` is called.
    * If the elapsed time exceeds ``self.log_lull_timeout_ns`` and
      ``self._passed_lull_timeout_since_last_log()`` is true, a warning
      with the stack trace is logged.

  Args:
    instruction: instruction id the processor is being created for.
    bundle_id: bundle id reported alongside the instruction in the logs.
    thread: thread passed to ``_get_stack_trace`` to obtain the traceback.
    creation_time: creation start timestamp in seconds, on the same clock
      as ``time.time()``.
  """
  # Both thresholds are in nanoseconds, so convert the elapsed seconds.
  elapsed_ns = (time.time() - creation_time) * 1e9

  hard_limit_ns = self._element_processing_timeout_ns
  if hard_limit_ns and elapsed_ns > hard_limit_ns:
    traceback_text = self._get_stack_trace(thread)
    timeout_message = (
        'Creation of bundle processor for instruction %s (bundle %s) '
        'has exceeded the specified timeout of %.2f minutes. '
        'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
        'SDK harness will be terminated.\n '
        'Current Traceback:\n %s')
    _LOGGER.error(
        timeout_message,
        instruction,
        bundle_id,
        hard_limit_ns / 1e9 / 60,
        traceback_text)
    # Imported at the point of use, after the error has been logged.
    from apache_beam.runners.worker.sdk_worker_main import terminate_sdk_harness
    terminate_sdk_harness()

  # The duration test runs first so that
  # _passed_lull_timeout_since_last_log() is only consulted for creations
  # that are actually slow.
  if not (elapsed_ns > self.log_lull_timeout_ns):
    return
  if not self._passed_lull_timeout_since_last_log():
    return

  traceback_text = self._get_stack_trace(thread)
  lull_message = (
      'Bundle processor for instruction %s (bundle %s) '
      'has been creating for at least %.2f seconds.\n '
      'This might indicate slowness in DoFn.setup() or in DoFn creation. '
      'Current Traceback:\n %s')
  _LOGGER.warning(
      lull_message,
      instruction,
      bundle_id,
      elapsed_ns / 1e9,
      traceback_text)
260313
261314 def _log_lull_sampler_info (self , sampler_info , instruction ):
262315 if (not sampler_info or not sampler_info .time_since_transition ):
263316 return
264317
265318 log_lull = (
266- self ._passed_lull_timeout_since_last_log () and
267- sampler_info . time_since_transition > self .log_lull_timeout_ns )
319+ sampler_info . time_since_transition > self .log_lull_timeout_ns and
320+ self ._passed_lull_timeout_since_last_log () )
268321 timeout_exceeded = (
269322 self ._element_processing_timeout_ns and
270323 sampler_info .time_since_transition
@@ -281,7 +334,7 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
281334 ' for PTransform{name=%s, state=%s}' % (step_name , state_name ))
282335 else :
283336 step_name_log = ''
284- stack_trace = self ._get_stack_trace (sampler_info )
337+ stack_trace = self ._get_stack_trace (sampler_info . tracked_thread )
285338
286339 if timeout_exceeded :
287340 _LOGGER .error (
@@ -310,10 +363,9 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
310363 stack_trace ,
311364 )
312365
313- def _get_stack_trace (self , sampler_info ):
314- exec_thread = getattr (sampler_info , 'tracked_thread' , None )
315- if exec_thread is not None :
316- thread_frame = _current_frames ().get (exec_thread .ident )
366+ def _get_stack_trace (self , thread ):
367+ if thread :
368+ thread_frame = _current_frames ().get (thread .ident )
317369 return '\n ' .join (
318370 traceback .format_stack (thread_frame )) if thread_frame else ''
319371 else :
0 commit comments