Description
Feature request
Feature description
I have been writing some hardware "test" scripts used for checking the integration of hardware in real time (not using any test framework or assertions but simply stimulating a system with inputs and manually confirming the hardware is performing as expected). I have found writing this using async has made things much simpler and nicer:
e.g.
async def test_1(self):
    self.get_logger().info("Setting Rudder to Center")
    await self.set_and_wait_until_rudder_angle(0.0, 5.0)
    self.get_logger().info("Setting Rudder to Full Port")
    await self.set_and_wait_until_rudder_angle(-1.0, 5.0)

I kick this off like so:
import rclpy
from rclpy.executors import SingleThreadedExecutor

def main(args=None):
    rclpy.init(args=args)
    executor = SingleThreadedExecutor()
    node = TestNode()
    executor.add_node(node)
    try:
        node.get_logger().info("Starting Test 1")
        # Schedule the coroutine on the executor and spin until it finishes.
        task = executor.create_task(node.test_1())
        executor.spin_until_future_complete(task)
        node.get_logger().info("Testing complete!")
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    executor.shutdown()

There are pretty much no examples of create_task being used like this, so I have been experimenting and reading a bit of the rclpy code. After looking at the executor code in depth, I couldn't find out how to use the executor to do the equivalent of await asyncio.sleep. I had a quick look at how asyncio implements theirs and came up with this hack using a Timer:
Edit: use the version in a comment below instead: #1234 (comment)
from rclpy.task import Future

async def async_wait_for(self, rel_time: float):
    fut = Future()
    timer = None

    def done_waiting():
        # Resolve the future, then stop the timer so it only fires once.
        fut.set_result(None)
        timer.cancel()

    timer = self.create_timer(rel_time, done_waiting)
    await fut

This was the only missing piece, and with it I was able to asynchronously "wait" and write the coroutines as expected.
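For comparison, the asyncio pattern this hack imitates can be sketched roughly as follows. This is a minimal stdlib-only illustration, not asyncio's actual source and not rclpy code: the names `sleep_via_future` and `measure_sleep` are made up for this example. The idea is the same as above: create a future, ask the loop to resolve it after a delay, and await it.

```python
import asyncio
import time


async def sleep_via_future(delay: float) -> None:
    """Hypothetical helper: sleep by awaiting a future that a timer resolves."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Ask the event loop to resolve the future after `delay` seconds.
    handle = loop.call_later(delay, fut.set_result, None)
    try:
        await fut
    finally:
        # If the awaiting task is cancelled early, drop the pending callback.
        handle.cancel()


def measure_sleep(delay: float = 0.05) -> float:
    """Run the coroutine to completion and return the elapsed wall time."""
    start = time.monotonic()
    asyncio.run(sleep_via_future(delay))
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"slept for {measure_sleep(0.05):.3f} s")
```

The rclpy version differs mainly in that the executor has no `call_later`, so a Timer stands in for it.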
My request is that something like this be added to the Executor class, perhaps with a more elegant interface than abusing a Timer like this.
#279 might be related.