Skip to content

Commit d5f3d35

Browse files
authored
Revert "Updating client API to be able to remove pending requests (#1728)" (#1733)
This reverts commit bf752c7. Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
1 parent bf752c7 commit d5f3d35

2 files changed

Lines changed: 49 additions & 271 deletions

File tree

rclcpp/include/rclcpp/client.hpp

Lines changed: 47 additions & 257 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717

1818
#include <atomic>
1919
#include <future>
20-
#include <unordered_map>
20+
#include <map>
2121
#include <memory>
22-
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
2322
#include <sstream>
2423
#include <string>
2524
#include <tuple>
2625
#include <utility>
27-
#include <variant> // NOLINT
2826

2927
#include "rcl/client.h"
3028
#include "rcl/error_handling.h"
@@ -180,9 +178,6 @@ template<typename ServiceT>
180178
class Client : public ClientBase
181179
{
182180
public:
183-
using Request = typename ServiceT::Request;
184-
using Response = typename ServiceT::Response;
185-
186181
using SharedRequest = typename ServiceT::Request::SharedPtr;
187182
using SharedResponse = typename ServiceT::Response::SharedPtr;
188183

@@ -192,7 +187,6 @@ class Client : public ClientBase
192187
using SharedPromise = std::shared_ptr<Promise>;
193188
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
194189

195-
using Future = std::future<SharedResponse>;
196190
using SharedFuture = std::shared_future<SharedResponse>;
197191
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
198192

@@ -201,88 +195,6 @@ class Client : public ClientBase
201195

202196
RCLCPP_SMART_PTR_DEFINITIONS(Client)
203197

204-
/// A convenient Client::Future and request id pair.
205-
class FutureAndRequestId
206-
{
207-
public:
208-
FutureAndRequestId(Future impl, int64_t req_id)
209-
: impl_(std::move(impl)), req_id_(req_id)
210-
{}
211-
212-
/// Allow implicit conversions to `std::future` by reference.
213-
// TODO(ivanpauno): Maybe, deprecate this in favor of get_future() (?)
214-
operator Future &() {return impl_;}
215-
216-
/// Deprecated, use take_future() instead.
217-
/**
218-
* Allow implicit conversions to `std::future` by value.
219-
* \deprecated
220-
*/
221-
[[deprecated("FutureAndRequestId: use take_future() instead of an implicit conversion")]]
222-
operator Future() {return impl_;}
223-
224-
/// Returns the internal `std::future`, moving it out.
225-
/**
226-
* After calling this method, the internal future gets invalidated.
227-
*/
228-
Future
229-
take_future() {return impl_;}
230-
231-
/// Getter for the internal future.
232-
Future &
233-
get_future() {return impl_;}
234-
235-
/// Getter for the internal future.
236-
const Future &
237-
get_future() const {return impl_;}
238-
239-
/// Returns the request id associated with this future.
240-
int64_t get_request_id() const {return req_id_;}
241-
242-
// delegate future like methods in the std::future impl_
243-
244-
/// See std::future::share().
245-
SharedFuture share() noexcept {return impl_.share();}
246-
/// See std::future::get().
247-
SharedResponse get() {return impl_.get();}
248-
/// See std::future::valid().
249-
bool valid() const noexcept {return impl_.valid();}
250-
/// See std::future::wait().
251-
void wait() const {return impl_.wait();}
252-
/// See std::future::wait_for().
253-
template<class Rep, class Period>
254-
std::future_status wait_for(
255-
const std::chrono::duration<Rep, Period> & timeout_duration) const
256-
{
257-
return impl_.wait_for(timeout_duration);
258-
}
259-
/// See std::future::wait_until().
260-
template<class Clock, class Duration>
261-
std::future_status wait_until(
262-
const std::chrono::time_point<Clock, Duration> & timeout_time) const
263-
{
264-
return impl_.wait_until(timeout_time);
265-
}
266-
267-
// Rule of five, we could use the rule of zero here, but better be explicit as some of the
268-
// methods are deleted.
269-
270-
/// Move constructor.
271-
FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
272-
/// Deleted copy constructor, each instance is a unique owner of the future.
273-
FutureAndRequestId(const FutureAndRequestId & other) = delete;
274-
/// Move assignment.
275-
FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
276-
/// Deleted copy assignment, each instance is a unique owner of the future.
277-
FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
278-
/// Destructor.
279-
~FutureAndRequestId() = default;
280-
281-
private:
282-
Future impl_;
283-
int64_t req_id_;
284-
};
285-
286198
/// Default constructor.
287199
/**
288200
* The constructor for a Client is almost never called directly.
@@ -380,83 +292,34 @@ class Client : public ClientBase
380292
std::shared_ptr<rmw_request_id_t> request_header,
381293
std::shared_ptr<void> response) override
382294
{
383-
auto opt = this->get_and_erase_pending_request(request_header->sequence_number);
384-
if (!opt) {
295+
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
296+
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
297+
int64_t sequence_number = request_header->sequence_number;
298+
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
299+
if (this->pending_requests_.count(sequence_number) == 0) {
300+
RCUTILS_LOG_ERROR_NAMED(
301+
"rclcpp",
302+
"Received invalid sequence number. Ignoring...");
385303
return;
386304
}
387-
auto & value = *opt;
388-
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
389-
std::move(response));
390-
if (std::holds_alternative<Promise>(value)) {
391-
auto & promise = std::get<Promise>(value);
392-
promise.set_value(std::move(typed_response));
393-
} else if (std::holds_alternative<CallbackType>(value)) {
394-
Promise promise;
395-
promise.set_value(std::move(typed_response));
396-
const auto & callback = std::get<CallbackType>(value);
397-
callback(promise.get_future().share());
398-
} else if (std::holds_alternative<std::pair<CallbackWithRequestType, SharedRequest>>(value)) {
399-
PromiseWithRequest promise;
400-
const auto & pair = std::get<std::pair<CallbackWithRequestType, SharedRequest>>(value);
401-
promise.set_value(std::make_pair(std::move(pair.second), std::move(typed_response)));
402-
pair.first(promise.get_future().share());
403-
}
305+
auto tuple = this->pending_requests_[sequence_number];
306+
auto call_promise = std::get<0>(tuple);
307+
auto callback = std::get<1>(tuple);
308+
auto future = std::get<2>(tuple);
309+
this->pending_requests_.erase(sequence_number);
310+
// Unlock here to allow the service to be called recursively from one of its callbacks.
311+
lock.unlock();
312+
313+
call_promise->set_value(typed_response);
314+
callback(future);
404315
}
405316

406-
/// Send a request to the service server.
407-
/**
408-
* This method returns a `FutureAndRequestId` instance
409-
* that can be passed to Executor::spin_until_future_complete() to
410-
* wait until it has been completed.
411-
*
412-
* If the future never completes,
413-
* e.g. the call to Executor::spin_until_future_complete() times out,
414-
* Client::remove_pending_request() must be called to clean the client internal state.
415-
* Not doing so will make the `Client` instance to use more memory each time a response is not
416-
* received from the service server.
417-
*
418-
* ```cpp
419-
* auto future = client->async_send_request(my_request);
420-
* if (
421-
* rclcpp::FutureReturnCode::TIMEOUT ==
422-
* executor->spin_until_future_complete(future, timeout))
423-
* {
424-
* client->remove_pending_request(future);
425-
* // handle timeout
426-
* } else {
427-
* handle_response(future.get());
428-
* }
429-
* ```
430-
*
431-
* \param[in] request request to be send.
432-
* \return a FutureAndRequestId instance.
433-
*/
434-
FutureAndRequestId
317+
SharedFuture
435318
async_send_request(SharedRequest request)
436319
{
437-
Promise promise;
438-
auto future = promise.get_future();
439-
auto req_id = async_send_request_impl(
440-
*request,
441-
std::move(promise));
442-
return FutureAndRequestId(std::move(future), req_id);
320+
return async_send_request(request, [](SharedFuture) {});
443321
}
444322

445-
/// Send a request to the service server and schedule a callback in the executor.
446-
/**
447-
* Similar to the previous overload, but a callback will automatically be called when a response is received.
448-
*
449-
* If the callback is never called, because we never got a reply for the service server, remove_pending_request()
450-
* has to be called with the returned request id or prune_pending_requests().
451-
* Not doing so will make the `Client` instance use more memory each time a response is not
452-
* received from the service server.
453-
* In this case, it's convenient to setup a timer to cleanup the pending requests.
454-
*
455-
* \param[in] request request to be send.
456-
* \param[in] cb callback that will be called when we get a response for this request.
457-
* \return the request id representing the request just sent.
458-
*/
459-
// TODO(ivanpauno): Link to example that shows how to cleanup requests.
460323
template<
461324
typename CallbackT,
462325
typename std::enable_if<
@@ -466,24 +329,23 @@ class Client : public ClientBase
466329
>::value
467330
>::type * = nullptr
468331
>
469-
int64_t
332+
SharedFuture
470333
async_send_request(SharedRequest request, CallbackT && cb)
471334
{
472-
return async_send_request_impl(
473-
*request,
474-
CallbackType{std::forward<CallbackT>(cb)});
335+
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
336+
int64_t sequence_number;
337+
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
338+
if (RCL_RET_OK != ret) {
339+
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
340+
}
341+
342+
SharedPromise call_promise = std::make_shared<Promise>();
343+
SharedFuture f(call_promise->get_future());
344+
pending_requests_[sequence_number] =
345+
std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
346+
return f;
475347
}
476348

477-
/// Send a request to the service server and schedule a callback in the executor.
478-
/**
479-
* Similar to the previous method, but you can get both the request and response in the callback.
480-
*
481-
* \param[in] request request to be send.
482-
* \param[in] cb callback that will be called when we get a response for this request.
483-
* \return the request id representing the request just sent.
484-
*/
485-
// TODO(ivanpauno): Deprecate this.
486-
// If someone wants the request they can capture it in the lambda.
487349
template<
488350
typename CallbackT,
489351
typename std::enable_if<
@@ -493,100 +355,28 @@ class Client : public ClientBase
493355
>::value
494356
>::type * = nullptr
495357
>
496-
int64_t
358+
SharedFutureWithRequest
497359
async_send_request(SharedRequest request, CallbackT && cb)
498360
{
499-
auto & req = *request;
500-
return async_send_request_impl(
501-
req,
502-
std::make_pair(CallbackWithRequestType{std::forward<CallbackT>(cb)}, std::move(request)));
503-
}
504-
505-
/// Cleanup a pending request.
506-
/**
507-
* This notifies the client that we have waited long enough for a response from the server
508-
* to come, we have given up and we are not waiting for a response anymore.
509-
*
510-
* Not calling this will make the client start using more memory for each request
511-
* that never got a reply from the server.
512-
*
513-
* \param[in] request_id request id returned by async_send_request().
514-
* \return true when a pending request was removed, false if not (e.g. a response was received).
515-
*/
516-
bool
517-
remove_pending_request(int64_t request_id)
518-
{
519-
std::lock_guard guard(pending_requests_mutex_);
520-
return pending_requests_.erase(request_id) != 0u;
521-
}
522-
523-
/// Cleanup a pending request.
524-
/**
525-
* Convenient overload, same as:
526-
*
527-
* `Client::remove_pending_request(this, future.get_request_id())`.
528-
*/
529-
bool
530-
remove_pending_request(const FutureAndRequestId & future)
531-
{
532-
return this->remove_pending_request(future.get_request_id());
533-
}
534-
535-
/// Clean all pending requests.
536-
/**
537-
* \return number of pending requests that were removed.
538-
*/
539-
size_t
540-
prune_requests()
541-
{
542-
std::lock_guard guard(pending_requests_mutex_);
543-
auto ret = pending_requests_.size();
544-
pending_requests_.clear();
545-
return ret;
546-
}
361+
SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
362+
SharedFutureWithRequest future_with_request(promise->get_future());
547363

548-
protected:
549-
using PendingRequestsMapValue = std::variant<
550-
std::promise<SharedResponse>,
551-
CallbackType,
552-
std::pair<CallbackWithRequestType, SharedRequest>>;
364+
auto wrapping_cb = [future_with_request, promise, request,
365+
cb = std::forward<CallbackWithRequestType>(cb)](SharedFuture future) {
366+
auto response = future.get();
367+
promise->set_value(std::make_pair(request, response));
368+
cb(future_with_request);
369+
};
553370

554-
RCLCPP_PUBLIC
555-
int64_t
556-
async_send_request_impl(const Request & request, PendingRequestsMapValue value)
557-
{
558-
int64_t sequence_number;
559-
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
560-
if (RCL_RET_OK != ret) {
561-
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
562-
}
563-
{
564-
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
565-
pending_requests_.try_emplace(sequence_number, std::move(value));
566-
}
567-
return sequence_number;
568-
}
371+
async_send_request(request, wrapping_cb);
569372

570-
RCLCPP_PUBLIC
571-
std::optional<PendingRequestsMapValue>
572-
get_and_erase_pending_request(int64_t request_number)
573-
{
574-
std::unique_lock<std::mutex> lock(pending_requests_mutex_);
575-
auto it = this->pending_requests_.find(request_number);
576-
if (it == this->pending_requests_.end()) {
577-
RCUTILS_LOG_DEBUG_NAMED(
578-
"rclcpp",
579-
"Received invalid sequence number. Ignoring...");
580-
return std::nullopt;
581-
}
582-
auto value = std::move(it->second);
583-
this->pending_requests_.erase(request_number);
584-
return value;
373+
return future_with_request;
585374
}
586375

376+
private:
587377
RCLCPP_DISABLE_COPY(Client)
588378

589-
std::unordered_map<int64_t, PendingRequestsMapValue> pending_requests_;
379+
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
590380
std::mutex pending_requests_mutex_;
591381
};
592382

0 commit comments

Comments
 (0)