1717
1818#include < atomic>
1919#include < future>
20- #include < unordered_map >
20+ #include < map >
2121#include < memory>
22- #include < optional> // NOLINT, cpplint doesn't think this is a cpp std header
2322#include < sstream>
2423#include < string>
2524#include < tuple>
2625#include < utility>
27- #include < variant> // NOLINT
2826
2927#include " rcl/client.h"
3028#include " rcl/error_handling.h"
@@ -180,9 +178,6 @@ template<typename ServiceT>
180178class Client : public ClientBase
181179{
182180public:
183- using Request = typename ServiceT::Request;
184- using Response = typename ServiceT::Response;
185-
186181 using SharedRequest = typename ServiceT::Request::SharedPtr;
187182 using SharedResponse = typename ServiceT::Response::SharedPtr;
188183
@@ -192,7 +187,6 @@ class Client : public ClientBase
192187 using SharedPromise = std::shared_ptr<Promise>;
193188 using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
194189
195- using Future = std::future<SharedResponse>;
196190 using SharedFuture = std::shared_future<SharedResponse>;
197191 using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
198192
@@ -201,88 +195,6 @@ class Client : public ClientBase
201195
202196 RCLCPP_SMART_PTR_DEFINITIONS (Client)
203197
204- // / A convenient Client::Future and request id pair.
205- class FutureAndRequestId
206- {
207- public:
208- FutureAndRequestId (Future impl, int64_t req_id)
209- : impl_(std::move(impl)), req_id_(req_id)
210- {}
211-
212- // / Allow implicit conversions to `std::future` by reference.
213- // TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?)
214- operator Future &() {return impl_;}
215-
216- // / Deprecated, use take_future() instead.
217- /* *
218- * Allow implicit conversions to `std::future` by value.
219- * \deprecated
220- */
221- [[deprecated(" FutureAndRequestId: use take_future() instead of an implicit conversion" )]]
222- operator Future () {return impl_;}
223-
224- // / Returns the internal `std::future`, moving it out.
225- /* *
226- * After calling this method, the internal future gets invalidated.
227- */
228- Future
229- take_future () {return impl_;}
230-
231- // / Getter for the internal future.
232- Future &
233- get_future () {return impl_;}
234-
235- // / Getter for the internal future.
236- const Future &
237- get_future () const {return impl_;}
238-
239- // / Returns the request id associated with this future.
240- int64_t get_request_id () const {return req_id_;}
241-
242- // delegate future like methods in the std::future impl_
243-
244- // / See std::future::share().
245- SharedFuture share () noexcept {return impl_.share ();}
246- // / See std::future::get().
247- SharedResponse get () {return impl_.get ();}
248- // / See std::future::valid().
249- bool valid () const noexcept {return impl_.valid ();}
250- // / See std::future::wait().
251- void wait () const {return impl_.wait ();}
252- // / See std::future::wait_for().
253- template <class Rep , class Period >
254- std::future_status wait_for (
255- const std::chrono::duration<Rep, Period> & timeout_duration) const
256- {
257- return impl_.wait_for (timeout_duration);
258- }
259- // / See std::future::wait_until().
260- template <class Clock , class Duration >
261- std::future_status wait_until (
262- const std::chrono::time_point<Clock, Duration> & timeout_time) const
263- {
264- return impl_.wait_until (timeout_time);
265- }
266-
267- // Rule of five, we could use the rule of zero here, but better be explicit as some of the
268- // methods are deleted.
269-
270- // / Move constructor.
271- FutureAndRequestId (FutureAndRequestId && other) noexcept = default ;
272- // / Deleted copy constructor, each instance is a unique owner of the future.
273- FutureAndRequestId (const FutureAndRequestId & other) = delete ;
274- // / Move assignment.
275- FutureAndRequestId & operator =(FutureAndRequestId && other) noexcept = default ;
276- // / Deleted copy assignment, each instance is a unique owner of the future.
277- FutureAndRequestId & operator =(const FutureAndRequestId & other) = delete ;
278- // / Destructor.
279- ~FutureAndRequestId () = default ;
280-
281- private:
282- Future impl_;
283- int64_t req_id_;
284- };
285-
286198 // / Default constructor.
287199 /* *
288200 * The constructor for a Client is almost never called directly.
@@ -380,83 +292,34 @@ class Client : public ClientBase
380292 std::shared_ptr<rmw_request_id_t > request_header,
381293 std::shared_ptr<void > response) override
382294 {
383- auto opt = this ->get_and_erase_pending_request (request_header->sequence_number );
384- if (!opt) {
295+ std::unique_lock<std::mutex> lock (pending_requests_mutex_);
296+ auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
297+ int64_t sequence_number = request_header->sequence_number ;
298+ // TODO(esteve) this should throw instead since it is not expected to happen in the first place
299+ if (this ->pending_requests_ .count (sequence_number) == 0 ) {
300+ RCUTILS_LOG_ERROR_NAMED (
301+ " rclcpp" ,
302+ " Received invalid sequence number. Ignoring..." );
385303 return ;
386304 }
387- auto & value = *opt;
388- auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
389- std::move (response));
390- if (std::holds_alternative<Promise>(value)) {
391- auto & promise = std::get<Promise>(value);
392- promise.set_value (std::move (typed_response));
393- } else if (std::holds_alternative<CallbackType>(value)) {
394- Promise promise;
395- promise.set_value (std::move (typed_response));
396- const auto & callback = std::get<CallbackType>(value);
397- callback (promise.get_future ().share ());
398- } else if (std::holds_alternative<std::pair<CallbackWithRequestType, SharedRequest>>(value)) {
399- PromiseWithRequest promise;
400- const auto & pair = std::get<std::pair<CallbackWithRequestType, SharedRequest>>(value);
401- promise.set_value (std::make_pair (std::move (pair.second ), std::move (typed_response)));
402- pair.first (promise.get_future ().share ());
403- }
305+ auto tuple = this ->pending_requests_ [sequence_number];
306+ auto call_promise = std::get<0 >(tuple);
307+ auto callback = std::get<1 >(tuple);
308+ auto future = std::get<2 >(tuple);
309+ this ->pending_requests_ .erase (sequence_number);
310+ // Unlock here to allow the service to be called recursively from one of its callbacks.
311+ lock.unlock ();
312+
313+ call_promise->set_value (typed_response);
314+ callback (future);
404315 }
405316
406- // / Send a request to the service server.
407- /* *
408- * This method returns a `FutureAndRequestId` instance
409- * that can be passed to Executor::spin_until_future_complete() to
410- * wait until it has been completed.
411- *
412- * If the future never completes,
413- * e.g. the call to Executor::spin_until_future_complete() times out,
414- * Client::remove_pending_request() must be called to clean the client internal state.
415- * Not doing so will make the `Client` instance to use more memory each time a response is not
416- * received from the service server.
417- *
418- * ```cpp
419- * auto future = client->async_send_request(my_request);
420- * if (
421- * rclcpp::FutureReturnCode::TIMEOUT ==
422- * executor->spin_until_future_complete(future, timeout))
423- * {
424- * client->remove_pending_request(future);
425- * // handle timeout
426- * } else {
427- * handle_response(future.get());
428- * }
429- * ```
430- *
431- * \param[in] request request to be send.
432- * \return a FutureAndRequestId instance.
433- */
434- FutureAndRequestId
317+ SharedFuture
435318 async_send_request (SharedRequest request)
436319 {
437- Promise promise;
438- auto future = promise.get_future ();
439- auto req_id = async_send_request_impl (
440- *request,
441- std::move (promise));
442- return FutureAndRequestId (std::move (future), req_id);
320+ return async_send_request (request, [](SharedFuture) {});
443321 }
444322
445- // / Send a request to the service server and schedule a callback in the executor.
446- /* *
447- * Similar to the previous overload, but a callback will automatically be called when a response is received.
448- *
449- * If the callback is never called, because we never got a reply for the service server, remove_pending_request()
450- * has to be called with the returned request id or prune_pending_requests().
451- * Not doing so will make the `Client` instance use more memory each time a response is not
452- * received from the service server.
453- * In this case, it's convenient to setup a timer to cleanup the pending requests.
454- *
455- * \param[in] request request to be send.
456- * \param[in] cb callback that will be called when we get a response for this request.
457- * \return the request id representing the request just sent.
458- */
459- // TODO(ivanpauno): Link to example that shows how to cleanup requests.
460323 template <
461324 typename CallbackT,
462325 typename std::enable_if<
@@ -466,24 +329,23 @@ class Client : public ClientBase
466329 >::value
467330 >::type * = nullptr
468331 >
469- int64_t
332+ SharedFuture
470333 async_send_request (SharedRequest request, CallbackT && cb)
471334 {
472- return async_send_request_impl (
473- *request,
474- CallbackType{std::forward<CallbackT>(cb)});
335+ std::lock_guard<std::mutex> lock (pending_requests_mutex_);
336+ int64_t sequence_number;
337+ rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), request.get (), &sequence_number);
338+ if (RCL_RET_OK != ret) {
339+ rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
340+ }
341+
342+ SharedPromise call_promise = std::make_shared<Promise>();
343+ SharedFuture f (call_promise->get_future ());
344+ pending_requests_[sequence_number] =
345+ std::make_tuple (call_promise, std::forward<CallbackType>(cb), f);
346+ return f;
475347 }
476348
477- // / Send a request to the service server and schedule a callback in the executor.
478- /* *
479- * Similar to the previous method, but you can get both the request and response in the callback.
480- *
481- * \param[in] request request to be send.
482- * \param[in] cb callback that will be called when we get a response for this request.
483- * \return the request id representing the request just sent.
484- */
485- // TODO(ivanpauno): Deprecate this.
486- // If someone wants the request they can capture it in the lambda.
487349 template <
488350 typename CallbackT,
489351 typename std::enable_if<
@@ -493,100 +355,28 @@ class Client : public ClientBase
493355 >::value
494356 >::type * = nullptr
495357 >
496- int64_t
358+ SharedFutureWithRequest
497359 async_send_request (SharedRequest request, CallbackT && cb)
498360 {
499- auto & req = *request;
500- return async_send_request_impl (
501- req,
502- std::make_pair (CallbackWithRequestType{std::forward<CallbackT>(cb)}, std::move (request)));
503- }
504-
505- // / Cleanup a pending request.
506- /* *
507- * This notifies the client that we have waited long enough for a response from the server
508- * to come, we have given up and we are not waiting for a response anymore.
509- *
510- * Not calling this will make the client start using more memory for each request
511- * that never got a reply from the server.
512- *
513- * \param[in] request_id request id returned by async_send_request().
514- * \return true when a pending request was removed, false if not (e.g. a response was received).
515- */
516- bool
517- remove_pending_request (int64_t request_id)
518- {
519- std::lock_guard guard (pending_requests_mutex_);
520- return pending_requests_.erase (request_id) != 0u ;
521- }
522-
523- // / Cleanup a pending request.
524- /* *
525- * Convenient overload, same as:
526- *
527- * `Client::remove_pending_request(this, future.get_request_id())`.
528- */
529- bool
530- remove_pending_request (const FutureAndRequestId & future)
531- {
532- return this ->remove_pending_request (future.get_request_id ());
533- }
534-
535- // / Clean all pending requests.
536- /* *
537- * \return number of pending requests that were removed.
538- */
539- size_t
540- prune_requests ()
541- {
542- std::lock_guard guard (pending_requests_mutex_);
543- auto ret = pending_requests_.size ();
544- pending_requests_.clear ();
545- return ret;
546- }
361+ SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
362+ SharedFutureWithRequest future_with_request (promise->get_future ());
547363
548- protected:
549- using PendingRequestsMapValue = std::variant<
550- std::promise<SharedResponse>,
551- CallbackType,
552- std::pair<CallbackWithRequestType, SharedRequest>>;
364+ auto wrapping_cb = [future_with_request, promise, request,
365+ cb = std::forward<CallbackWithRequestType>(cb)](SharedFuture future) {
366+ auto response = future.get ();
367+ promise->set_value (std::make_pair (request, response));
368+ cb (future_with_request);
369+ };
553370
554- RCLCPP_PUBLIC
555- int64_t
556- async_send_request_impl (const Request & request, PendingRequestsMapValue value)
557- {
558- int64_t sequence_number;
559- rcl_ret_t ret = rcl_send_request (get_client_handle ().get (), &request, &sequence_number);
560- if (RCL_RET_OK != ret) {
561- rclcpp::exceptions::throw_from_rcl_error (ret, " failed to send request" );
562- }
563- {
564- std::lock_guard<std::mutex> lock (pending_requests_mutex_);
565- pending_requests_.try_emplace (sequence_number, std::move (value));
566- }
567- return sequence_number;
568- }
371+ async_send_request (request, wrapping_cb);
569372
570- RCLCPP_PUBLIC
571- std::optional<PendingRequestsMapValue>
572- get_and_erase_pending_request (int64_t request_number)
573- {
574- std::unique_lock<std::mutex> lock (pending_requests_mutex_);
575- auto it = this ->pending_requests_ .find (request_number);
576- if (it == this ->pending_requests_ .end ()) {
577- RCUTILS_LOG_DEBUG_NAMED (
578- " rclcpp" ,
579- " Received invalid sequence number. Ignoring..." );
580- return std::nullopt ;
581- }
582- auto value = std::move (it->second );
583- this ->pending_requests_ .erase (request_number);
584- return value;
373+ return future_with_request;
585374 }
586375
376+ private:
587377 RCLCPP_DISABLE_COPY (Client)
588378
589- std::unordered_map <int64_t , PendingRequestsMapValue > pending_requests_;
379+ std::map <int64_t , std::tuple<SharedPromise, CallbackType, SharedFuture> > pending_requests_;
590380 std::mutex pending_requests_mutex_;
591381};
592382
0 commit comments