Skip to content

Commit bf752c7

Browse files
authored
Updating client API to be able to remove pending requests (#1728)
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
1 parent 01f6ebd commit bf752c7

2 files changed

Lines changed: 271 additions & 49 deletions

File tree

rclcpp/include/rclcpp/client.hpp

Lines changed: 257 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
#include <atomic>
1919
#include <future>
20-
#include <map>
20+
#include <unordered_map>
2121
#include <memory>
22+
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
2223
#include <sstream>
2324
#include <string>
2425
#include <tuple>
2526
#include <utility>
27+
#include <variant> // NOLINT
2628

2729
#include "rcl/client.h"
2830
#include "rcl/error_handling.h"
@@ -178,6 +180,9 @@ template<typename ServiceT>
178180
class Client : public ClientBase
179181
{
180182
public:
183+
using Request = typename ServiceT::Request;
184+
using Response = typename ServiceT::Response;
185+
181186
using SharedRequest = typename ServiceT::Request::SharedPtr;
182187
using SharedResponse = typename ServiceT::Response::SharedPtr;
183188

@@ -187,6 +192,7 @@ class Client : public ClientBase
187192
using SharedPromise = std::shared_ptr<Promise>;
188193
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
189194

195+
using Future = std::future<SharedResponse>;
190196
using SharedFuture = std::shared_future<SharedResponse>;
191197
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
192198

@@ -195,6 +201,88 @@ class Client : public ClientBase
195201

196202
RCLCPP_SMART_PTR_DEFINITIONS(Client)
197203

204+
/// A convenient Client::Future and request id pair.
205+
class FutureAndRequestId
206+
{
207+
public:
208+
FutureAndRequestId(Future impl, int64_t req_id)
209+
: impl_(std::move(impl)), req_id_(req_id)
210+
{}
211+
212+
/// Allow implicit conversions to `std::future` by reference.
213+
// TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?)
214+
operator Future &() {return impl_;}
215+
216+
/// Deprecated, use take_future() instead.
217+
/**
218+
* Allow implicit conversions to `std::future` by value.
219+
* \deprecated
220+
*/
221+
[[deprecated("FutureAndRequestId: use take_future() instead of an implicit conversion")]]
222+
operator Future() {return impl_;}
223+
224+
/// Returns the internal `std::future`, moving it out.
225+
/**
226+
* After calling this method, the internal future gets invalidated.
227+
*/
228+
Future
229+
take_future() {return impl_;}
230+
231+
/// Getter for the internal future.
232+
Future &
233+
get_future() {return impl_;}
234+
235+
/// Getter for the internal future.
236+
const Future &
237+
get_future() const {return impl_;}
238+
239+
/// Returns the request id associated with this future.
240+
int64_t get_request_id() const {return req_id_;}
241+
242+
// delegate future like methods in the std::future impl_
243+
244+
/// See std::future::share().
245+
SharedFuture share() noexcept {return impl_.share();}
246+
/// See std::future::get().
247+
SharedResponse get() {return impl_.get();}
248+
/// See std::future::valid().
249+
bool valid() const noexcept {return impl_.valid();}
250+
/// See std::future::wait().
251+
void wait() const {return impl_.wait();}
252+
/// See std::future::wait_for().
253+
template<class Rep, class Period>
254+
std::future_status wait_for(
255+
const std::chrono::duration<Rep, Period> & timeout_duration) const
256+
{
257+
return impl_.wait_for(timeout_duration);
258+
}
259+
/// See std::future::wait_until().
260+
template<class Clock, class Duration>
261+
std::future_status wait_until(
262+
const std::chrono::time_point<Clock, Duration> & timeout_time) const
263+
{
264+
return impl_.wait_until(timeout_time);
265+
}
266+
267+
// Rule of five, we could use the rule of zero here, but better be explicit as some of the
268+
// methods are deleted.
269+
270+
/// Move constructor.
271+
FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
272+
/// Deleted copy constructor, each instance is a unique owner of the future.
273+
FutureAndRequestId(const FutureAndRequestId & other) = delete;
274+
/// Move assignment.
275+
FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
276+
/// Deleted copy assignment, each instance is a unique owner of the future.
277+
FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
278+
/// Destructor.
279+
~FutureAndRequestId() = default;
280+
281+
private:
282+
Future impl_;
283+
int64_t req_id_;
284+
};
285+
198286
/// Default constructor.
199287
/**
200288
* The constructor for a Client is almost never called directly.
@@ -292,34 +380,83 @@ class Client : public ClientBase
292380
std::shared_ptr<rmw_request_id_t> request_header,
293381
std::shared_ptr<void> response) override
294382
{
295-
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
296-
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
297-
int64_t sequence_number = request_header->sequence_number;
298-
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
299-
if (this->pending_requests_.count(sequence_number) == 0) {
300-
RCUTILS_LOG_ERROR_NAMED(
301-
"rclcpp",
302-
"Received invalid sequence number. Ignoring...");
383+
auto opt = this->get_and_erase_pending_request(request_header->sequence_number);
384+
if (!opt) {
303385
return;
304386
}
305-
auto tuple = this->pending_requests_[sequence_number];
306-
auto call_promise = std::get<0>(tuple);
307-
auto callback = std::get<1>(tuple);
308-
auto future = std::get<2>(tuple);
309-
this->pending_requests_.erase(sequence_number);
310-
// Unlock here to allow the service to be called recursively from one of its callbacks.
311-
lock.unlock();
312-
313-
call_promise->set_value(typed_response);
314-
callback(future);
387+
auto & value = *opt;
388+
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
389+
std::move(response));
390+
if (std::holds_alternative<Promise>(value)) {
391+
auto & promise = std::get<Promise>(value);
392+
promise.set_value(std::move(typed_response));
393+
} else if (std::holds_alternative<CallbackType>(value)) {
394+
Promise promise;
395+
promise.set_value(std::move(typed_response));
396+
const auto & callback = std::get<CallbackType>(value);
397+
callback(promise.get_future().share());
398+
} else if (std::holds_alternative<std::pair<CallbackWithRequestType, SharedRequest>>(value)) {
399+
PromiseWithRequest promise;
400+
const auto & pair = std::get<std::pair<CallbackWithRequestType, SharedRequest>>(value);
401+
promise.set_value(std::make_pair(std::move(pair.second), std::move(typed_response)));
402+
pair.first(promise.get_future().share());
403+
}
315404
}
316405

317-
SharedFuture
406+
/// Send a request to the service server.
407+
/**
408+
* This method returns a `FutureAndRequestId` instance
409+
* that can be passed to Executor::spin_until_future_complete() to
410+
* wait until it has been completed.
411+
*
412+
* If the future never completes,
413+
* e.g. the call to Executor::spin_until_future_complete() times out,
414+
* Client::remove_pending_request() must be called to clean the client internal state.
415+
* Not doing so will make the `Client` instance to use more memory each time a response is not
416+
* received from the service server.
417+
*
418+
* ```cpp
419+
* auto future = client->async_send_request(my_request);
420+
* if (
421+
* rclcpp::FutureReturnCode::TIMEOUT ==
422+
* executor->spin_until_future_complete(future, timeout))
423+
* {
424+
* client->remove_pending_request(future);
425+
* // handle timeout
426+
* } else {
427+
* handle_response(future.get());
428+
* }
429+
* ```
430+
*
431+
* \param[in] request request to be send.
432+
* \return a FutureAndRequestId instance.
433+
*/
434+
FutureAndRequestId
318435
async_send_request(SharedRequest request)
319436
{
320-
return async_send_request(request, [](SharedFuture) {});
437+
Promise promise;
438+
auto future = promise.get_future();
439+
auto req_id = async_send_request_impl(
440+
*request,
441+
std::move(promise));
442+
return FutureAndRequestId(std::move(future), req_id);
321443
}
322444

445+
/// Send a request to the service server and schedule a callback in the executor.
446+
/**
447+
* Similar to the previous overload, but a callback will automatically be called when a response is received.
448+
*
449+
* If the callback is never called, because we never got a reply for the service server, remove_pending_request()
450+
* has to be called with the returned request id or prune_pending_requests().
451+
* Not doing so will make the `Client` instance use more memory each time a response is not
452+
* received from the service server.
453+
* In this case, it's convenient to setup a timer to cleanup the pending requests.
454+
*
455+
* \param[in] request request to be send.
456+
* \param[in] cb callback that will be called when we get a response for this request.
457+
* \return the request id representing the request just sent.
458+
*/
459+
// TODO(ivanpauno): Link to example that shows how to cleanup requests.
323460
template<
324461
typename CallbackT,
325462
typename std::enable_if<
@@ -329,23 +466,24 @@ class Client : public ClientBase
329466
>::value
330467
>::type * = nullptr
331468
>
332-
SharedFuture
469+
int64_t
333470
async_send_request(SharedRequest request, CallbackT && cb)
334471
{
335-
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
336-
int64_t sequence_number;
337-
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
338-
if (RCL_RET_OK != ret) {
339-
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
340-
}
341-
342-
SharedPromise call_promise = std::make_shared<Promise>();
343-
SharedFuture f(call_promise->get_future());
344-
pending_requests_[sequence_number] =
345-
std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
346-
return f;
472+
return async_send_request_impl(
473+
*request,
474+
CallbackType{std::forward<CallbackT>(cb)});
347475
}
348476

477+
/// Send a request to the service server and schedule a callback in the executor.
478+
/**
479+
* Similar to the previous method, but you can get both the request and response in the callback.
480+
*
481+
* \param[in] request request to be send.
482+
* \param[in] cb callback that will be called when we get a response for this request.
483+
* \return the request id representing the request just sent.
484+
*/
485+
// TODO(ivanpauno): Deprecate this.
486+
// If someone wants the request they can capture it in the lambda.
349487
template<
350488
typename CallbackT,
351489
typename std::enable_if<
@@ -355,28 +493,100 @@ class Client : public ClientBase
355493
>::value
356494
>::type * = nullptr
357495
>
358-
SharedFutureWithRequest
496+
int64_t
359497
async_send_request(SharedRequest request, CallbackT && cb)
360498
{
361-
SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
362-
SharedFutureWithRequest future_with_request(promise->get_future());
499+
auto & req = *request;
500+
return async_send_request_impl(
501+
req,
502+
std::make_pair(CallbackWithRequestType{std::forward<CallbackT>(cb)}, std::move(request)));
503+
}
504+
505+
/// Cleanup a pending request.
506+
/**
507+
* This notifies the client that we have waited long enough for a response from the server
508+
* to come, we have given up and we are not waiting for a response anymore.
509+
*
510+
* Not calling this will make the client start using more memory for each request
511+
* that never got a reply from the server.
512+
*
513+
* \param[in] request_id request id returned by async_send_request().
514+
* \return true when a pending request was removed, false if not (e.g. a response was received).
515+
*/
516+
bool
517+
remove_pending_request(int64_t request_id)
518+
{
519+
std::lock_guard guard(pending_requests_mutex_);
520+
return pending_requests_.erase(request_id) != 0u;
521+
}
522+
523+
/// Cleanup a pending request.
524+
/**
525+
* Convenient overload, same as:
526+
*
527+
* `Client::remove_pending_request(this, future.get_request_id())`.
528+
*/
529+
bool
530+
remove_pending_request(const FutureAndRequestId & future)
531+
{
532+
return this->remove_pending_request(future.get_request_id());
533+
}
534+
535+
/// Clean all pending requests.
536+
/**
537+
* \return number of pending requests that were removed.
538+
*/
539+
size_t
540+
prune_requests()
541+
{
542+
std::lock_guard guard(pending_requests_mutex_);
543+
auto ret = pending_requests_.size();
544+
pending_requests_.clear();
545+
return ret;
546+
}
363547

364-
auto wrapping_cb = [future_with_request, promise, request,
365-
cb = std::forward<CallbackWithRequestType>(cb)](SharedFuture future) {
366-
auto response = future.get();
367-
promise->set_value(std::make_pair(request, response));
368-
cb(future_with_request);
369-
};
548+
protected:
549+
using PendingRequestsMapValue = std::variant<
550+
std::promise<SharedResponse>,
551+
CallbackType,
552+
std::pair<CallbackWithRequestType, SharedRequest>>;
370553

371-
async_send_request(request, wrapping_cb);
554+
RCLCPP_PUBLIC
555+
int64_t
556+
async_send_request_impl(const Request & request, PendingRequestsMapValue value)
557+
{
558+
int64_t sequence_number;
559+
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
560+
if (RCL_RET_OK != ret) {
561+
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
562+
}
563+
{
564+
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
565+
pending_requests_.try_emplace(sequence_number, std::move(value));
566+
}
567+
return sequence_number;
568+
}
372569

373-
return future_with_request;
570+
RCLCPP_PUBLIC
571+
std::optional<PendingRequestsMapValue>
572+
get_and_erase_pending_request(int64_t request_number)
573+
{
574+
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
575+
auto it = this->pending_requests_.find(request_number);
576+
if (it == this->pending_requests_.end()) {
577+
RCUTILS_LOG_DEBUG_NAMED(
578+
"rclcpp",
579+
"Received invalid sequence number. Ignoring...");
580+
return std::nullopt;
581+
}
582+
auto value = std::move(it->second);
583+
this->pending_requests_.erase(request_number);
584+
return value;
374585
}
375586

376-
private:
377587
RCLCPP_DISABLE_COPY(Client)
378588

379-
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
589+
std::unordered_map<int64_t, PendingRequestsMapValue> pending_requests_;
380590
std::mutex pending_requests_mutex_;
381591
};
382592

0 commit comments

Comments
 (0)