|
18 | 18 | #include <rmw/rmw.h> |
19 | 19 |
|
20 | 20 | #include <functional> |
| 21 | +#include <map> |
21 | 22 | #include <memory> |
| 23 | +#include <stdexcept> |
22 | 24 | #include <string> |
23 | 25 | #include <utility> |
24 | 26 |
|
@@ -115,13 +117,31 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase |
115 | 117 | bool |
116 | 118 | is_ready(rcl_wait_set_t * wait_set) |
117 | 119 | { |
118 | | - (void)wait_set; |
| 120 | + (void) wait_set; |
119 | 121 | return buffer_->has_data(); |
120 | 122 | } |
121 | 123 |
|
122 | | - void execute() |
| 124 | + std::shared_ptr<void> |
| 125 | + take_data() |
123 | 126 | { |
124 | | - execute_impl<CallbackMessageT>(); |
| 127 | + ConstMessageSharedPtr shared_msg; |
| 128 | + MessageUniquePtr unique_msg; |
| 129 | + |
| 130 | + if (any_callback_.use_take_shared_method()) { |
| 131 | + shared_msg = buffer_->consume_shared(); |
| 132 | + } else { |
| 133 | + unique_msg = buffer_->consume_unique(); |
| 134 | + } |
| 135 | + return std::static_pointer_cast<void>( |
| 136 | + std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>( |
| 137 | + std::pair<ConstMessageSharedPtr, MessageUniquePtr>( |
| 138 | + shared_msg, std::move(unique_msg))) |
| 139 | + ); |
| 140 | + } |
| 141 | + |
| 142 | + void execute(std::shared_ptr<void> & data) |
| 143 | + { |
| 144 | + execute_impl<CallbackMessageT>(data); |
125 | 145 | } |
126 | 146 |
|
127 | 147 | void |
@@ -154,26 +174,35 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase |
154 | 174 |
|
155 | 175 | template<typename T> |
156 | 176 | typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type |
157 | | - execute_impl() |
| 177 | + execute_impl(std::shared_ptr<void> & data) |
158 | 178 | { |
| 179 | + (void)data; |
159 | 180 | throw std::runtime_error("Subscription intra-process can't handle serialized messages"); |
160 | 181 | } |
161 | 182 |
|
162 | 183 | template<class T> |
163 | 184 | typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type |
164 | | - execute_impl() |
| 185 | + execute_impl(std::shared_ptr<void> & data) |
165 | 186 | { |
| 187 | + if (!data) { |
| 188 | + throw std::runtime_error("'data' is empty"); |
| 189 | + } |
| 190 | + |
166 | 191 | rmw_message_info_t msg_info; |
167 | 192 | msg_info.publisher_gid = {0, {0}}; |
168 | 193 | msg_info.from_intra_process = true; |
169 | 194 |
|
| 195 | + auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>( |
| 196 | + data); |
| 197 | + |
170 | 198 | if (any_callback_.use_take_shared_method()) { |
171 | | - ConstMessageSharedPtr msg = buffer_->consume_shared(); |
172 | | - any_callback_.dispatch_intra_process(msg, msg_info); |
| 199 | + ConstMessageSharedPtr shared_msg = shared_ptr->first; |
| 200 | + any_callback_.dispatch_intra_process(shared_msg, msg_info); |
173 | 201 | } else { |
174 | | - MessageUniquePtr msg = buffer_->consume_unique(); |
175 | | - any_callback_.dispatch_intra_process(std::move(msg), msg_info); |
| 202 | + MessageUniquePtr unique_msg = std::move(shared_ptr->second); |
| 203 | + any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); |
176 | 204 | } |
| 205 | + shared_ptr.reset(); |
177 | 206 | } |
178 | 207 |
|
179 | 208 | AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_; |
|
0 commit comments