-
Notifications
You must be signed in to change notification settings - Fork 522
Description
Bug report
Required Info:
- Operating System:
- Ubuntu 20.04
- Installation type:
- Binaries
- Version or commit hash:
- DDS implementation:
- Fast-RTPS
- Client library (if applicable):
- rclcpp
Steps to reproduce issue
Hello,
we are currently using CycloneDDS as RWM implementation, but we would need to use Fast-RTPS.
Using Fast-RTPS changes our ROS2 application behavior. I found out it is related to the callback groups, which we use a lot.
I tried to isolate the issue with 3 nodes that are quite simple:
- A service client
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>
class CallbackGroupSrvClient: public rclcpp::Node {
public:
// Constructor
CallbackGroupSrvClient()
: Node("callback_goup_srv_client"){
// Init callback group
callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
callback_group_executor_thread_ = std::thread([this]() {
callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
callback_group_executor_.spin();
});
// Init subscription
subscription_ = this->create_subscription<std_msgs::msg::Bool>(
"topic_bool",
rclcpp::QoS(1),
std::bind(
&CallbackGroupSrvClient::sub_callback,
this,
std::placeholders::_1
)
);
// Create service client and link it to callback group
srv_client_ = this->create_client<std_srvs::srv::Trigger>(
"trigger_srv",
rmw_qos_profile_services_default,
callback_group_
);
}
// Destructor
~CallbackGroupSrvClient(){
callback_group_executor_.cancel();
callback_group_executor_thread_.join();
}
private:
// Methods
void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg! ");
// Send request and wait for future
auto req = std::make_shared<std_srvs::srv::Trigger::Request>();
auto future = srv_client_->async_send_request(req);
RCLCPP_INFO_STREAM(this->get_logger(), "Sending request");
auto future_status = future.wait_for(std::chrono::seconds(10));
// Process result
if (future_status == std::future_status::ready) {
if (future.get()->success) {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv suceeded");
}
else {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv failed");
}
}
else if (future_status == std::future_status::timeout) {
RCLCPP_INFO_STREAM(this->get_logger(), "Response not received within timeout");
}
else if (future_status == std::future_status::deferred) {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv deferred");
}
return;
}
// Attributes
rclcpp::Subscription<std_msgs::msg::Bool>::SharedPtr subscription_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
std::thread callback_group_executor_thread_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr srv_client_;
};
int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto client = std::make_shared<CallbackGroupSrvClient>();
rclcpp::spin(client);
rclcpp::shutdown();
return 0;
}- A service server:
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>
class CallbackGroupSrvServer: public rclcpp::Node {
public:
// Constructor
CallbackGroupSrvServer()
: Node("callback_goup_srv_server"){
// Init callback group and spin it
callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
callback_group_executor_thread_ = std::thread([this]() {
callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
callback_group_executor_.spin();
});
// Init service server and link it to callback group
service_ = this->create_service<std_srvs::srv::Trigger>(
"trigger_srv",
std::bind(
&CallbackGroupSrvServer::srv_callback,
this,
std::placeholders::_1,
std::placeholders::_2)
);
}
// Destructor
~CallbackGroupSrvServer(){
callback_group_executor_.cancel();
callback_group_executor_thread_.join();
}
private:
// Methods
void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
new_msg_ = true;
RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg!");
return;
}
void srv_callback(const std_srvs::srv::Trigger::Request::SharedPtr, std_srvs::srv::Trigger::Response::SharedPtr res){
RCLCPP_INFO_STREAM(this->get_logger(), "Received request");
new_msg_ = false;
// Create subscription
auto sub_options_ = rclcpp::SubscriptionOptions();
sub_options_.callback_group = callback_group_;
auto subscription = this->create_subscription<std_msgs::msg::Bool>(
"topic_bool",
rclcpp::QoS(1),
std::bind(
&CallbackGroupSrvServer::sub_callback,
this,
std::placeholders::_1),
sub_options_
);
// Wait for new_msg
while (rclcpp::ok()
&& new_msg_ == false) {
rclcpp::sleep_for(std::chrono::seconds(1));
RCLCPP_INFO_STREAM(this->get_logger(), "Sleep...");
}
res->message = "";
res->success = true;
return;
}
// Attributes
rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr service_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
std::thread callback_group_executor_thread_;
bool new_msg_;
};
int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto srv_server = std::make_shared<CallbackGroupSrvServer>();
rclcpp::spin(srv_server);
rclcpp::shutdown();
return 0;
}- And a minimal publisher:
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
class MinimalPublisher: public rclcpp::Node {
public:
// constructor
MinimalPublisher()
: Node("minimal_publisher"){
publisher_ = this->create_publisher<std_msgs::msg::Bool>("topic_bool", rclcpp::QoS(1));
timer_ = this->create_wall_timer(
std::chrono::seconds(5),
std::bind(
&MinimalPublisher::timer_callback,
this
)
);
}
private:
// method
void timer_callback(){
std_msgs::msg::Bool msg;
msg.data = false;
publisher_->publish(msg);
RCLCPP_INFO_STREAM(this->get_logger(), "Publishing data! ");
return;
}
// attributes
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::Bool>::SharedPtr publisher_;
};
int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto pub = std::make_shared<MinimalPublisher>();
rclcpp::spin(pub);
rclcpp::shutdown();
return 0;
}Here's a small description of each:
- The publisher publishes data every 5 seconds on topic
/topic_bool. - The client subscribes to
/topic_bool. When a new message is received on this topic, the client calls the service/trigger_srvand wait 10 seconds for a response. The service client is declared with a callback group (which is added to a dedicated spinning executor) - The server for service
/trigger_srvsubscribes to/topic_boolwhen a new service request is received. When a new message is received on this topic, the server sends the service response. The subscription is declared with a callback group (which is added to a dedicated spinning executor)
Then in 3 different shells I force Fast-RTPS as RMW implementation: export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
And I launch each node in a shell.
Expected behavior
To get the expected behavior, you can launch the nodes using CycloneDDS.
Here's what should happen:
- The publisher publishes data on
/topic_bool. - The client calls on
/trigger_srv. - The server wait for a new message on
/topic_bool. - The publisher publishes data on
/topic_bool. - The server sends the response
- The client receives the response
Actual behavior
Here's what happens when using Fast-RTPS:
- The publisher publishes data on
/topic_bool. - The client calls on
/trigger_srv. - The server wait for a new message on
/topic_bool. - The publisher publishes data on
/topic_bool. - The server doesn't get the new message on
/topic_bool. - The client stop waiting for a response (timeout).
Additional information
I tried to reproduce this issue under different conditions:
- On two different computers (to check it is not related to one particular setup)
- With / without a domain Id (via setting environment variable
ROS_DOMAIN_ID) - With WiFi On / Off (to be sure to not catch other ROS nodes / daemons on the network)
- With CycloneDDS binaries installed / removed, and a rebuild
Between each test, I made sure to do ros2 daemon stop.
Under all these conditions, and all combinations of them, the issue can be reproduced.
Thanks for helping!