Skip to content

Commit 4c239d3

Browse files
jmachowinskiJanosch Machowinski
andauthored
fix: Fixed expiring of goals if events executor is used (ros2#2674)
* feat: Add ClockWaiter and ClockConditionalVariable Added two new synchronization primitives to perform waits using an rclcpp::Clock. Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> * fix: Fixed expiring of goals if events executor is used This commit is only relevant if the events executor is used. This commit starts a background thread, to create events for the expire timer of the action. Without this the action server will not expire old goal results leading to memory and performance issues. Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> --------- Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com> Co-authored-by: Janosch Machowinski <J.Machowinski@cellumation.com>
1 parent d24a317 commit 4c239d3

3 files changed

Lines changed: 493 additions & 1 deletion

File tree

rclcpp/include/rclcpp/clock.hpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,117 @@ class Clock
260260
std::shared_ptr<Impl> impl_;
261261
};
262262

263+
/**
264+
* A synchronization primitive, equal to std::conditional_variable,
265+
* that works with the rclcpp::Clock.
266+
*
267+
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
268+
*
269+
* Note, this class does not handle shutdowns, if you want to
270+
* haven them handles as well, use ClockConditionalVariable.
271+
*/
272+
class ClockWaiter
273+
{
274+
private:
275+
class ClockWaiterImpl;
276+
std::unique_ptr<ClockWaiterImpl> impl_;
277+
278+
public:
279+
RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter)
280+
281+
RCLCPP_PUBLIC
282+
explicit ClockWaiter(const rclcpp::Clock::SharedPtr & clock);
283+
284+
RCLCPP_PUBLIC
285+
~ClockWaiter();
286+
287+
/**
288+
* Calling this function will block the current thread, until abs_time is reached,
289+
* or pred returns true.
290+
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
291+
* The lock will be atomically released and this thread will blocked.
292+
* @param abs_time The time until which this thread shall be blocked.
293+
* @param pred may be called in cased of spurious wakeups, but must be called every time
294+
* notify_one() was called. During the call to pred, the given lock will be locked.
295+
* This method will return, if pred returns true.
296+
*/
297+
RCLCPP_PUBLIC
298+
bool
299+
wait_until(
300+
std::unique_lock<std::mutex> & lock,
301+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred);
302+
303+
/**
304+
* Notify the blocked thread, that it should reevaluate the wakeup condition.
305+
* The given pred function in wait_until will be reevaluated and wait_until
306+
* will return if it evaluates to true.
307+
*/
308+
RCLCPP_PUBLIC
309+
void
310+
notify_one();
311+
};
312+
313+
314+
/**
315+
* A synchronization primitive, similar to std::conditional_variable,
316+
* that works with the rclcpp::Clock.
317+
*
318+
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
319+
*
320+
* This primitive will wake up if the context was shut down.
321+
*/
322+
class ClockConditionalVariable
323+
{
324+
class Impl;
325+
std::unique_ptr<Impl> impl_;
326+
327+
public:
328+
RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable)
329+
330+
RCLCPP_PUBLIC
331+
ClockConditionalVariable(
332+
rclcpp::Clock::SharedPtr & clock,
333+
rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context());
334+
RCLCPP_PUBLIC
335+
~ClockConditionalVariable();
336+
337+
/**
338+
* Calling this function will block the current thread, until abs_time is reached,
339+
* or pred returns true.
340+
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
341+
* The lock will be atomically released and this thread will blocked.
342+
* The given lock must be created using the mutex returned my mutex().
343+
* @param abs_time The time until which this thread shall be blocked.
344+
* @param pred may be called in cased of spurious wakeups, but must be called every time
345+
* notify_one() was called. During the call to pred, the given lock will be locked.
346+
* This method will return, if pred returns true.
347+
*
348+
* @return true if until was reached.
349+
*/
350+
RCLCPP_PUBLIC
351+
bool
352+
wait_until(
353+
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
354+
const std::function<bool ()> & pred);
355+
356+
/**
357+
* Notify the blocked thread, that is should reevaluate the wakeup condition.
358+
* E.g. the given pred function in wait_until shall be reevaluated.
359+
*/
360+
RCLCPP_PUBLIC
361+
void
362+
notify_one();
363+
364+
/**
365+
* Returns the internal mutex. In order to be race free with the context shutdown,
366+
* this mutex must be used for the wait_until call.
367+
*/
368+
RCLCPP_PUBLIC
369+
std::mutex &
370+
mutex();
371+
};
372+
373+
263374
} // namespace rclcpp
264375

265376
#endif // RCLCPP__CLOCK_HPP_

rclcpp/src/rclcpp/clock.cpp

Lines changed: 237 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ Clock::sleep_until(
118118
});
119119
// No longer need the shutdown callback when this function exits
120120
auto callback_remover = rcpputils::scope_exit(
121-
[context, &shutdown_cb_handle]() {
121+
[&context, &shutdown_cb_handle]() {
122122
context->remove_on_shutdown_callback(shutdown_cb_handle);
123123
});
124124

@@ -367,4 +367,240 @@ Clock::create_jump_callback(
367367
// *INDENT-ON*
368368
}
369369

370+
class ClockWaiter::ClockWaiterImpl
371+
{
372+
private:
373+
std::condition_variable cv_;
374+
375+
rclcpp::Clock::SharedPtr clock_;
376+
bool time_source_changed_ = false;
377+
std::function<void(const rcl_time_jump_t &)> post_time_jump_callback;
378+
379+
bool
380+
wait_until_system_time(
381+
std::unique_lock<std::mutex> & lock,
382+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
383+
{
384+
auto system_time = std::chrono::system_clock::time_point(
385+
// Cast because system clock resolution is too big for nanoseconds on some systems
386+
std::chrono::duration_cast<std::chrono::system_clock::duration>(
387+
std::chrono::nanoseconds(abs_time.nanoseconds())));
388+
389+
return cv_.wait_until(lock, system_time, pred);
390+
}
391+
392+
bool
393+
wait_until_steady_time(
394+
std::unique_lock<std::mutex> & lock,
395+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
396+
{
397+
// Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch
398+
const rclcpp::Time rcl_entry = clock_->now();
399+
const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
400+
const rclcpp::Duration delta_t = abs_time - rcl_entry;
401+
const std::chrono::steady_clock::time_point chrono_until =
402+
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());
403+
404+
return cv_.wait_until(lock, chrono_until, pred);
405+
}
406+
407+
408+
bool
409+
wait_until_ros_time(
410+
std::unique_lock<std::mutex> & lock,
411+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
412+
{
413+
// Install jump handler for any amount of time change, for two purposes:
414+
// - if ROS time is active, check if time reached on each new clock sample
415+
// - Trigger via on_clock_change to detect if time source changes, to invalidate sleep
416+
rcl_jump_threshold_t threshold;
417+
threshold.on_clock_change = true;
418+
// 0 is disable, so -1 and 1 are smallest possible time changes
419+
threshold.min_backward.nanoseconds = -1;
420+
threshold.min_forward.nanoseconds = 1;
421+
422+
time_source_changed_ = false;
423+
424+
post_time_jump_callback = [this, &lock] (const rcl_time_jump_t & jump)
425+
{
426+
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
427+
std::lock_guard<std::mutex> lk(*lock.mutex());
428+
time_source_changed_ = true;
429+
}
430+
cv_.notify_one();
431+
};
432+
433+
// Note this is a trade-off. Adding the callback for every call
434+
// is expensive for high frequency calls. For low frequency waits
435+
// its more overhead to have the callback being called all the time.
436+
// As we expect the use case to be low frequency calls to wait_until
437+
// with relative big pauses between the calls, we install it on demand.
438+
auto clock_handler = clock_->create_jump_callback(
439+
nullptr,
440+
post_time_jump_callback,
441+
threshold);
442+
443+
if (!clock_->ros_time_is_active()) {
444+
auto system_time = std::chrono::system_clock::time_point(
445+
// Cast because system clock resolution is too big for nanoseconds on some systems
446+
std::chrono::duration_cast<std::chrono::system_clock::duration>(
447+
std::chrono::nanoseconds(abs_time.nanoseconds())));
448+
449+
return cv_.wait_until(lock, system_time, [this, &pred] () {
450+
return time_source_changed_ || pred();
451+
});
452+
}
453+
454+
// RCL_ROS_TIME with ros_time_is_active.
455+
// Just wait without "until" because installed
456+
// jump callbacks wake the cv on every new sample.
457+
cv_.wait(lock, [this, &pred, &abs_time] () {
458+
return clock_->now() >= abs_time || time_source_changed_ || pred();
459+
});
460+
461+
return clock_->now() < abs_time;
462+
}
463+
464+
public:
465+
explicit ClockWaiterImpl(const rclcpp::Clock::SharedPtr & clock)
466+
:clock_(clock)
467+
{
468+
}
469+
470+
bool
471+
wait_until(
472+
std::unique_lock<std::mutex> & lock,
473+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
474+
{
475+
switch(clock_->get_clock_type()) {
476+
case RCL_CLOCK_UNINITIALIZED:
477+
throw std::runtime_error("Error, wait on uninitialized clock called");
478+
case RCL_ROS_TIME:
479+
return wait_until_ros_time(lock, abs_time, pred);
480+
break;
481+
case RCL_STEADY_TIME:
482+
return wait_until_steady_time(lock, abs_time, pred);
483+
break;
484+
case RCL_SYSTEM_TIME:
485+
return wait_until_system_time(lock, abs_time, pred);
486+
break;
487+
}
488+
489+
return false;
490+
}
491+
492+
void
493+
notify_one()
494+
{
495+
cv_.notify_one();
496+
}
497+
};
498+
499+
ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr & clock)
500+
:impl_(std::make_unique<ClockWaiterImpl>(clock))
501+
{
502+
}
503+
504+
ClockWaiter::~ClockWaiter() = default;
505+
506+
bool
507+
ClockWaiter::wait_until(
508+
std::unique_lock<std::mutex> & lock,
509+
const rclcpp::Time & abs_time, const std::function<bool ()> & pred)
510+
{
511+
return impl_->wait_until(lock, abs_time, pred);
512+
}
513+
514+
void
515+
ClockWaiter::notify_one()
516+
{
517+
impl_->notify_one();
518+
}
519+
520+
class ClockConditionalVariable::Impl
521+
{
522+
std::mutex pred_mutex_;
523+
bool shutdown_ = false;
524+
rclcpp::Context::SharedPtr context_;
525+
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_;
526+
ClockWaiter::UniquePtr clock_;
527+
528+
public:
529+
Impl(rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context)
530+
:context_(context),
531+
clock_(std::make_unique<ClockWaiter>(clock))
532+
{
533+
if (!context_ || !context_->is_valid()) {
534+
throw std::runtime_error("context cannot be slept with because it's invalid");
535+
}
536+
// Wake this thread if the context is shutdown
537+
shutdown_cb_handle_ = context_->add_on_shutdown_callback(
538+
[this]() {
539+
{
540+
std::unique_lock lock(pred_mutex_);
541+
shutdown_ = true;
542+
}
543+
clock_->notify_one();
544+
});
545+
}
546+
547+
bool
548+
wait_until(
549+
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
550+
const std::function<bool ()> & pred)
551+
{
552+
if(lock.mutex() != &pred_mutex_) {
553+
throw std::runtime_error(
554+
"ClockConditionalVariable::wait_until: Internal error, given lock does not use"
555+
" mutex returned by this->mutex()");
556+
}
557+
558+
clock_->wait_until(lock, until, [this, &pred] () -> bool {
559+
return shutdown_ || pred();
560+
});
561+
return true;
562+
}
563+
564+
void
565+
notify_one()
566+
{
567+
clock_->notify_one();
568+
}
569+
570+
std::mutex &
571+
mutex()
572+
{
573+
return pred_mutex_;
574+
}
575+
};
576+
577+
ClockConditionalVariable::ClockConditionalVariable(
578+
rclcpp::Clock::SharedPtr & clock,
579+
rclcpp::Context::SharedPtr context)
580+
:impl_(std::make_unique<Impl>(clock, context))
581+
{
582+
}
583+
584+
ClockConditionalVariable::~ClockConditionalVariable() = default;
585+
586+
void
587+
ClockConditionalVariable::notify_one()
588+
{
589+
impl_->notify_one();
590+
}
591+
592+
bool
593+
ClockConditionalVariable::wait_until(
594+
std::unique_lock<std::mutex> & lock, rclcpp::Time until,
595+
const std::function<bool ()> & pred)
596+
{
597+
return impl_->wait_until(lock, until, pred);
598+
}
599+
600+
std::mutex &
601+
ClockConditionalVariable::mutex()
602+
{
603+
return impl_->mutex();
604+
}
605+
370606
} // namespace rclcpp

0 commit comments

Comments
 (0)