Skip to content

Callback not reached, using callback group and Fast-RTPS #1611

@anaelle-sw

Description

@anaelle-sw

Bug report

Required Info:

  • Operating System:
    • Ubuntu 20.04
  • Installation type:
    • Binaries
  • Version or commit hash:
    • 8.1.0 ROS2 Rolling
    • Edit: Also reproduced on latest release: 8.2.0 ROS2 Rolling
  • DDS implementation:
    • Fast-RTPS
  • Client library (if applicable):
    • rclcpp

Steps to reproduce issue

Hello,
we are currently using CycloneDDS as RWM implementation, but we would need to use Fast-RTPS.
Using Fast-RTPS changes our ROS2 application behavior. I found out it is related to the callback groups, which we use a lot.
I tried to isolate the issue with 3 nodes that are quite simple:

  • A service client
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>

class CallbackGroupSrvClient: public rclcpp::Node {
public:
  // Constructor
  CallbackGroupSrvClient()
  : Node("callback_goup_srv_client"){

    // Init callback group
    callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
    callback_group_executor_thread_ = std::thread([this]() {
      callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
      callback_group_executor_.spin();
    });

    // Init subscription
    subscription_ = this->create_subscription<std_msgs::msg::Bool>(
          "topic_bool",
          rclcpp::QoS(1),
          std::bind(
            &CallbackGroupSrvClient::sub_callback,
            this,
            std::placeholders::_1
            )
          );

    // Create service client and link it to callback group
    srv_client_ = this->create_client<std_srvs::srv::Trigger>(
          "trigger_srv",
          rmw_qos_profile_services_default,
          callback_group_
          );
  }

  // Destructor
  ~CallbackGroupSrvClient(){
    callback_group_executor_.cancel();
    callback_group_executor_thread_.join();
  }

private:
  // Methods
  void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
    RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg! ");

    // Send request and wait for future
    auto req = std::make_shared<std_srvs::srv::Trigger::Request>();
    auto future = srv_client_->async_send_request(req);
    RCLCPP_INFO_STREAM(this->get_logger(), "Sending request");
    auto future_status = future.wait_for(std::chrono::seconds(10));

    // Process result
    if (future_status == std::future_status::ready) {
      if (future.get()->success) {
        RCLCPP_INFO_STREAM(this->get_logger(), "Srv suceeded");
      }
      else {
        RCLCPP_INFO_STREAM(this->get_logger(), "Srv failed");
      }
    }
    else if (future_status == std::future_status::timeout) {
      RCLCPP_INFO_STREAM(this->get_logger(), "Response not received within timeout");
    }
    else if (future_status == std::future_status::deferred) {
      RCLCPP_INFO_STREAM(this->get_logger(), "Srv deferred");
    }

    return;
  }

  // Attributes
  rclcpp::Subscription<std_msgs::msg::Bool>::SharedPtr subscription_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
  std::thread callback_group_executor_thread_;
  rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr srv_client_;
};



int main(int argc, char * argv[]){
  rclcpp::init(argc, argv);
  auto client = std::make_shared<CallbackGroupSrvClient>();
  rclcpp::spin(client);
  rclcpp::shutdown();
  return 0;
}
  • A service server:
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>


class CallbackGroupSrvServer: public rclcpp::Node {
public:
  // Constructor
  CallbackGroupSrvServer()
  : Node("callback_goup_srv_server"){
    // Init callback group and spin it
    callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
    callback_group_executor_thread_ = std::thread([this]() {
      callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
      callback_group_executor_.spin();
    });

    // Init service server and link it to callback group
    service_ = this->create_service<std_srvs::srv::Trigger>(
          "trigger_srv",
          std::bind(
            &CallbackGroupSrvServer::srv_callback,
            this,
            std::placeholders::_1,
            std::placeholders::_2)
          );
  }

