Skip to content

Commit abd4229

Browse files
Add callbacks for PlayerClock::jump(time_point) API (ros2#775)
* Add callbacks to the jump API Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Add unit tests for TimeControllerClock::jump callbacks Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Address some code style issues and renames Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Change Doxygen comments to `C` style. Signed-off-by: Michael Orlov <michael.orlov@apex.ai> * Allow one of the callbacks to be nullptr. Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
1 parent 04a76c3 commit abd4229

4 files changed

Lines changed: 502 additions & 11 deletions

File tree

rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_
1616
#define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_
1717

18+
#include <atomic>
1819
#include <chrono>
1920
#include <functional>
2021
#include <memory>
@@ -35,6 +36,76 @@ namespace rosbag2_cpp
3536
class PlayerClock
3637
{
3738
public:
39+
class JumpHandler final
40+
{
41+
public:
42+
using SharedPtr = std::shared_ptr<JumpHandler>;
43+
using pre_callback_t = std::function<void ()>;
44+
using post_callback_t = std::function<void (const rcl_time_jump_t &)>;
45+
46+
JumpHandler() = delete;
47+
JumpHandler(const JumpHandler &) = delete;
48+
const JumpHandler & operator=(const JumpHandler &) = delete;
49+
50+
/**
51+
* \brief Creates JumpHandler object with callbacks for jump operation.
52+
* \param pre_callback Pre-time jump callback. Must be non-throwing.
53+
* \param post_callback Post-time jump callback. Must be non-throwing.
54+
* \param threshold Callbacks will be triggered if the time jump is greater then the threshold.
55+
* \note These callback functions must remain valid as long as the returned shared pointer is
56+
* valid.
57+
* \return Shared pointer to the newly created JumpHandler object.
58+
* \throws std::bad_alloc if the allocation of the JumpHandler fails.
59+
* \throws std::invalid argument if any of the provided callbacks are nullptr.
60+
*/
61+
ROSBAG2_CPP_PUBLIC
62+
static SharedPtr create(
63+
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
64+
const rcl_jump_threshold_t & threshold)
65+
{
66+
JumpHandler::SharedPtr handler(new JumpHandler(pre_callback, post_callback, threshold));
67+
if (handler == nullptr) {
68+
throw std::bad_alloc{};
69+
}
70+
return handler;
71+
}
72+
73+
/**
74+
* \brief Equal operator for PlayerClock::JumpHandler class.
75+
* \param right Right side operand for equal comparison operation.
76+
* \return true if internal id's of two JumpHandlers match, otherwise false.
77+
*/
78+
ROSBAG2_CPP_PUBLIC
79+
bool operator==(const PlayerClock::JumpHandler & right) const
80+
{
81+
return id == right.id;
82+
}
83+
84+
const pre_callback_t pre_callback;
85+
const post_callback_t post_callback;
86+
const rcl_jump_threshold_t notice_threshold;
87+
88+
private:
89+
uint64_t id;
90+
91+
JumpHandler(
92+
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
93+
const rcl_jump_threshold_t & threshold)
94+
: pre_callback(pre_callback), post_callback(post_callback), notice_threshold(threshold)
95+
{
96+
if (pre_callback == nullptr && post_callback == nullptr) {
97+
throw std::invalid_argument("At least one callback for JumpHandler should be not nullptr");
98+
}
99+
id = create_id();
100+
}
101+
102+
static uint64_t create_id()
103+
{
104+
static std::atomic<uint64_t> id_count{0};
105+
return id_count.fetch_add(1, std::memory_order_relaxed) + 1;
106+
}
107+
};
108+
38109
/**
39110
* Type representing an arbitrary steady time, used to measure real-time durations
40111
* This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock.
@@ -101,16 +172,39 @@ class PlayerClock
101172
virtual bool is_paused() const = 0;
102173

103174
/**
104-
* Change the current ROS time to an arbitrary time.
175+
* \brief Change the current ROS time to an arbitrary time.
176+
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
177+
* callbacks.
178+
* \param time_point Time point in ROS playback timeline.
105179
*/
106180
ROSBAG2_CPP_PUBLIC
107-
virtual void jump(rcutils_time_point_value_t) = 0;
181+
virtual void jump(rcutils_time_point_value_t time_point) = 0;
108182

109183
/**
110184
* \sa jump
111185
*/
112186
ROSBAG2_CPP_PUBLIC
113187
virtual void jump(rclcpp::Time time) = 0;
188+
189+
/**
190+
* \brief Add callbacks to be called when a time jump exceeds a threshold.
191+
* \details Callbacks specified in JumpHandler object will be called in two cases:
192+
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
193+
* farther than the threshold.
194+
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
195+
* farther than the threshold.
196+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
197+
* \throws std::invalid argument if jump threshold has invalid value.
198+
*/
199+
ROSBAG2_CPP_PUBLIC
200+
virtual void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;
201+
202+
/**
203+
* \brief remove jump callbacks from processing list.
204+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
205+
*/
206+
ROSBAG2_CPP_PUBLIC
207+
virtual void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;
114208
};
115209

116210
} // namespace rosbag2_cpp

rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ class TimeControllerClock : public PlayerClock
109109

110110
/**
111111
* Change the current ROS time to an arbitrary time.
112-
* This will wake any waiting `sleep_until`.
113-
* Note: this initial implementation does not have any jump-callback handling.
114-
* The Player should not use this method while its queues are active ("during playback")
112+
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
113+
* callbacks.
114+
* \note The Player should not use this method while its queues are active ("during playback")
115115
* as an arbitrary time jump can make those queues, and the state of the Reader/Storage, invalid
116116
* The current Player implementation should only use this method in between calls to `play`,
117117
* to reset the initial time of the clock.
@@ -124,6 +124,26 @@ class TimeControllerClock : public PlayerClock
124124
ROSBAG2_CPP_PUBLIC
125125
void jump(rclcpp::Time ros_time) override;
126126

127+
/**
128+
* \brief Add callbacks to be called when a time jump exceeds a threshold.
129+
* \details Callbacks specified in JumpHandler object will be called in two cases:
130+
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
131+
* farther than the threshold.
132+
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
133+
* farther than the threshold.
134+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
135+
* \throws std::invalid argument if jump threshold has invalid value.
136+
*/
137+
ROSBAG2_CPP_PUBLIC
138+
void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) override;
139+
140+
/**
141+
* \brief remove jump callbacks from processing list.
142+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
143+
*/
144+
ROSBAG2_CPP_PUBLIC
145+
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) override;
146+
127147
private:
128148
std::unique_ptr<TimeControllerClockImpl> impl_;
129149
};

rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp

Lines changed: 123 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <memory>
1717
#include <mutex>
1818
#include <thread>
19+
#include <vector>
1920

2021
#include "rcpputils/thread_safety_annotations.hpp"
2122
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"
@@ -114,6 +115,72 @@ class TimeControllerClockImpl
114115
snapshot(ros_now());
115116
}
116117

118+
/**
119+
* \brief Adjust internal clock to the specified timestamp.
120+
* \details It will change the current internally maintained offset so that next published time
121+
* is different.
122+
* \note Will trigger any registered JumpHandler callbacks.
123+
* \param ros_time Time point in ROS playback timeline.
124+
*/
125+
void jump(rcutils_time_point_value_t ros_time)
126+
{
127+
rcl_duration_t time_jump_delta;
128+
{
129+
std::lock_guard<std::mutex> lock(state_mutex);
130+
time_jump_delta.nanoseconds = ros_time - steady_to_ros(now_fn());
131+
}
132+
133+
rcl_time_jump_t time_jump{};
134+
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
135+
time_jump.delta = time_jump_delta;
136+
137+
process_callbacks_before_jump(time_jump);
138+
{
139+
std::lock_guard<std::mutex> lock(state_mutex);
140+
snapshot(ros_time);
141+
}
142+
process_callbacks_after_jump(time_jump);
143+
cv.notify_all();
144+
}
145+
146+
/**
147+
* \brief Add callbacks to be called when a time jump exceeds a threshold.
148+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
149+
* \throws std::invalid argument if jump threshold has invalid value.
150+
*/
151+
void add_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
152+
{
153+
if (handler->notice_threshold.min_forward.nanoseconds < 0) {
154+
throw std::invalid_argument("forward jump threshold must be positive or zero");
155+
}
156+
if (handler->notice_threshold.min_backward.nanoseconds > 0) {
157+
throw std::invalid_argument("backward jump threshold must be negative or zero");
158+
}
159+
160+
std::lock_guard<std::mutex> lock(callback_list_mutex_);
161+
for (auto const & registered_handler : callback_list_) {
162+
if (*registered_handler == *handler) {
163+
return; // Already have this callback in the list.
164+
}
165+
}
166+
callback_list_.push_back(handler);
167+
}
168+
169+
/**
170+
* \brief remove jump callbacks from processing list.
171+
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
172+
*/
173+
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
174+
{
175+
std::lock_guard<std::mutex> lock(callback_list_mutex_);
176+
for (auto it = callback_list_.begin(); it != callback_list_.end(); ++it) {
177+
if (**it == *handler) {
178+
callback_list_.erase(it);
179+
return;
180+
}
181+
}
182+
}
183+
117184
const PlayerClock::NowFunction now_fn;
118185
const std::chrono::milliseconds sleep_time_while_paused;
119186

