1717
1818#include < atomic>
1919#include < future>
20- #include < map >
20+ #include < unordered_map >
2121#include < memory>
22+ #include < optional> // NOLINT, cpplint doesn't think this is a cpp std header
2223#include < sstream>
2324#include < string>
2425#include < tuple>
2526#include < utility>
27+ #include < variant> // NOLINT
2628
2729#include " rcl/client.h"
2830#include " rcl/error_handling.h"
@@ -178,6 +180,9 @@ template<typename ServiceT>
178180class Client : public ClientBase
179181{
180182public:
183+ using Request = typename ServiceT::Request;
184+ using Response = typename ServiceT::Response;
185+
181186 using SharedRequest = typename ServiceT::Request::SharedPtr;
182187 using SharedResponse = typename ServiceT::Response::SharedPtr;
183188
@@ -187,6 +192,7 @@ class Client : public ClientBase
187192 using SharedPromise = std::shared_ptr<Promise>;
188193 using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
189194
195+ using Future = std::future<SharedResponse>;
190196 using SharedFuture = std::shared_future<SharedResponse>;
191197 using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
192198
@@ -195,6 +201,88 @@ class Client : public ClientBase
195201
196202 RCLCPP_SMART_PTR_DEFINITIONS (Client)
197203
204+ // / A convenient Client::Future and request id pair.
205+ class FutureAndRequestId
206+ {
207+ public:
208+ FutureAndRequestId (Future impl, int64_t req_id)
209+ : impl_(std::move(impl)), req_id_(req_id)
210+ {}
211+
212+ // / Allow implicit conversions to `std::future` by reference.
213+ // TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?)
214+ operator Future &() {return impl_;}
215+
216+ // / Deprecated, use take_future() instead.
217+ /* *
218+ * Allow implicit conversions to `std::future` by value.
219+ * \deprecated
220+ */
221+ [[deprecated(" FutureAndRequestId: use take_future() instead of an implicit conversion" )]]
222+ operator Future () {return impl_;}
223+
224+ // / Returns the internal `std::future`, moving it out.
225+ /* *
226+ * After calling this method, the internal future gets invalidated.
227+ */
228+ Future
229+ take_future () {return impl_;}
230+
231+ // / Getter for the internal future.
232+ Future &
233+ get_future () {return impl_;}
234+
235+ // / Getter for the internal future.
236+ const Future &
237+ get_future () const {return impl_;}
238+
239+ // / Returns the request id associated with this future.
240+ int64_t get_request_id () const {return req_id_;}
241+
242+ // delegate future like methods in the std::future impl_
243+
244+ // / See std::future::share().
245+ SharedFuture share () noexcept {return impl_.share ();}
246+ // / See std::future::get().
247+ SharedResponse get () {return impl_.get ();}
248+ // / See std::future::valid().
249+ bool valid () const noexcept {return impl_.valid ();}
250+ // / See std::future::wait().
251+ void wait () const {return impl_.wait ();}
252+ // / See std::future::wait_for().
253+ template <class Rep , class Period >
254+ std::future_status wait_for (
255+ const std::chrono::duration<Rep, Period> & timeout_duration) const
256+ {
257+ return impl_.wait_for (timeout_duration);
258+ }
259+ // / See std::future::wait_until().
260+ template <class Clock , class Duration >
261+ std::future_status wait_until (
262+ const std::chrono::time_point<Clock, Duration> & timeout_time) const
263+ {
264+ return impl_.wait_until (timeout_time);
265+ }
266+
267+ // Rule of five, we could use the rule of zero here, but better be explicit as some of the
268+ // methods are deleted.
269+
270+ // / Move constructor.
271+ FutureAndRequestId (FutureAndRequestId && other) noexcept = default ;
272+ // / Deleted copy constructor, each instance is a unique owner of the future.
273+ FutureAndRequestId (const FutureAndRequestId & other) = delete ;
274+ // / Move assignment.
275+ FutureAndRequestId & operator =(FutureAndRequestId && other) noexcept = default ;
276+ // / Deleted copy assignment, each instance is a unique owner of the future.
277+ FutureAndRequestId & operator =(const FutureAndRequestId & other) = delete ;
278+ // / Destructor.
279+ ~FutureAndRequestId () = default ;
280+
281+ private:
282+ Future impl_;
283+ int64_t req_id_;
284+ };
285+
198286 // / Default constructor.
199287 /* *
200288 * The constructor for a Client is almost never called directly.
@@ -292,34 +380,83 @@ class Client : public ClientBase
292380 std::shared_ptr<rmw_request_id_t > request_header,
293381 std::shared_ptr<void > response) override
294382 {
295- std::unique_lock<std::mutex> lock (pending_requests_mutex_);
296- auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
297- int64_t sequence_number = request_header->sequence_number ;
298- // TODO(esteve) this should throw instead since it is not expected to happen in the first place
299- if (this ->pending_requests_ .count (sequence_number) == 0 ) {
300- RCUTILS_LOG_ERROR_NAMED (
301- " rclcpp" ,
302- " Received invalid sequence number. Ignoring..." );
383+ auto opt = this ->get_and_erase_pending_request (request_header->sequence_number );
384+ if (!opt) {
303385 return ;
304386 }
305- auto tuple = this ->pending_requests_ [sequence_number];
306- auto call_promise = std::get<0 >(tuple);
307- auto callback = std::get<1 >(tuple);
308- auto future = std::get<2 >(tuple);
309- this ->pending_requests_ .erase (sequence_number);
310- // Unlock here to allow the service to be called recursively from one of its callbacks.
311- lock.unlock ();
312-
313- call_promise->set_value (typed_response);
314- callback (future);
387+ auto & value = *opt;
388+ auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
389+ std::move (response));
390+ if (std::holds_alternative<Promise>(value)) {
391+ auto & promise = std::get<Promise>(value);
392+ promise.set_value (std::move (typed_response));
393+ } else if (std::holds_alternative<CallbackType>(value)) {
394+ Promise promise;
395+ promise.set_value (std::move (typed_response));
396+ const auto & callback = std::get<CallbackType>(value);
397+ callback (promise.get_future ().share ());
398+ } else if (std::holds_alternative<std::pair<CallbackWithRequestType, SharedRequest>>(value)) {
399+ PromiseWithRequest promise;
400+ const auto & pair = std::get<std::pair<CallbackWithRequestType, SharedRequest>>(value);
401+ promise.set_value (std::make_pair (std::move (pair.second ), std::move (typed_response)));
402+ pair.first (promise.get_future ().share ());
403+ }
315404 }
316405
317- SharedFuture
406+ // / Send a request to the service server.
407+ /* *
408+ * This method returns a `FutureAndRequestId` instance
409+ * that can be passed to Executor::spin_until_future_complete() to
410+ * wait until it has been completed.
411+ *
412+ * If the future never completes,
413+ * e.g. the call to Executor::spin_until_future_complete() times out,
414+ * Client::remove_pending_request() must be called to clean the client internal state.
415+ * Not doing so will make the `Client` instance to use more memory each time a response is not
416+ * received from the service server.
417+ *
418+ * ```cpp
419+ * auto future = client->async_send_request(my_request);
420+ * if (
421+ * rclcpp::FutureReturnCode::TIMEOUT ==
422+ * executor->spin_until_future_complete(future, timeout))
423+ * {
424+ * client->remove_pending_request(future);
425+ * // handle timeout
426+ * } else {
427+ * handle_response(future.get());
428+ * }
429+ * ```
430+ *
431+ * \param[in] request request to be send.
432+ * \return a FutureAndRequestId instance.
433+ */
434+ FutureAndRequestId
318435 async_send_request (SharedRequest request)
319436 {
320- return async_send_request (request, [](SharedFuture) {});
437+ Promise promise;
438+ auto future = promise.get_future ();
439+ auto req_id = async_send_request_impl (
440+ *request,
441+ std::move (promise));
442+ return FutureAndRequestId (std::move (future), req_id);
321443 }
322444
445+ // / Send a request to the service server and schedule a callback in the executor.
446+ /* *
447+ * Similar to the previous overload, but a callback will automatically be called when a response is received.
448+ *
449+ * If the callback is never called, because we never got a reply for the service server, remove_pending_request()
450+ * has to be called with the returned request id or prune_pending_requests().
451+ * Not doing so will make the `Client` instance use more memory each time a response is not
452+ * received from the service server.
453+ * In this case, it's convenient to setup a timer to cleanup the pending requests.
454+ *
455+ * \param[in] request request to be send.
456+ * \param[in] cb callback that will be called when we get a response for this request.
457+ * \return the request id representing the request just sent.
458+ */
459+ // TODO(ivanpauno): Link to example that shows how to cleanup requests.
323460 template <
324461 typename CallbackT,
325462 typename std::enable_if<
@@ -329,23 +466,24 @@ class Client : public ClientBase
329466 >::value
330467 >::type * = nullptr
331468 >
332- SharedFuture
469+ int64_t
333470 async_send_request (SharedRequest request, CallbackT && cb)
334471 {
335- std::lock_guard<std::mutex> lock (pending_requests_mutex_);
336- int64_t sequence_number;
337- rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), request.get (), &sequence_number);
338- if (RCL_RET_OK != ret) {
339- rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
340- }
341-
342- SharedPromise call_promise = std::make_shared<Promise>();
343- SharedFuture f (call_promise->get_future ());
344- pending_requests_[sequence_number] =
345- std::make_tuple (call_promise, std::forward<CallbackType>(cb), f);
346- return f;
472+ return async_send_request_impl (
473+ *request,
474+ CallbackType{std::forward<CallbackT>(cb)});
347475 }
348476
477+ // / Send a request to the service server and schedule a callback in the executor.
478+ /* *
479+ * Similar to the previous method, but you can get both the request and response in the callback.
480+ *
481+ * \param[in] request request to be send.
482+ * \param[in] cb callback that will be called when we get a response for this request.
483+ * \return the request id representing the request just sent.
484+ */
485+ // TODO(ivanpauno): Deprecate this.
486+ // If someone wants the request they can capture it in the lambda.
349487 template <
350488 typename CallbackT,
351489 typename std::enable_if<
@@ -355,28 +493,100 @@ class Client : public ClientBase
355493 >::value
356494 >::type * = nullptr
357495 >
358- SharedFutureWithRequest
496+ int64_t
359497 async_send_request (SharedRequest request, CallbackT && cb)
360498 {
361- SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
362- SharedFutureWithRequest future_with_request (promise->get_future ());
499+ auto & req = *request;
500+ return async_send_request_impl (
501+ req,
502+ std::make_pair (CallbackWithRequestType{std::forward<CallbackT>(cb)}, std::move (request)));
503+ }
504+
505+ // / Cleanup a pending request.
506+ /* *
507+ * This notifies the client that we have waited long enough for a response from the server
508+ * to come, we have given up and we are not waiting for a response anymore.
509+ *
510+ * Not calling this will make the client start using more memory for each request
511+ * that never got a reply from the server.
512+ *
513+ * \param[in] request_id request id returned by async_send_request().
514+ * \return true when a pending request was removed, false if not (e.g. a response was received).
515+ */
516+ bool
517+ remove_pending_request (int64_t request_id)
518+ {
519+ std::lock_guard guard (pending_requests_mutex_);
520+ return pending_requests_.erase (request_id) != 0u ;
521+ }
522+
523+ // / Cleanup a pending request.
524+ /* *
525+ * Convenient overload, same as:
526+ *
527+ * `Client::remove_pending_request(this, future.get_request_id())`.
528+ */
529+ bool
530+ remove_pending_request (const FutureAndRequestId & future)
531+ {
532+ return this ->remove_pending_request (future.get_request_id ());
533+ }
534+
535+ // / Clean all pending requests.
536+ /* *
537+ * \return number of pending requests that were removed.
538+ */
539+ size_t
540+ prune_requests ()
541+ {
542+ std::lock_guard guard (pending_requests_mutex_);
543+ auto ret = pending_requests_.size ();
544+ pending_requests_.clear ();
545+ return ret;
546+ }
363547
364- auto wrapping_cb = [future_with_request, promise, request,
365- cb = std::forward<CallbackWithRequestType>(cb)](SharedFuture future) {
366- auto response = future.get ();
367- promise->set_value (std::make_pair (request, response));
368- cb (future_with_request);
369- };
548+ protected:
549+ using PendingRequestsMapValue = std::variant<
550+ std::promise<SharedResponse>,
551+ CallbackType,
552+ std::pair<CallbackWithRequestType, SharedRequest>>;
370553
371- async_send_request (request, wrapping_cb);
554+ RCLCPP_PUBLIC
555+ int64_t
556+ async_send_request_impl (const Request & request, PendingRequestsMapValue value)
557+ {
558+ int64_t sequence_number;
559+ rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), &request, &sequence_number);
560+ if (RCL_RET_OK != ret) {
561+ rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
562+ }
563+ {
564+ std::lock_guard<std::mutex> lock (pending_requests_mutex_);
565+ pending_requests_.try_emplace (sequence_number, std::move (value));
566+ }
567+ return sequence_number;
568+ }
372569
373- return future_with_request;
570+ RCLCPP_PUBLIC
571+ std::optional<PendingRequestsMapValue>
572+ get_and_erase_pending_request (int64_t request_number)
573+ {
574+ std::unique_lock<std::mutex> lock (pending_requests_mutex_);
575+ auto it = this ->pending_requests_ .find (request_number);
576+ if (it == this ->pending_requests_ .end ()) {
577+ RCUTILS_LOG_DEBUG_NAMED (
578+ " rclcpp" ,
579+ " Received invalid sequence number. Ignoring..." );
580+ return std::nullopt ;
581+ }
582+ auto value = std::move (it->second );
583+ this ->pending_requests_ .erase (request_number);
584+ return value;
374585 }
375586
376- private:
377587 RCLCPP_DISABLE_COPY (Client)
378588
379- std::map <int64_t , std::tuple<SharedPromise, CallbackType, SharedFuture> > pending_requests_;
589+ std::unordered_map <int64_t , PendingRequestsMapValue > pending_requests_;
380590 std::mutex pending_requests_mutex_;
381591};
382592
0 commit comments