Skip to content

Commit 8c8c268

Browse files
authored
Add take_data to Waitable and data to AnyExecutable (#1241)
Signed-off-by: Audrow <audrow.nash@gmail.com> Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
1 parent 27d1b11 commit 8c8c268

25 files changed

Lines changed: 384 additions & 106 deletions

rclcpp/include/rclcpp/any_executable.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct AnyExecutable
4747
// These are used to keep the scope on the containing items
4848
rclcpp::CallbackGroup::SharedPtr callback_group;
4949
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
50+
std::shared_ptr<void> data;
5051
};
5152

5253
namespace executor

rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,19 @@ class StaticExecutorEntitiesCollector final
7272
void
7373
fini();
7474

75+
/// Execute the waitable.
7576
RCLCPP_PUBLIC
7677
void
77-
execute() override;
78+
execute(std::shared_ptr<void> & data) override;
79+
80+
/// Take the data so that it can be consumed with `execute`.
81+
/**
82+
* For `StaticExecutorEntitiesCollector`, this always return `nullptr`.
83+
* \sa rclcpp::Waitable::take_data()
84+
*/
85+
RCLCPP_PUBLIC
86+
std::shared_ptr<void>
87+
take_data() override;
7888

7989
/// Function to add_handles_to_wait_set and wait for work and
8090
/**

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include <rmw/rmw.h>
1919

2020
#include <functional>
21+
#include <map>
2122
#include <memory>
23+
#include <stdexcept>
2224
#include <string>
2325
#include <utility>
2426

@@ -115,13 +117,31 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
115117
bool
116118
is_ready(rcl_wait_set_t * wait_set)
117119
{
118-
(void)wait_set;
120+
(void) wait_set;
119121
return buffer_->has_data();
120122
}
121123

122-
void execute()
124+
std::shared_ptr<void>
125+
take_data()
123126
{
124-
execute_impl<CallbackMessageT>();
127+
ConstMessageSharedPtr shared_msg;
128+
MessageUniquePtr unique_msg;
129+
130+
if (any_callback_.use_take_shared_method()) {
131+
shared_msg = buffer_->consume_shared();
132+
} else {
133+
unique_msg = buffer_->consume_unique();
134+
}
135+
return std::static_pointer_cast<void>(
136+
std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
137+
std::pair<ConstMessageSharedPtr, MessageUniquePtr>(
138+
shared_msg, std::move(unique_msg)))
139+
);
140+
}
141+
142+
void execute(std::shared_ptr<void> & data)
143+
{
144+
execute_impl<CallbackMessageT>(data);
125145
}
126146

127147
void
@@ -154,26 +174,35 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
154174

155175
template<typename T>
156176
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
157-
execute_impl()
177+
execute_impl(std::shared_ptr<void> & data)
158178
{
179+
(void)data;
159180
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
160181
}
161182

162183
template<class T>
163184
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
164-
execute_impl()
185+
execute_impl(std::shared_ptr<void> & data)
165186
{
187+
if (!data) {
188+
throw std::runtime_error("'data' is empty");
189+
}
190+
166191
rmw_message_info_t msg_info;
167192
msg_info.publisher_gid = {0, {0}};
168193
msg_info.from_intra_process = true;
169194

195+
auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
196+
data);
197+
170198
if (any_callback_.use_take_shared_method()) {
171-
ConstMessageSharedPtr msg = buffer_->consume_shared();
172-
any_callback_.dispatch_intra_process(msg, msg_info);
199+
ConstMessageSharedPtr shared_msg = shared_ptr->first;
200+
any_callback_.dispatch_intra_process(shared_msg, msg_info);
173201
} else {
174-
MessageUniquePtr msg = buffer_->consume_unique();
175-
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
202+
MessageUniquePtr unique_msg = std::move(shared_ptr->second);
203+
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
176204
}
205+
shared_ptr.reset();
177206
}
178207

179208
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,12 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
5656
virtual bool
5757
is_ready(rcl_wait_set_t * wait_set) = 0;
5858

59+
virtual
60+
std::shared_ptr<void>
61+
take_data() = 0;
62+
5963
virtual void
60-
execute() = 0;
64+
execute(std::shared_ptr<void> & data) = 0;
6165

6266
virtual bool
6367
use_take_shared_method() const = 0;

rclcpp/include/rclcpp/qos_event.hpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#define RCLCPP__QOS_EVENT_HPP_
1717

1818
#include <functional>
19+
#include <memory>
20+
#include <stdexcept>
1921
#include <string>
2022

2123
#include "rcl/error_handling.h"
@@ -131,21 +133,31 @@ class QOSEventHandler : public QOSEventHandlerBase
131133
}
132134
}
133135

134-
/// Execute any entities of the Waitable that are ready.
135-
void
136-
execute() override
136+
/// Take data so that the callback cannot be scheduled again
137+
std::shared_ptr<void>
138+
take_data() override
137139
{
138140
EventCallbackInfoT callback_info;
139-
140141
rcl_ret_t ret = rcl_take_event(&event_handle_, &callback_info);
141142
if (ret != RCL_RET_OK) {
142143
RCUTILS_LOG_ERROR_NAMED(
143144
"rclcpp",
144145
"Couldn't take event info: %s", rcl_get_error_string().str);
145-
return;
146+
return nullptr;
146147
}
148+
return std::static_pointer_cast<void>(std::make_shared<EventCallbackInfoT>(callback_info));
149+
}
147150

148-
event_callback_(callback_info);
151+
/// Execute any entities of the Waitable that are ready.
152+
void
153+
execute(std::shared_ptr<void> & data) override
154+
{
155+
if (!data) {
156+
throw std::runtime_error("'data' is empty");
157+
}
158+
auto callback_ptr = std::static_pointer_cast<EventCallbackInfoT>(data);
159+
event_callback_(*callback_ptr);
160+
callback_ptr.reset();
149161
}
150162

151163
private:

rclcpp/include/rclcpp/waitable.hpp

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define RCLCPP__WAITABLE_HPP_
1717

1818
#include <atomic>
19+
#include <memory>
1920

2021
#include "rclcpp/macros.hpp"
2122
#include "rclcpp/visibility_control.hpp"
@@ -125,8 +126,17 @@ class Waitable
125126
bool
126127
is_ready(rcl_wait_set_t * wait_set) = 0;
127128

128-
/// Execute any entities of the Waitable that are ready.
129+
/// Take the data so that it can be consumed with `execute`.
129130
/**
131+
* NOTE: take_data is a partial fix to a larger design issue with the
132+
* multithreaded executor. This method is likely to be removed when
133+
* a more permanent fix is implemented. A longterm fix is currently
134+
* being discussed here: https://github.com/ros2/rclcpp/pull/1276
135+
*
136+
* This method takes the data from the underlying data structure and
137+
* writes it to the void shared pointer `data` that is passed into the
138+
* method. The `data` can then be executed with the `execute` method.
139+
*
130140
* Before calling this method, the Waitable should be added to a wait set,
131141
* waited on, and then updated.
132142
*
@@ -143,13 +153,41 @@ class Waitable
143153
* // Update the Waitable
144154
* waitable.update(wait_set);
145155
* // Execute any entities of the Waitable that may be ready
146-
* waitable.execute();
156+
* std::shared_ptr<void> data = waitable.take_data();
157+
* ```
158+
*/
159+
RCLCPP_PUBLIC
160+
virtual
161+
std::shared_ptr<void>
162+
take_data() = 0;
163+
164+
/// Execute data that is passed in.
165+
/**
166+
* Before calling this method, the Waitable should be added to a wait set,
167+
* waited on, and then updated - and the `take_data` method should be
168+
* called.
169+
*
170+
* Example usage:
171+
*
172+
* ```cpp
173+
* // ... create a wait set and a Waitable
174+
* // Add the Waitable to the wait set
175+
* bool add_ret = waitable.add_to_wait_set(wait_set);
176+
* // ... error handling
177+
* // Wait
178+
* rcl_ret_t wait_ret = rcl_wait(wait_set);
179+
* // ... error handling
180+
* // Update the Waitable
181+
* waitable.update(wait_set);
182+
* // Execute any entities of the Waitable that may be ready
183+
* std::shared_ptr<void> data = waitable.take_data();
184+
* waitable.execute(data);
147185
* ```
148186
*/
149187
RCLCPP_PUBLIC
150188
virtual
151189
void
152-
execute() = 0;
190+
execute(std::shared_ptr<void> & data) = 0;
153191

154192
/// Exchange the "in use by wait set" state for this timer.
155193
/**

rclcpp/src/rclcpp/executor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
504504
execute_client(any_exec.client);
505505
}
506506
if (any_exec.waitable) {
507-
any_exec.waitable->execute();
507+
any_exec.waitable->execute(any_exec.data);
508508
}
509509
// Reset the callback_group, regardless of type
510510
any_exec.callback_group->can_be_taken_from().store(true);
@@ -827,6 +827,7 @@ Executor::get_next_ready_executable_from_map(
827827
// Check the waitables to see if there are any that are ready
828828
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
829829
if (any_executable.waitable) {
830+
any_executable.data = any_executable.waitable->take_data();
830831
success = true;
831832
}
832833
}

rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ StaticExecutorEntitiesCollector::init(
7777
// Add executor's guard condition
7878
memory_strategy_->add_guard_condition(executor_guard_condition);
7979
// Get memory strategy and executable list. Prepare wait_set_
80-
execute();
80+
std::shared_ptr<void> shared_ptr;
81+
execute(shared_ptr);
8182
}
8283

8384
void
@@ -87,9 +88,16 @@ StaticExecutorEntitiesCollector::fini()
8788
exec_list_.clear();
8889
}
8990

91+
std::shared_ptr<void>
92+
StaticExecutorEntitiesCollector::take_data()
93+
{
94+
return nullptr;
95+
}
96+
9097
void
91-
StaticExecutorEntitiesCollector::execute()
98+
StaticExecutorEntitiesCollector::execute(std::shared_ptr<void> & data)
9299
{
100+
(void) data;
93101
// Fill memory strategy with entities coming from weak_nodes_
94102
fill_memory_strategy();
95103
// Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy)

rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ StaticSingleThreadedExecutor::execute_ready_executables()
176176
// Execute all the ready waitables
177177
for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) {
178178
if (entities_collector_->get_waitable(i)->is_ready(&wait_set_)) {
179-
entities_collector_->get_waitable(i)->execute();
179+
std::shared_ptr<void> shared_ptr;
180+
entities_collector_->get_waitable(i)->execute(shared_ptr);
180181
}
181182
}
182183
}

rclcpp/test/benchmark/benchmark_executor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ BENCHMARK_F(
384384
reset_heap_counters();
385385

386386
for (auto _ : st) {
387-
entities_collector_->execute();
387+
std::shared_ptr<void> data = entities_collector_->take_data();
388+
entities_collector_->execute(data);
388389
}
389390
}

0 commit comments

Comments
 (0)