@@ -122,6 +189,49 @@ class TimeControllerClockImpl
122189
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
123190
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
124191
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);
192+
193+
private:
194+
std::mutex callback_list_mutex_;
195+
std::vector<PlayerClock::JumpHandler::SharedPtr> callback_list_
196+
RCPPUTILS_TSA_GUARDED_BY(callback_list_mutex_);
197+
198+
void process_callbacks_before_jump(const rcl_time_jump_t & time_jump)
199+
{
200+
std::lock_guard<std::mutex> lock(callback_list_mutex_);
201+
for (auto const & handler : callback_list_) {
202+
process_callback(handler, time_jump, true);
203+
}
204+
}
205+
206+
void process_callbacks_after_jump(const rcl_time_jump_t & time_jump)
207+
{
208+
std::lock_guard<std::mutex> lock(callback_list_mutex_);
209+
for (auto const & handler : callback_list_) {
210+
process_callback(handler, time_jump, false);
211+
}
212+
}
213+
214+
static void process_callback(
215+
PlayerClock::JumpHandler::SharedPtr handler, const rcl_time_jump_t & time_jump,
216+
bool before_jump) RCPPUTILS_TSA_REQUIRES(callback_list_mutex_)
217+
{
218+
bool is_clock_change = time_jump.clock_change == RCL_ROS_TIME_ACTIVATED ||
219+
time_jump.clock_change == RCL_ROS_TIME_DEACTIVATED;
220+
if ((is_clock_change && handler->notice_threshold.on_clock_change) ||
221+
(handler->notice_threshold.min_backward.nanoseconds != 0 &&
222+
time_jump.delta.nanoseconds < 0 &&
223+
time_jump.delta.nanoseconds <= handler->notice_threshold.min_backward.nanoseconds) ||
224+
(handler->notice_threshold.min_forward.nanoseconds != 0 &&
225+
time_jump.delta.nanoseconds > 0 &&
226+
time_jump.delta.nanoseconds >= handler->notice_threshold.min_forward.nanoseconds))
227+
{
228+
if (before_jump && handler->pre_callback) {
229+
handler->pre_callback();
230+
} else if (!before_jump && handler->post_callback) {
231+
handler->post_callback(time_jump);
232+
}
233+
}
234+
}
125235
};
126236

127237
TimeControllerClock::TimeControllerClock(
@@ -186,6 +296,16 @@ bool TimeControllerClock::set_rate(double rate)
186296
return true;
187297
}
188298

299+
void TimeControllerClock::add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler)
300+
{
301+
impl_->add_jump_callbacks(handler);
302+
}
303+
304+
void TimeControllerClock::remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
305+
{
306+
impl_->remove_jump_callbacks(handler);
307+
}
308+
189309
double TimeControllerClock::get_rate() const
190310
{
191311
std::lock_guard<std::mutex> lock(impl_->state_mutex);
@@ -198,7 +318,7 @@ void TimeControllerClock::pause()
198318
if (impl_->paused) {
199319
return;
200320
}
201-
// Take snaphot before changing state
321+
// Take snapshot before changing state
202322
impl_->snapshot();
203323
impl_->paused = true;
204324
impl_->cv.notify_all();
@@ -210,7 +330,7 @@ void TimeControllerClock::resume()
210330
if (!impl_->paused) {
211331
return;
212332
}
213-
// Take snaphot before changing state
333+
// Take snapshot before changing state
214334
impl_->snapshot();
215335
impl_->paused = false;
216336
impl_->cv.notify_all();
@@ -224,9 +344,7 @@ bool TimeControllerClock::is_paused() const
224344

225345
void TimeControllerClock::jump(rcutils_time_point_value_t ros_time)
226346
{
227-
std::lock_guard<std::mutex> lock(impl_->state_mutex);
228-
impl_->snapshot(ros_time);
229-
impl_->cv.notify_all();
347+
impl_->jump(ros_time);
230348
}
231349

232350
void TimeControllerClock::jump(rclcpp::Time ros_time)

0 commit comments

Comments
 (0)