-
Notifications
You must be signed in to change notification settings - Fork 96
Closed
Description
Feature request
Feature description
It would be nice to have a way to check if a topic is publishing messages (with a timeout).
THis suggestion came up in #263
Implementation considerations
One way to do that can be :
from threading import Event, Thread
import unittest
import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import pytest
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
@pytest.mark.launch_test
def generate_test_description():
return launch.LaunchDescription([
launch_ros.actions.Node(
package='demo_nodes_cpp',
executable='talker',
name='demo_node_1',
),
launch_testing.actions.ReadyToTest()
])
class TestFixture(unittest.TestCase):
def test_check_if_msgs_published(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
assert msgs_received_flag, 'Did not receive msgs !'
rclpy.shutdown()
class MakeTestNode(Node):
def __init__(self, name='test_node'):
super().__init__(name)
self.msg_event_object = Event()
def start_subscriber(self):
# Create a subscriber
self.subscription = self.create_subscription(
String,
'chatter',
self.subscriber_callback,
10
)
# Add a spin thread
self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()
def subscriber_callback(self, data):
self.msg_event_object.set()
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels