Skip to content

Commit 79c2dd8

Browse files
wjwwoodhidmic
andauthored
use dynamic_pointer_cast to detect allocator mismatch in intra process manager (#1643)
* use dynamic_pointer_cast to detect allocator mismatch in intra process manager Signed-off-by: William Woodall <william@osrfoundation.org> * add test case for mismatched allocators Signed-off-by: William Woodall <william@osrfoundation.org> * forward template arguments to avoid mismatched types in intra process manager Signed-off-by: William Woodall <william@osrfoundation.org> * style fixes Signed-off-by: William Woodall <william@osrfoundation.org> * refactor to test message address and count, more DRY Signed-off-by: William Woodall <william@osrfoundation.org> * update copyright Signed-off-by: William Woodall <william@osrfoundation.org> * fix typo Signed-off-by: William Woodall <william@osrfoundation.org> Co-authored-by: Michel Hidalgo <michel@ekumenlabs.com> Co-authored-by: Michel Hidalgo <michel@ekumenlabs.com>
1 parent fba080c commit 79c2dd8

4 files changed

Lines changed: 314 additions & 10 deletions

File tree

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ class IntraProcessManager
202202
// None of the buffers require ownership, so we promote the pointer
203203
std::shared_ptr<MessageT> msg = std::move(message);
204204

205-
this->template add_shared_msg_to_buffers<MessageT>(msg, sub_ids.take_shared_subscriptions);
205+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
206+
msg, sub_ids.take_shared_subscriptions);
206207
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
207208
sub_ids.take_shared_subscriptions.size() <= 1)
208209
{
@@ -227,7 +228,7 @@ class IntraProcessManager
227228
// for the buffers that do not require ownership
228229
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
229230

230-
this->template add_shared_msg_to_buffers<MessageT>(
231+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
231232
shared_msg, sub_ids.take_shared_subscriptions);
232233
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
233234
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
@@ -263,7 +264,7 @@ class IntraProcessManager
263264
// If there are no owning, just convert to shared.
264265
std::shared_ptr<MessageT> shared_msg = std::move(message);
265266
if (!sub_ids.take_shared_subscriptions.empty()) {
266-
this->template add_shared_msg_to_buffers<MessageT>(
267+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
267268
shared_msg, sub_ids.take_shared_subscriptions);
268269
}
269270
return shared_msg;
@@ -273,7 +274,7 @@ class IntraProcessManager
273274
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
274275

275276
if (!sub_ids.take_shared_subscriptions.empty()) {
276-
this->template add_shared_msg_to_buffers<MessageT>(
277+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
277278
shared_msg,
278279
sub_ids.take_shared_subscriptions);
279280
}
@@ -350,7 +351,10 @@ class IntraProcessManager
350351
bool
351352
can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
352353

353-
template<typename MessageT>
354+
template<
355+
typename MessageT,
356+
typename Alloc,
357+
typename Deleter>
354358
void
355359
add_shared_msg_to_buffers(
356360
std::shared_ptr<const MessageT> message,
@@ -363,9 +367,16 @@ class IntraProcessManager
363367
}
364368
auto subscription_base = subscription_it->second.subscription.lock();
365369
if (subscription_base) {
366-
auto subscription = std::static_pointer_cast<
367-
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
370+
auto subscription = std::dynamic_pointer_cast<
371+
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
368372
>(subscription_base);
373+
if (nullptr == subscription) {
374+
throw std::runtime_error(
375+
"failed to dynamic cast SubscriptionIntraProcessBase to "
376+
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
377+
"can happen when the publisher and subscription use different "
378+
"allocator types, which is not supported");
379+
}
369380

370381
subscription->provide_intra_process_message(message);
371382
} else {
@@ -394,9 +405,16 @@ class IntraProcessManager
394405
}
395406
auto subscription_base = subscription_it->second.subscription.lock();
396407
if (subscription_base) {
397-
auto subscription = std::static_pointer_cast<
398-
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
408+
auto subscription = std::dynamic_pointer_cast<
409+
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
399410
>(subscription_base);
411+
if (nullptr == subscription) {
412+
throw std::runtime_error(
413+
"failed to dynamic cast SubscriptionIntraProcessBase to "
414+
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
415+
"can happen when the publisher and subscription use different "
416+
"allocator types, which is not supported");
417+
}
400418

401419
if (std::next(it) == subscription_ids.end()) {
402420
// If this is the last subscription, give up ownership

rclcpp/test/rclcpp/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ if(TARGET test_intra_process_manager)
143143
)
144144
target_link_libraries(test_intra_process_manager ${PROJECT_NAME})
145145
endif()
146+
ament_add_gmock(test_intra_process_manager_with_allocators test_intra_process_manager_with_allocators.cpp)
147+
if(TARGET test_intra_process_manager_with_allocators)
148+
ament_target_dependencies(test_intra_process_manager_with_allocators
149+
"test_msgs"
150+
)
151+
target_link_libraries(test_intra_process_manager_with_allocators ${PROJECT_NAME})
152+
endif()
146153
ament_add_gtest(test_ring_buffer_implementation test_ring_buffer_implementation.cpp)
147154
if(TARGET test_ring_buffer_implementation)
148155
ament_target_dependencies(test_ring_buffer_implementation

rclcpp/test/rclcpp/test_intra_process_manager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,10 @@ class SubscriptionIntraProcessBase
216216
const char * topic_name;
217217
};
218218

219-
template<typename MessageT>
219+
template<
220+
typename MessageT,
221+
typename Alloc = std::allocator<void>,
222+
typename Deleter = std::default_delete<MessageT>>
220223
class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
221224
{
222225
public:
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
// Copyright 2021 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <gmock/gmock.h>
16+
17+
#include <chrono>
18+
#include <list>
19+
#include <memory>
20+
#include <string>
21+
#include <utility>
22+
23+
#include "test_msgs/msg/empty.hpp"
24+
25+
#include "rclcpp/rclcpp.hpp"
26+
#include "rclcpp/allocator/allocator_common.hpp"
27+
#include "rclcpp/strategies/allocator_memory_strategy.hpp"
28+
29+
// For demonstration purposes only, not necessary for allocator_traits
30+
static uint32_t num_allocs = 0;
31+
static uint32_t num_deallocs = 0;
32+
// A very simple custom allocator. Counts calls to allocate and deallocate.
33+
template<typename T = void>
34+
struct MyAllocator
35+
{
36+
public:
37+
using value_type = T;
38+
using size_type = std::size_t;
39+
using pointer = T *;
40+
using const_pointer = const T *;
41+
using difference_type = typename std::pointer_traits<pointer>::difference_type;
42+
43+
MyAllocator() noexcept
44+
{
45+
}
46+
47+
~MyAllocator() noexcept {}
48+
49+
template<typename U>
50+
MyAllocator(const MyAllocator<U> &) noexcept
51+
{
52+
}
53+
54+
T * allocate(size_t size, const void * = 0)
55+
{
56+
if (size == 0) {
57+
return nullptr;
58+
}
59+
num_allocs++;
60+
return static_cast<T *>(std::malloc(size * sizeof(T)));
61+
}
62+
63+
void deallocate(T * ptr, size_t size)
64+
{
65+
(void)size;
66+
if (!ptr) {
67+
return;
68+
}
69+
num_deallocs++;
70+
std::free(ptr);
71+
}
72+
73+
template<typename U>
74+
struct rebind
75+
{
76+
typedef MyAllocator<U> other;
77+
};
78+
};
79+
80+
template<typename T, typename U>
81+
constexpr bool operator==(
82+
const MyAllocator<T> &,
83+
const MyAllocator<U> &) noexcept
84+
{
85+
return true;
86+
}
87+
88+
template<typename T, typename U>
89+
constexpr bool operator!=(
90+
const MyAllocator<T> &,
91+
const MyAllocator<U> &) noexcept
92+
{
93+
return false;
94+
}
95+
96+
template<
97+
typename PublishedMessageAllocatorT,
98+
typename PublisherAllocatorT,
99+
typename SubscribedMessageAllocatorT,
100+
typename SubscriptionAllocatorT,
101+
typename MessageMemoryStrategyAllocatorT,
102+
typename MemoryStrategyAllocatorT,
103+
typename ExpectedExceptionT
104+
>
105+
void
106+
do_custom_allocator_test(
107+
PublishedMessageAllocatorT published_message_allocator,
108+
PublisherAllocatorT publisher_allocator,
109+
SubscribedMessageAllocatorT /* subscribed_message_allocator */, // intentionally unused
110+
SubscriptionAllocatorT subscription_allocator,
111+
MessageMemoryStrategyAllocatorT message_memory_strategy,
112+
MemoryStrategyAllocatorT memory_strategy_allocator)
113+
{
114+
using PublishedMessageAllocTraits =
115+
rclcpp::allocator::AllocRebind<test_msgs::msg::Empty, PublishedMessageAllocatorT>;
116+
using PublishedMessageAlloc = typename PublishedMessageAllocTraits::allocator_type;
117+
using PublishedMessageDeleter =
118+
rclcpp::allocator::Deleter<PublishedMessageAlloc, test_msgs::msg::Empty>;
119+
120+
using SubscribedMessageAllocTraits =
121+
rclcpp::allocator::AllocRebind<test_msgs::msg::Empty, SubscribedMessageAllocatorT>;
122+
using SubscribedMessageAlloc = typename SubscribedMessageAllocTraits::allocator_type;
123+
using SubscribedMessageDeleter =
124+
rclcpp::allocator::Deleter<SubscribedMessageAlloc, test_msgs::msg::Empty>;
125+
126+
// init and node
127+
auto context = std::make_shared<rclcpp::Context>();
128+
context->init(0, nullptr);
129+
auto node = std::make_shared<rclcpp::Node>(
130+
"custom_allocator_test",
131+
rclcpp::NodeOptions().context(context).use_intra_process_comms(true));
132+
133+
// publisher
134+
auto shared_publisher_allocator = std::make_shared<PublisherAllocatorT>(publisher_allocator);
135+
rclcpp::PublisherOptionsWithAllocator<PublisherAllocatorT> publisher_options;
136+
publisher_options.allocator = shared_publisher_allocator;
137+
auto publisher =
138+
node->create_publisher<test_msgs::msg::Empty>("custom_allocator_test", 10, publisher_options);
139+
140+
// callback for subscription
141+
uint32_t counter = 0;
142+
std::promise<std::unique_ptr<test_msgs::msg::Empty, SubscribedMessageDeleter>> received_message;
143+
auto received_message_future = received_message.get_future();
144+
auto callback =
145+
[&counter, &received_message](
146+
std::unique_ptr<test_msgs::msg::Empty, SubscribedMessageDeleter> msg)
147+
{
148+
++counter;
149+
received_message.set_value(std::move(msg));
150+
};
151+
152+
// subscription
153+
auto shared_subscription_allocator =
154+
std::make_shared<SubscriptionAllocatorT>(subscription_allocator);
155+
rclcpp::SubscriptionOptionsWithAllocator<SubscriptionAllocatorT> subscription_options;
156+
subscription_options.allocator = shared_subscription_allocator;
157+
auto shared_message_strategy_allocator =
158+
std::make_shared<MessageMemoryStrategyAllocatorT>(message_memory_strategy);
159+
auto msg_mem_strat = std::make_shared<
160+
rclcpp::message_memory_strategy::MessageMemoryStrategy<
161+
test_msgs::msg::Empty,
162+
MessageMemoryStrategyAllocatorT
163+
>
164+
>(shared_message_strategy_allocator);
165+
using CallbackMessageT =
166+
typename rclcpp::subscription_traits::has_message_type<decltype(callback)>::type;
167+
auto subscriber = node->create_subscription<
168+
test_msgs::msg::Empty,
169+
decltype(callback),
170+
SubscriptionAllocatorT,
171+
CallbackMessageT,
172+
rclcpp::Subscription<CallbackMessageT, SubscriptionAllocatorT>,
173+
rclcpp::message_memory_strategy::MessageMemoryStrategy<
174+
CallbackMessageT,
175+
MessageMemoryStrategyAllocatorT
176+
>
177+
>(
178+
"custom_allocator_test",
179+
10,
180+
std::forward<decltype(callback)>(callback),
181+
subscription_options,
182+
msg_mem_strat);
183+
184+
// executor memory strategy
185+
using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy;
186+
auto shared_memory_strategy_allocator = std::make_shared<MemoryStrategyAllocatorT>(
187+
memory_strategy_allocator);
188+
std::shared_ptr<rclcpp::memory_strategy::MemoryStrategy> memory_strategy =
189+
std::make_shared<AllocatorMemoryStrategy<MemoryStrategyAllocatorT>>(
190+
shared_memory_strategy_allocator);
191+
192+
// executor
193+
rclcpp::ExecutorOptions options;
194+
options.memory_strategy = memory_strategy;
195+
options.context = context;
196+
rclcpp::executors::SingleThreadedExecutor executor(options);
197+
198+
executor.add_node(node);
199+
200+
// rebind message_allocator to ensure correct type
201+
PublishedMessageDeleter message_deleter;
202+
PublishedMessageAlloc rebound_message_allocator = published_message_allocator;
203+
rclcpp::allocator::set_allocator_for_deleter(&message_deleter, &rebound_message_allocator);
204+
205+
// allocate a message
206+
auto ptr = PublishedMessageAllocTraits::allocate(rebound_message_allocator, 1);
207+
PublishedMessageAllocTraits::construct(rebound_message_allocator, ptr);
208+
std::unique_ptr<test_msgs::msg::Empty, PublishedMessageDeleter> msg(ptr, message_deleter);
209+
210+
// publisher and receive
211+
if constexpr (std::is_same_v<ExpectedExceptionT, void>) {
212+
// no exception expected
213+
EXPECT_NO_THROW(
214+
{
215+
publisher->publish(std::move(msg));
216+
executor.spin_until_future_complete(received_message_future, std::chrono::seconds(10));
217+
});
218+
EXPECT_EQ(ptr, received_message_future.get().get());
219+
EXPECT_EQ(1u, counter);
220+
} else {
221+
// exception expected
222+
EXPECT_THROW(
223+
{
224+
publisher->publish(std::move(msg));
225+
executor.spin_until_future_complete(received_message_future, std::chrono::seconds(10));
226+
}, ExpectedExceptionT);
227+
}
228+
}
229+
230+
/*
231+
This tests the case where a custom allocator is used correctly, i.e. the same
232+
custom allocator on both sides.
233+
*/
234+
TEST(TestIntraProcessManagerWithAllocators, custom_allocator) {
235+
using PublishedMessageAllocatorT = std::allocator<void>;
236+
using PublisherAllocatorT = std::allocator<void>;
237+
using SubscribedMessageAllocatorT = std::allocator<void>;
238+
using SubscriptionAllocatorT = std::allocator<void>;
239+
using MessageMemoryStrategyAllocatorT = std::allocator<void>;
240+
using MemoryStrategyAllocatorT = std::allocator<void>;
241+
auto allocator = std::allocator<void>();
242+
do_custom_allocator_test<
243+
PublishedMessageAllocatorT,
244+
PublisherAllocatorT,
245+
SubscribedMessageAllocatorT,
246+
SubscriptionAllocatorT,
247+
MessageMemoryStrategyAllocatorT,
248+
MemoryStrategyAllocatorT,
249+
void // no exception expected
250+
>(allocator, allocator, allocator, allocator, allocator, allocator);
251+
}
252+
253+
/*
254+
This tests the case where a custom allocator is used incorrectly, i.e. different
255+
custom allocators on both sides.
256+
*/
257+
TEST(TestIntraProcessManagerWithAllocators, custom_allocator_wrong) {
258+
// explicitly use a different allocator here to provoke a failure
259+
using PublishedMessageAllocatorT = std::allocator<void>;
260+
using PublisherAllocatorT = std::allocator<void>;
261+
using SubscribedMessageAllocatorT = MyAllocator<void>;
262+
using SubscriptionAllocatorT = MyAllocator<void>;
263+
using MessageMemoryStrategyAllocatorT = MyAllocator<void>;
264+
using MemoryStrategyAllocatorT = std::allocator<void>;
265+
auto allocator = std::allocator<void>();
266+
auto my_allocator = MyAllocator<void>();
267+
do_custom_allocator_test<
268+
PublishedMessageAllocatorT,
269+
PublisherAllocatorT,
270+
SubscribedMessageAllocatorT,
271+
SubscriptionAllocatorT,
272+
MessageMemoryStrategyAllocatorT,
273+
MemoryStrategyAllocatorT,
274+
std::runtime_error // expected exception
275+
>(allocator, allocator, my_allocator, my_allocator, my_allocator, allocator);
276+
}

0 commit comments

Comments
 (0)