
Refactor launch service run_async loop to wait on futures and queued events#449

Merged
jacobperron merged 11 commits into master from jacob/thread_safe_event_queue
Aug 13, 2020
Conversation

@jacobperron
Member

@jacobperron jacobperron commented Aug 11, 2020

Fixes ros2/launch_ros#169

Otherwise, it's possible to run into a race condition between checking for 'idle' and processing events from the queue.

Edit:

As described in #449 (comment), we may not be idle because we're still waiting for futures to complete, but if there are no more events in the queue then we end up blocking forever waiting for one.

To resolve this corner-case that results in deadlock, this PR refactors the run_async loop such that we concurrently wait on futures and events in the queue.
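The shape of the fix can be sketched roughly like this (names such as `spin_once`, `event_queue`, and `entity_futures` are illustrative, not the actual attribute names in `launch_service.py`):

```python
import asyncio

async def spin_once(event_queue: asyncio.Queue, entity_futures: set):
    # Wait on the next queued event AND any outstanding futures at the
    # same time, so an empty queue cannot block the loop while futures
    # are still completing.
    get_event_task = asyncio.ensure_future(event_queue.get())
    done, pending = await asyncio.wait(
        {get_event_task, *entity_futures},
        return_when=asyncio.FIRST_COMPLETED,
    )
    if get_event_task in done:
        return get_event_task.result()  # an event arrived; process it
    # A future finished instead: cancel the pending get and let the
    # caller recheck the idle state.
    get_event_task.cancel()
    return None
```

Because the wait returns as soon as either side completes, the loop always gets a chance to recheck whether it is idle after any future finishes.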

cc/ @wjwwood

Fixes ros2/launch_ros#169

Otherwise, it's possible to run into a race condition between checking for 'idle' and processing events from the queue.

I.e. it's possible that an event is completed between executing the following two lines:

https://github.com/ros2/launch/blob/657745c315ba6a61984b66f168f9c34b3a7e2108/launch/launch/launch_service.py#L158

https://github.com/ros2/launch/blob/657745c315ba6a61984b66f168f9c34b3a7e2108/launch/launch/launch_service.py#L272

To fix the race, this change makes all accesses to the event queue thread safe and introduces a helper function
get_next_event() that first checks if the queue is empty before potentially blocking forever with asyncio.Queue.get().
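A sketch of the helper described in this commit message (this queue-based approach was later reverted in favor of waiting on futures and events concurrently; the body here illustrates the described behavior, not the actual implementation):

```python
import asyncio

async def get_next_event(queue: asyncio.Queue):
    # Check emptiness first so we never block forever in Queue.get()
    # when no more events are coming.
    if queue.empty():
        return None
    return queue.get_nowait()
```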

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron jacobperron self-assigned this Aug 11, 2020
@jacobperron jacobperron added the bug Something isn't working label Aug 11, 2020
@jacobperron
Member Author

See ros2/launch_ros#169 for steps to reproduce the issue.

ivanpauno
ivanpauno previously approved these changes Aug 11, 2020
Member

@ivanpauno ivanpauno left a comment


I guess this is fine ...

I'm not convinced if launch code should be thread safe or not (it currently isn't).
Maybe it should not and we should only rely on running things asynchronously in an event loop, IDK.

@ivanpauno ivanpauno requested review from hidmic and wjwwood August 11, 2020 13:11
@ivanpauno
Member

ivanpauno commented Aug 11, 2020

Actually, I'm not sure if this is the right way to handle the problem.

You can currently emit an event thread safely if you want, e.g.:

future = asyncio.run_coroutine_threadsafe(
    self.__context.emit_event(event),
    self.__loop_from_run_thread
)

So, I'm not sure if the event Queue should be thread safe.

@ivanpauno ivanpauno dismissed their stale review August 11, 2020 13:24

not convinced ...

@wjwwood
Member

wjwwood commented Aug 11, 2020

Reading the code I'm not sure how the two lines you pointed out @jacobperron could ever be run concurrently, but maybe I'm not using my imagination properly :)

@jacobperron
Member Author

jacobperron commented Aug 11, 2020

Reading the code I'm not sure how the two lines you pointed out @jacobperron could ever be run concurrently, but maybe I'm not using my imagination properly :)

Looking again, I might be wrong about the reason for the hang.

It looks like is_idle evaluates to False on this line:

if not self.__shutting_down and shutdown_when_idle and is_idle:

Which means either the queue is not empty or there are still futures to wait on. But when we get to processing events (from the queue) the queue is empty and we block forever on this line:

await process_one_event_task

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron
Member Author

It looks like the future created here is never marked as done, even though logging suggests the function is run to completion:

https://github.com/ros2/launch_ros/blob/8c05ebb4be52dcd360e47694875643e4f53a010e/launch_ros/launch_ros/actions/load_composable_nodes.py#L173

@ivanpauno
Member

It looks like the future created here is never marked as done, even though logging suggests the function is run to completion:

https://github.com/ros2/launch_ros/blob/8c05ebb4be52dcd360e47694875643e4f53a010e/launch_ros/launch_ros/actions/load_composable_nodes.py#L173

mmmm, so run_in_executor is using the default executor, which seems to actually be multi-threaded (reference needed here).
So, maybe that's the problem (?) ...

@ivanpauno
Member

reference needed here

Reference found: https://github.com/python/cpython/blob/374d998b507d34a6c0a3816a163926a8ba0c483f/Lib/asyncio/base_events.py#L808-L812.
IMO, that's the problem

@jacobperron
Member Author

Yeah, the docs say the default executor must be a ThreadPoolExecutor.
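This is easy to confirm: when no executor is passed, `run_in_executor` falls back to the loop's default executor, which asyncio lazily creates as a `concurrent.futures.ThreadPoolExecutor`. A minimal demonstration:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor; asyncio lazily
    # creates a ThreadPoolExecutor for it, so the callable runs on a
    # worker thread, not on the event loop thread.
    return await loop.run_in_executor(None, sum, range(10))

result = asyncio.run(main())
print(result)  # prints 45
```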

@ivanpauno
Member

What I think is happening is the following:

  • There are no more events to be processed
  • We are waiting for some completion future to be done
  • The launch service awaits a new event instead of waiting for the remaining completion futures to be done:

    process_one_event_task = this_loop.create_task(self._process_one_event())

    async def _process_one_event(self) -> None:
        next_event = await self.__context._event_queue.get()
        await self.__process_event(next_event)

    Thus, it waits forever.

Easy hack: Using get with a timeout.
Not ideal, but I will need to think for real to figure out a better solution 😂.

@ivanpauno
Member

Yeah, the docs say the default executor must be a ThreadPoolExecutor.

I like looking at the code 😂

@jacobperron
Member Author

Easy hack: Using get with a timeout.

Unfortunately, asyncio.Queue.get() does not have a timeout argument.
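The usual workaround for the missing timeout argument is `asyncio.wait_for`, which cancels the pending `get()` after a deadline (a generic sketch, not code from this PR):

```python
import asyncio

async def get_with_timeout(queue: asyncio.Queue, timeout: float):
    # Queue.get() itself has no timeout parameter, but wait_for
    # cancels the pending get after `timeout` seconds.
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None
```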

@wjwwood
Member

wjwwood commented Aug 11, 2020

That was my initial theory too but the shutdown event didn’t clear the hang.

A better solution would be to have a noop event like “check for idle” or “future completed” that would cause get to return and would be posted after the future is completed.
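The noop-event idea could be sketched like this (the `WAKEUP` sentinel and helper are hypothetical, purely to illustrate the suggestion):

```python
import asyncio

# Sentinel posted to the event queue when a future completes, so a
# pending Queue.get() wakes up and the loop can recheck its idle state.
WAKEUP = object()

def post_wakeup_on_done(future: asyncio.Future, queue: asyncio.Queue):
    # When the future finishes, push the sentinel onto the queue.
    future.add_done_callback(lambda _f: queue.put_nowait(WAKEUP))
```

The event-processing loop would then discard `WAKEUP` entries and simply re-evaluate idleness whenever one arrives.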

@ivanpauno
Member

I think I have figured out a fix (it works well in my mind at least):

The issue:

_is_idle is returning true if there is either a future available or an event that has not been processed. But we are then blocking on getting a new event (see 1 and 2).

We could rather do the following:

  • Collect all futures like here.
  • Wait on the first completed future like here.

If we always do that, not only after shutdown has happened, we should never run into the race (as we would check _is_idle again after any future has completed).

@wjwwood
Member

wjwwood commented Aug 11, 2020

@ivanpauno how do you wait on events in the queue and the incomplete futures at the same time?

@ivanpauno
Member

ivanpauno commented Aug 11, 2020

@ivanpauno how do you wait on events in the queue and the incomplete futures at the same time?

Using asyncio.wait with asyncio.FIRST_COMPLETED condition:

done, pending = await asyncio.wait(
    entity_futures,
    timeout=1.0,
    return_when=asyncio.FIRST_COMPLETED
)

IIUC, that allows you to wait on a collection of awaitables at the same time, and it will return when one of them is completed.
In that way, we can recheck if the launch service is idle or not after any future is completed.

We should take care of not processing another event if wait returned for another reason, and not because one event was processed. But that can be checked.
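A minimal demonstration of that pattern (illustrative only): waiting on an event-queue `get()` and a slow future together, and returning as soon as the first of them completes.

```python
import asyncio
import contextlib

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    get_task = asyncio.ensure_future(queue.get())
    slow = asyncio.ensure_future(asyncio.sleep(10))
    queue.put_nowait('event')
    # Returns as soon as the first awaitable finishes; the slow future
    # stays in `pending`, so the caller can recheck idleness.
    done, pending = await asyncio.wait(
        [get_task, slow],
        return_when=asyncio.FIRST_COMPLETED,
    )
    slow.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await slow
    return get_task in done, slow in pending

event_done, slow_pending = asyncio.run(main())
```

Checking membership in `done` is exactly the "take care of not processing another event if wait returned for another reason" step mentioned above.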

@wjwwood
Member

wjwwood commented Aug 11, 2020

Ok, so the set of things being waited on in each loop would be await process_one_event() + the context's futures + the action futures? Some of the logic might need to change, but it sounds like it should work.

@jacobperron
Member Author

Sounds good to me. I'll give it a try.

@jacobperron
Member Author

Almost seems to work; now I get stuck in the inner shutdown loop. Maybe I've hit a different bug related to shutdown, but I don't understand why there is a continue here:

What I see is this condition evaluates to True:

if not self.__shutting_down and shutdown_when_idle and is_idle:

Then we call _shutdown and continue. And the next iteration, _is_idle() evaluates to False (I guess because the shutdown causes another event to be added to the queue):

is_idle = self._is_idle() # self._entity_future_pairs is pruned here

Then we get stuck waiting to get something from the queue and repeatedly see this debug:

'still waiting on futures: {}'.format(entity_futures)

I guess this logic is broken (I don't know under what circumstance this is True):

if is_idle:
process_one_event_task.cancel()
break

If I remove the continue, my test works as expected, but maybe there are other unintended consequences.

@wjwwood
Member

wjwwood commented Aug 11, 2020

Then we call _shutdown and continue. And the next iteration, _is_idle() evaluates to False (I guess because the shutdown causes another event to be added to the queue):

That continue ensures that self.__shutting_down is true:

if self.__shutting_down:

At least I think that's the case. Maybe that's no longer the case.

@ivanpauno
Member

ivanpauno commented Aug 12, 2020

@jacobperron can you push that code somewhere?

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron
Member Author

I've reverted the original changes and instead refactored the run_async loop to wait on both queued events and futures (92bfb44). I also added some logic to wait until the process_one_event task is done or a timeout occurs (19e0d68).

@jacobperron
Member Author

This solves my original issue, but now the test_launch_service.py tests are hanging 🙃

@jacobperron
Member Author

The test is emitting an event from a thread and it gets stuck here:

ls.emit_event(MockEvent())

Launch service call stack ends up here:

future = asyncio.run_coroutine_threadsafe(
    self.__context.emit_event(event),
    self.__loop_from_run_thread
)

and waiting forever for the result

# Block until asynchronously emitted event is emitted by loop
future.result()

@jacobperron
Member Author

jacobperron commented Aug 12, 2020

I guess the issue is that we always need to await in the loop; in the case that there are no futures or events to wait on, I'm running a busy loop.

Always create a task to wait on and cancel it if there is a timeout.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
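The "always create a task to wait on and cancel it if there is a timeout" fix can be sketched generically like this (a sketch of the idea, not the exact code in the PR):

```python
import asyncio

async def wait_once(coro, timeout: float):
    # Wrap the awaitable in a task so a timed-out wait can be
    # cancelled cleanly instead of being left pending.
    task = asyncio.ensure_future(coro)
    done, _pending = await asyncio.wait({task}, timeout=timeout)
    if task not in done:
        task.cancel()
        return None
    return task.result()
```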
@jacobperron
Member Author

@ivanpauno @wjwwood Okay, I think I've got it now (1118883). PTAL at the latest changes.

@jacobperron jacobperron changed the title Make context event queue thread safe Refactor launch service run_async loop to wait on futures and queued events Aug 12, 2020
Signed-off-by: Jacob Perron <jacob@openrobotics.org>
Signed-off-by: Jacob Perron <jacob@openrobotics.org>
Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron
Member Author

I did a minor refactor 41a31d2 and added a guard against a pending task during shutdown f33e68e. Let me know if there's anything else.

We don't need to have an inner loop or timeout when waiting on futures.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
Member

@ivanpauno ivanpauno left a comment


LGTM! (with full CI testing packages above launch)

I would like somebody else reviewing this before merging

@hidmic

hidmic commented Aug 13, 2020

Great finding! LGTM too with green CI.

This prevents spurious wake-ups when we're waiting for an event to finish processing.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron
Member Author

jacobperron commented Aug 13, 2020

Testing packages recursively depending on launch (except ros1_bridge):

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

Member

@wjwwood wjwwood left a comment


The new logic makes sense (once I read it without the diff), lgtm

@jacobperron
Member Author

The Rpr job is failing due to an infrastructure issue. Since the failure is unrelated to this change and CI is green, I'll go ahead and merge.

Thanks for the reviews everyone!

@jacobperron jacobperron merged commit 4f32af5 into master Aug 13, 2020
@jacobperron jacobperron deleted the jacob/thread_safe_event_queue branch August 13, 2020 23:16
jacobperron added a commit that referenced this pull request Aug 20, 2020
…events (#449)

Fixes ros2/launch_ros#169

Otherwise, it's possible to get into a hung state where we wait for an event,
even though there are no more events. This is because the check for an "idle"
state evaluates to "True" as we wait for some futures to complete.

By waiting for futures and events concurrently, we can avoid this problem.
Further, we don't have to wait for an event if there's nothing in the queue.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Wait until one event is processed or nothing done (timeout)

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Fix bugs

Always create a task to wait on and cancel it if there is a timeout.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Avoid canceling an event mid-processing

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* minor refactor

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Guard against leaving a task pending

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Further refactoring

We don't need to have an inner loop or timeout when waiting on futures.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Only wait on futures if there are no events in the queue

This prevents spurious wake-ups when we're waiting for an event to finish processing.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
@jacobperron
Member Author

I've decided not to backport this to Eloquent or Dashing since there's been a lot of changes since they were last touched.

jacobperron added a commit that referenced this pull request Aug 27, 2020
…events (#449) (#455)

(Backport of the commit above; same commit message.)
Linked issue: LoadComposableNodes action hangs