Skip to content

Commit b48d832

Browse files
authored
[release-2.69] Cherrypick #36518 to the branch. (#36551)
* Track bundle processors that are pending creation and terminate SDK if creating a BP exceeds a timeout. * Rename the term * Remove unnecessary conditions. * add tests * Address comments * Also add a test for logging a lull in process.
1 parent 8739e50 commit b48d832

3 files changed

Lines changed: 175 additions & 29 deletions

File tree

sdks/python/apache_beam/runners/worker/sdk_worker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,8 @@ def __init__(
454454
) # type: collections.OrderedDict[str, Exception]
455455
self.active_bundle_processors = {
456456
} # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
457+
self.processors_being_created = {
458+
} # type: Dict[str, Tuple[str, threading.Thread, float]]
457459
self.cached_bundle_processors = collections.defaultdict(
458460
list) # type: DefaultDict[str, List[bundle_processor.BundleProcessor]]
459461
self.last_access_times = collections.defaultdict(
@@ -501,7 +503,8 @@ def get(self, instruction_id, bundle_descriptor_id):
501503
pass
502504
return processor
503505
except IndexError:
504-
pass
506+
self.processors_being_created[instruction_id] = (
507+
bundle_descriptor_id, threading.current_thread(), time.time())
505508

506509
# Make sure we instantiate the processor while not holding the lock.
507510

@@ -521,6 +524,7 @@ def get(self, instruction_id, bundle_descriptor_id):
521524
with self._lock:
522525
self.active_bundle_processors[
523526
instruction_id] = bundle_descriptor_id, processor
527+
del self.processors_being_created[instruction_id]
524528
try:
525529
del self.known_not_running_instruction_ids[instruction_id]
526530
except KeyError:

sdks/python/apache_beam/runners/worker/worker_status.py

Lines changed: 77 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,21 @@ def _state_cache_stats(state_cache: StateCache) -> str:
119119
return '\n'.join(cache_stats)
120120

121121

122-
def _active_processing_bundles_state(bundle_process_cache):
122+
def _active_processing_bundles_state(bundle_processor_cache):
123123
"""Gather information about the currently in-processing active bundles.
124124
125125
The result only keeps the longest lasting 10 bundles to avoid excessive
126126
spamming.
127127
"""
128128
active_bundles = ['=' * 10 + ' ACTIVE PROCESSING BUNDLES ' + '=' * 10]
129-
if not bundle_process_cache.active_bundle_processors:
129+
if (not bundle_processor_cache.active_bundle_processors and
130+
not bundle_processor_cache.processors_being_created):
130131
active_bundles.append("No active processing bundles.")
131132
else:
132133
cache = []
133134
for instruction in list(
134-
bundle_process_cache.active_bundle_processors.keys()):
135-
processor = bundle_process_cache.lookup(instruction)
135+
bundle_processor_cache.active_bundle_processors.keys()):
136+
processor = bundle_processor_cache.lookup(instruction)
136137
if processor:
137138
info = processor.state_sampler.get_info()
138139
cache.append((
@@ -149,6 +150,18 @@ def _active_processing_bundles_state(bundle_process_cache):
149150
state += "time since transition: %.2f seconds\n" % (s[3] / 1e9)
150151
active_bundles.append(state)
151152

153+
if bundle_processor_cache.processors_being_created:
154+
active_bundles.append("Processors being created:\n")
155+
current_time = time.time()
156+
for instruction, (bundle_id, thread, creation_time) in (
157+
bundle_processor_cache.processors_being_created.items()):
158+
state = '--- instruction %s ---\n' % instruction
159+
state += 'ProcessBundleDescriptorId: %s\n' % bundle_id
160+
state += "tracked thread: %s\n" % thread
161+
state += "time since creation started: %.2f seconds\n" % (
162+
current_time - creation_time)
163+
active_bundles.append(state)
164+
152165
active_bundles.append('=' * 30)
153166
return '\n'.join(active_bundles)
154167

@@ -161,7 +174,7 @@ class FnApiWorkerStatusHandler(object):
161174
def __init__(
162175
self,
163176
status_address,
164-
bundle_process_cache=None,
177+
bundle_processor_cache=None,
165178
state_cache=None,
166179
enable_heap_dump=False,
167180
worker_id=None,
@@ -171,11 +184,11 @@ def __init__(
171184
172185
Args:
173186
status_address: The URL Runner uses to host the WorkerStatus server.
174-
bundle_process_cache: The BundleProcessor cache dict from sdk worker.
187+
bundle_processor_cache: The BundleProcessor cache dict from sdk worker.
175188
state_cache: The StateCache form sdk worker.
176189
"""
177190
self._alive = True
178-
self._bundle_process_cache = bundle_process_cache
191+
self._bundle_processor_cache = bundle_processor_cache
179192
self._state_cache = state_cache
180193
ch = GRPCChannelFactory.insecure_channel(status_address)
181194
grpc.channel_ready_future(ch).result(timeout=60)
@@ -200,7 +213,7 @@ def __init__(
200213
self._server.start()
201214
self._lull_logger = threading.Thread(
202215
target=lambda: self._log_lull_in_bundle_processor(
203-
self._bundle_process_cache),
216+
self._bundle_processor_cache),
204217
name='lull_operation_logger')
205218
self._lull_logger.daemon = True
206219
self._lull_logger.start()
@@ -234,9 +247,9 @@ def generate_status_response(self):
234247
if self._state_cache:
235248
all_status_sections.append(_state_cache_stats(self._state_cache))
236249

237-
if self._bundle_process_cache:
250+
if self._bundle_processor_cache:
238251
all_status_sections.append(
239-
_active_processing_bundles_state(self._bundle_process_cache))
252+
_active_processing_bundles_state(self._bundle_processor_cache))
240253

241254
all_status_sections.append(thread_dump())
242255
if self._enable_heap_dump:
@@ -247,24 +260,64 @@ def generate_status_response(self):
247260
def close(self):
248261
self._responses.put(DONE, timeout=5)
249262

250-
def _log_lull_in_bundle_processor(self, bundle_process_cache):
263+
def _log_lull_in_bundle_processor(self, bundle_processor_cache):
251264
while True:
252265
time.sleep(2 * 60)
253-
if bundle_process_cache and bundle_process_cache.active_bundle_processors:
254-
for instruction in list(
255-
bundle_process_cache.active_bundle_processors.keys()):
256-
processor = bundle_process_cache.lookup(instruction)
257-
if processor:
258-
info = processor.state_sampler.get_info()
259-
self._log_lull_sampler_info(info, instruction)
266+
if not bundle_processor_cache:
267+
continue
268+
269+
for instruction in list(
270+
bundle_processor_cache.active_bundle_processors.keys()):
271+
processor = bundle_processor_cache.lookup(instruction)
272+
if processor:
273+
info = processor.state_sampler.get_info()
274+
self._log_lull_sampler_info(info, instruction)
275+
276+
for instruction, (bundle_id, thread, creation_time) in list(
277+
bundle_processor_cache.processors_being_created.items()):
278+
self._log_lull_in_creating_bundle_descriptor(
279+
instruction, bundle_id, thread, creation_time)
280+
281+
def _log_lull_in_creating_bundle_descriptor(
282+
self, instruction, bundle_id, thread, creation_time):
283+
time_since_creation_ns = (time.time() - creation_time) * 1e9
284+
285+
if (self._element_processing_timeout_ns and
286+
time_since_creation_ns > self._element_processing_timeout_ns):
287+
stack_trace = self._get_stack_trace(thread)
288+
_LOGGER.error((
289+
'Creation of bundle processor for instruction %s (bundle %s) '
290+
'has exceeded the specified timeout of %.2f minutes. '
291+
'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
292+
'SDK harness will be terminated.\n'
293+
'Current Traceback:\n%s'),
294+
instruction,
295+
bundle_id,
296+
self._element_processing_timeout_ns / 1e9 / 60,
297+
stack_trace)
298+
from apache_beam.runners.worker.sdk_worker_main import terminate_sdk_harness
299+
terminate_sdk_harness()
300+
301+
if (time_since_creation_ns > self.log_lull_timeout_ns and
302+
self._passed_lull_timeout_since_last_log()):
303+
stack_trace = self._get_stack_trace(thread)
304+
_LOGGER.warning((
305+
'Bundle processor for instruction %s (bundle %s) '
306+
'has been creating for at least %.2f seconds.\n'
307+
'This might indicate slowness in DoFn.setup() or in DoFn creation. '
308+
'Current Traceback:\n%s'),
309+
instruction,
310+
bundle_id,
311+
time_since_creation_ns / 1e9,
312+
stack_trace)
260313

261314
def _log_lull_sampler_info(self, sampler_info, instruction):
262315
if (not sampler_info or not sampler_info.time_since_transition):
263316
return
264317

265318
log_lull = (
266-
self._passed_lull_timeout_since_last_log() and
267-
sampler_info.time_since_transition > self.log_lull_timeout_ns)
319+
sampler_info.time_since_transition > self.log_lull_timeout_ns and
320+
self._passed_lull_timeout_since_last_log())
268321
timeout_exceeded = (
269322
self._element_processing_timeout_ns and
270323
sampler_info.time_since_transition
@@ -281,7 +334,7 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
281334
' for PTransform{name=%s, state=%s}' % (step_name, state_name))
282335
else:
283336
step_name_log = ''
284-
stack_trace = self._get_stack_trace(sampler_info)
337+
stack_trace = self._get_stack_trace(sampler_info.tracked_thread)
285338

286339
if timeout_exceeded:
287340
_LOGGER.error(
@@ -310,10 +363,9 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
310363
stack_trace,
311364
)
312365

313-
def _get_stack_trace(self, sampler_info):
314-
exec_thread = getattr(sampler_info, 'tracked_thread', None)
315-
if exec_thread is not None:
316-
thread_frame = _current_frames().get(exec_thread.ident)
366+
def _get_stack_trace(self, thread):
367+
if thread:
368+
thread_frame = _current_frames().get(thread.ident)
317369
return '\n'.join(
318370
traceback.format_stack(thread_frame)) if thread_frame else ''
319371
else:

sdks/python/apache_beam/runners/worker/worker_status_test.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def WorkerStatus(self, response_iterator, context):
4545
self.finished.acquire()
4646
self.response_received.append(response)
4747
if len(self.response_received) == self.num_request:
48-
self.finished.notifyAll()
48+
self.finished.notify_all()
4949
self.finished.release()
5050

5151

@@ -63,6 +63,7 @@ def setUp(self):
6363
self.url, element_processing_timeout_minutes=10)
6464

6565
def tearDown(self):
66+
self.fn_status_handler.close()
6667
self.server.stop(5)
6768

6869
def test_send_status_response(self):
@@ -72,7 +73,6 @@ def test_send_status_response(self):
7273
self.test_status_service.finished.release()
7374
for response in self.test_status_service.response_received:
7475
self.assertIsNotNone(response.status_info)
75-
self.fn_status_handler.close()
7676

7777
@mock.patch(
7878
'apache_beam.runners.worker.worker_status'
@@ -85,7 +85,6 @@ def test_generate_error(self, mock_method):
8585
self.test_status_service.finished.release()
8686
for response in self.test_status_service.response_received:
8787
self.assertIsNotNone(response.error)
88-
self.fn_status_handler.close()
8988

9089
def test_log_lull_in_bundle_processor(self):
9190
def get_state_sampler_info_for_lull(lull_duration_s):
@@ -133,6 +132,97 @@ def get_state_sampler_info_for_lull(lull_duration_s):
133132
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)
134133
self.assertEqual(flush_mock.call_count, 3)
135134

135+
def test_lull_logs_emitted_when_creating_bundle_processor_takes_time(self):
136+
instruction_id = "instruction-1"
137+
bundle_id = "bundle-1"
138+
thread = threading.current_thread()
139+
now = time.time()
140+
creation_time = now
141+
142+
with (
143+
mock.patch('logging.Logger.warning') as warn_mock,
144+
mock.patch('logging.Logger.error') as error_mock,
145+
mock.patch('time.time') as time_mock,
146+
mock.patch(
147+
'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
148+
) as terminate_mock):
149+
# Set time to be past the lull timeout
150+
time_mock.return_value = (
151+
now + self.fn_status_handler.log_lull_timeout_ns / 1e9 + 1)
152+
self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
153+
instruction_id, bundle_id, thread, creation_time)
154+
warn_mock.assert_called_once()
155+
args, _ = warn_mock.call_args
156+
self.assertIn(
157+
'Bundle processor for instruction %s (bundle %s) has been '
158+
'creating for at least %.2f seconds',
159+
args[0])
160+
161+
# Set time to be past the element processing timeout
162+
time_mock.return_value = (
163+
now + self.fn_status_handler._element_processing_timeout_ns / 1e9 + 1)
164+
165+
self.fn_status_handler._log_lull_in_creating_bundle_descriptor(
166+
instruction_id, bundle_id, thread, creation_time)
167+
168+
error_mock.assert_called_once()
169+
args, _ = error_mock.call_args
170+
self.assertIn(
171+
'Creation of bundle processor for instruction %s (bundle %s) '
172+
'has exceeded the specified timeout',
173+
args[0])
174+
175+
terminate_mock.assert_called_once()
176+
177+
def test_lull_logs_emitted_when_processing_a_bundle_takes_time(self):
178+
instruction_id = "instruction-1"
179+
now = time.time()
180+
thread = threading.current_thread()
181+
182+
with (
183+
mock.patch('logging.Logger.warning') as warn_mock,
184+
mock.patch('logging.Logger.error') as error_mock,
185+
mock.patch('time.time') as time_mock,
186+
mock.patch(
187+
'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness',
188+
) as terminate_mock):
189+
time_mock.return_value = now + 1
190+
# Set time to be past the lull timeout
191+
sampler_info = statesampler.StateSamplerInfo(
192+
state_name=CounterName('test_counter', 'test_stage', 'test_step'),
193+
transition_count=1,
194+
# Set time to be past the lull timeout
195+
time_since_transition=(
196+
self.fn_status_handler.log_lull_timeout_ns + 1),
197+
tracked_thread=thread)
198+
self.fn_status_handler._log_lull_sampler_info(
199+
sampler_info, instruction_id)
200+
warn_mock.assert_called_once()
201+
args, _ = warn_mock.call_args
202+
self.assertIn(
203+
'Operation ongoing in bundle %s%s for at least %.2f seconds', args[0])
204+
205+
time_mock.return_value = now + 2
206+
207+
sampler_info = statesampler.StateSamplerInfo(
208+
state_name=CounterName('test_counter', 'test_stage', 'test_step'),
209+
transition_count=1,
210+
# Set time to be past the element processing timeout
211+
time_since_transition=(
212+
self.fn_status_handler._element_processing_timeout_ns + 1),
213+
tracked_thread=thread)
214+
self.fn_status_handler._log_lull_sampler_info(
215+
sampler_info, instruction_id)
216+
217+
error_mock.assert_called_once()
218+
args, _ = error_mock.call_args
219+
self.assertIn(
220+
'Processing of an element in bundle %s%s has exceeded the '
221+
'specified timeout of %.2f minutes',
222+
args[0])
223+
224+
terminate_mock.assert_called_once()
225+
136226

137227
class HeapDumpTest(unittest.TestCase):
138228
@mock.patch('apache_beam.runners.worker.worker_status.hpy', None)

0 commit comments

Comments
 (0)