  // Destructor
  ~CallbackGroupSrvServer(){
    callback_group_executor_.cancel();
    callback_group_executor_thread_.join();
  }

private:
  // Methods
  void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
    new_msg_ = true;
    RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg!");
    return;
  }

  void srv_callback(const std_srvs::srv::Trigger::Request::SharedPtr, std_srvs::srv::Trigger::Response::SharedPtr res){
    RCLCPP_INFO_STREAM(this->get_logger(), "Received request");
    new_msg_ = false;

    // Create subscription
    auto sub_options_ = rclcpp::SubscriptionOptions();
    sub_options_.callback_group = callback_group_;
    auto subscription = this->create_subscription<std_msgs::msg::Bool>(
          "topic_bool",
          rclcpp::QoS(1),
          std::bind(
            &CallbackGroupSrvServer::sub_callback,
            this,
            std::placeholders::_1),
          sub_options_
          );

    // Wait for new_msg
    while (rclcpp::ok()
           && new_msg_ == false) {
      rclcpp::sleep_for(std::chrono::seconds(1));
      RCLCPP_INFO_STREAM(this->get_logger(), "Sleep...");
    }

    res->message = "";
    res->success = true;
    return;
  }

  // Attributes
  rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr service_;
  rclcpp::CallbackGroup::SharedPtr callback_group_;
  rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
  std::thread callback_group_executor_thread_;
  bool new_msg_;
};

int main(int argc, char * argv[]){
  rclcpp::init(argc, argv);
  auto srv_server = std::make_shared<CallbackGroupSrvServer>();
  rclcpp::spin(srv_server);
  rclcpp::shutdown();
  return 0;
}
  • And a minimal publisher:
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>


class MinimalPublisher: public rclcpp::Node {
public:
  // constructor
  MinimalPublisher()
  : Node("minimal_publisher"){
    publisher_ = this->create_publisher<std_msgs::msg::Bool>("topic_bool", rclcpp::QoS(1));
    timer_ = this->create_wall_timer(
          std::chrono::seconds(5),
          std::bind(
            &MinimalPublisher::timer_callback,
            this
            )
          );
  }

private:
  // method
  void timer_callback(){
    std_msgs::msg::Bool msg;
    msg.data = false;
    publisher_->publish(msg);
    RCLCPP_INFO_STREAM(this->get_logger(), "Publishing data! ");
    return;
  }

  // attributes
  rclcpp::TimerBase::SharedPtr timer_;
  rclcpp::Publisher<std_msgs::msg::Bool>::SharedPtr publisher_;
};

int main(int argc, char * argv[]){
  rclcpp::init(argc, argv);
  auto pub = std::make_shared<MinimalPublisher>();
  rclcpp::spin(pub);
  rclcpp::shutdown();
  return 0;
}

Here's a small description of each:

  • The publisher publishes data every 5 seconds on topic /topic_bool.
  • The client subscribes to /topic_bool. When a new message is received on this topic, the client calls the service /trigger_srv and wait 10 seconds for a response. The service client is declared with a callback group (which is added to a dedicated spinning executor)
  • The server for service /trigger_srv subscribes to /topic_bool when a new service request is received. When a new message is received on this topic, the server sends the service response. The subscription is declared with a callback group (which is added to a dedicated spinning executor)

Then in 3 different shells I force Fast-RTPS as RMW implementation: export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
And I launch each node in a shell.

Expected behavior

To get the expected behavior, you can launch the nodes using CycloneDDS.
Here's what should happen:

  • The publisher publishes data on /topic_bool.
  • The client calls on /trigger_srv.
  • The server wait for a new message on /topic_bool.
  • The publisher publishes data on /topic_bool.
  • The server sends the response
  • The client receives the response

Actual behavior

Here's what happens when using Fast-RTPS:

  • The publisher publishes data on /topic_bool.
  • The client calls on /trigger_srv.
  • The server wait for a new message on /topic_bool.
  • The publisher publishes data on /topic_bool.
  • The server doesn't get the new message on /topic_bool.
  • The client stop waiting for a response (timeout).

Additional information

I tried to reproduce this issue under different conditions:

  • On two different computers (to check it is not related to one particular setup)
  • With / without a domain Id (via setting environment variable ROS_DOMAIN_ID)
  • With WiFi On / Off (to be sure to not catch other ROS nodes / daemons on the network)
  • With CycloneDDS binaries installed / removed, and a rebuild

Between each test, I made sure to do ros2 daemon stop.
Under all these conditions, and all combinations of them, the issue can be reproduced.

Thanks for helping!

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions