grpc: per-silo shared completion queues for Google gRPC client library. #2527

Merged
htuch merged 20 commits into envoyproxy:master from htuch:grpc-tls
Feb 12, 2018

Conversation


@htuch htuch commented Feb 4, 2018

Previously, we had one thread per stream, which is terrible for
performance but simple. In this patch, a per-TLS thread completion
thread is setup by the AsyncClientManager and shared amongst all clients
(and streams). There is now somewhat complex cross-thread shared state
(including some narrow locked sections) to deal with stream shutdown
during inflight operations.

Risk Level: Low (only Google gRPC client impacted, not in prod use).
Testing: existing tests, with --runs_per_test=1000 for {grpc_client_integration_test, ads_integration_test, metrics_service_integration_test}. Also added some new unit tests for Google gRPC client call creation in google_async_client_impl_test and client destruction in grpc_client_integration_test.

Signed-off-by: Harvey Tuch <htuch@google.com>


htuch commented Feb 4, 2018

@mattklein123 this is WiP, I need to sort out some teardown issues first (will consult @vjpai). Meanwhile, can you take a look at the locking that's now happening between handleOpCompletion() on the silo threads and completionThread and LMK how sad it makes you.

I would ideally like to have this lock-free, and we can probably do that with atomics (and probably memory barriers), but wanted to start with something that would allow us to reason about correctness easily.

BTW, I plan to add the missing gRPC coverage in this PR also when I get around to writing new tests once the basic approach is finalized.

ENVOY_LOG(debug, "completionThread running");
void* tag;
bool ok;
while (cq_.Next(&tag, &ok)) {
@htuch htuch Feb 4, 2018
@vjpai this is the one thread per core (approximately) addition to the Envoy Google gRPC client. I've hit some issues that I'd like to get your thoughts on. As implemented, when I execute with --runs_per_test=1000, I get occasional crashes in cq_.Next().

Previously, each stream had its own completion thread, which was simple but slow. When the stream was destroyed, it would join the completion thread.

Now, we have one completion thread that is shared by many streams. When a stream is destroyed, it sets a draining_cq_ flag to prevent further completions and issues a TryCancel() on its client context. However, it seems that some in-flight ops for the stream may still be occurring in Next, since TryCancel is only best effort. These depend on state in the stream's AsyncReaderWriter, hence we have a use-after-free.

The underlying issue here is that when an op is issued via AsyncReaderWriter, it's unclear (to me at least) what will happen following a TryCancel(). Will we always receive a completion via Next (success or failure) for every pending read/write/final op? In that case, we could reference count pending ops per stream and solve this by deferring the stream deletion until this count is zero. Anecdotally, looking at logs, it seemed that this is not the case, hence the current implementation, where we expect some in-flight ops to disappear and some to come back as success/fail via Next.

Any clarification on the above would be helpful, as would recommendations on the best practices for destruction of stream objects that have their completion queue outlive them.

Contributor
On the first issue, our contract is that every tag in leads to a tag out, so even if a ctx gets TryCancel'ed, all of its tags are still supposed to post on Next. If this isn't happening, please let us know so that we can call it a bug.

Contributor

On the more general issue, I think I see an issue that I should have raised previously. For many classes of multithreaded code (essentially any code where the CQ thread could be different from the op-invoking thread), you shouldn't use stub_->Call. We really consider that a deprecated method since it is error prone. The problem is that it can cause things to start happening before rw_ is fully constructed. If a tag for this struct posts, then you might be in trouble interpreting this. Even a hero programmer (and I can tell you a name offline) found the need to use CVs to protect against this.

Instead, you should use rw_ = stub_->PrepareCall(....) with all the arguments except for the tag and then in the next line do rw_->StartCall(TAG).
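The suggested two-step pattern, as a comment-only pseudocode sketch (identifiers taken from the discussion above, not the exact Envoy code):

```cpp
// Pseudocode sketch. With stub_->Call(), the returned reader-writer
// starts immediately, so a completion can post on the CQ thread before
// rw_ is even assigned. PrepareCall() splits construction from start:
//
//   // 1. Construct the reader-writer without starting any operation.
//   rw_ = stub_->PrepareCall(&ctx_, method_, &cq_);
//   // 2. Only now put a tag in flight; rw_ is fully assigned first.
//   rw_->StartCall(&init_tag_);
//
// Nothing can complete before StartCall(), so no condition variable is
// needed to guard the rw_ assignment.
```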

@mattklein123 mattklein123 left a comment

LMK how sad it makes you.

This whole situation makes me very sad, but I'm not sure there is much we can do about that right now. Seems fine to start out with.

ThreadLocal::Instance& tls)
: cm_(cm), tls_(tls) {}
: cm_(cm), tls_(tls) {
#ifdef ENVOY_GOOGLE_GRPC
Member

I think we should probably split into two different concrete implementations at this point. Not super crazy about the ifdefs and things like google_tls_slot, etc. I think as the logic gets more and more complicated it will be easier to reason about if we just split them now?

Member Author

The problem there is that we can dynamically switch between using the two different gRPC client libraries in the API. Right now, the AsyncClientManager hides this, you just feed it config. So, if we want to split implementation, we'd need to export out from ClusterManager a getter for both of them and we'll still need a function to figure out from the config which one to use. I can do this, but it feels a bit cleaner to encapsulate all the multi-client logic in the single module here.

Member

Ah OK. Alright that's fine.

// posting, it might post a new completion after cleanup() is invoked. So, we
// need to share state here to allow the completion thread to avoid doing
// this.
bool draining_cq_{}; // Guarded by cq_lock.
Member

Can we use the clang thread annotations potentially?

Member Author

I'd like to do this. Do you know if the Clang Mac OS X issues that we previously hit are still a thing?

Member

I think the only reason I had an issue was due to using the C++14 R/W lock. I think you should probably be fine.



htuch commented Feb 8, 2018

@vjpai I've incorporated your feedback, PTAL for gRPC library sanity if you get a chance. Thanks.

// Note, we expect that we're not in a dispatcher loop here, and participating
// in global teardown.
if (!streams_.empty()) {
dispatcher_.run(Event::Dispatcher::RunType::NonBlock);
Member Author

@mattklein123 I couldn't find a better way to do this off the top of my head. Basically, we have this situation:

  1. Per-silo TLS for GoogleAsyncClient is being torn down during server exit, as the slot gets blown away. We're on a given silo and issue resetStream() to the various streams still in existence.
  2. Streams may have in-flight ops on their CQ thread that is buddy to the silo thread.
  3. We shut down the CQ thread and join it. There may still be in-flight completions that the CQ thread has posted back to the silo thread.
  4. To avoid leaking memory (which gets picked up in tests) and to bound the lifetime of stream cleanup, we need to execute some dispatcher events on the loop to clean up these in-flight completions.

The solution here is a bit weird: we let the dispatcher run a bit. However, this is likely happening after the usual dispatcher main loop finishes for a worker, so it seems like there might be dragons, and it's kind of special snowflake behavior.

We can maybe chat today when you visit Google a bit about this.

Member

Harvey and I just talked about this IRL. I think the plan is to not use dispatcher post for this but to implement posting functionality directly in this code which will make cleanup easier.

htuch added 9 commits February 8, 2018 17:12

htuch commented Feb 9, 2018

@mattklein123 @vjpai this is now ready for final review.


htuch commented Feb 9, 2018

@mattklein123 @zuercher FWIW, GUARDED_BY is still broken on OS X, see https://circleci.com/gh/envoyproxy/envoy/25147?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link. TF also had this issue, tensorflow/tensorflow#955, I don't think the warnings caused failures for them (?).

This reverts commit 0bd36f9.

Signed-off-by: Harvey Tuch <htuch@google.com>
@vjpai vjpai left a comment

Looks good from a gRPC usage pov. I am interested in the particular mechanism choice for delivering completed ops and will have to ping you offline for context on this; I found that quite interesting.


htuch commented Feb 9, 2018

@dnoe can you take a pass on this?


dnoe commented Feb 9, 2018

Will take a look.


zuercher commented Feb 9, 2018

I think the reason GUARDED_BY didn't fail the TF builds is that they don't have -Werror set. So they got a lot of warnings.

I think if you had

LOCKABLE std::mutex completed_ops_lock_;

then GUARDED_BY should work. You'll also need to tag locks on the mutex with SCOPED_LOCKABLE, e.g.:

SCOPED_LOCKABLE std::unique_lock<std::mutex> lock(stream.completed_ops_lock_);

These could be simplified with some typedefs for the mutex and unique_lock.

@mattklein123

Another option if we can't get it to work on OSX is to just have a private include for thread_guards.h which defines some things and then makes them nothing for OSX.

@mattklein123 mattklein123 left a comment

I didn't go through in a super high level of detail, but generally looks sane to me. I left a few random comments.


GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() { ASSERT(rw_ == nullptr); }
GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
Member

nit: not sure if it's worth leaving this debug print here or not. Up to you.

Member Author

It's been fairly useful when chasing down races, on balance I think it's better to leave it.

// Ignore op completions while CQ is shutting down.
if (cq_shutdown_in_progress_) {
void GoogleAsyncStreamImpl::onCompletedOp() {
std::unique_lock<std::mutex> lock(completed_ops_lock_);
Member

Do you want to drop the lock when calling handleOpCompletion()? Or does it not matter that much for contention?

Member Author

Yeah, good point, should release it.

ENVOY_LOG(trace, "completionThread CQ event {} {}", op, ok);
std::unique_lock<std::mutex> lock(stream.completed_ops_lock_);
stream.completed_ops_.emplace_back(op, ok);
stream.dispatcher_.post([&stream] { stream.onCompletedOp(); });
Member

For perf reasons, you might want to only post if the number of completed ops is going from 0 -> 1, though from below it seems like you only want to process 1 at a time. I'm not exactly sure why that is, but might be worth throwing in a TODO here for future perf follow ups. (You could just keep track of number of per silo pending events, and post into that queue at the thread level).

Member Author

This was actually the original design. Unfortunately, if you batch process completed_ops_, rather than having a 1:1 association between onCompletedOp posts and completed_ops_ entries, you get into a race: the stream may have processed the batched ops and then destroyed itself (when in-flight ops hit zero), and then a dangling dispatcher post is executed and tries to access the freed stream.

@htuch htuch Feb 9, 2018

Actually, if we maintain the invariant that there is at most one post pending when the queue is non-empty, I think this could be safe. Will see if this holds in a bit.

Contributor

I think what's written here is safe, while not the most efficient. I'd prefer a TODO now rather than complicating this PR further. It would be easier to reason about the improved behavior separately.


htuch commented Feb 9, 2018

@zuercher @mattklein123 thanks for the ideas, let's handle the thread annotation issue separately in #2571, since I don't want to conflate with this already large PR.


htuch commented Feb 9, 2018

This is ready for another pass. I didn't remove the locking in handleOpCompletion, since I wanted to ensure we atomically handle the entire set of op completions from one post before another one is possible, per the invariant.

// This is also required to satisfy the contract that once Shutdown is called,
// streams no longer queue any additional tags.
for (auto it = streams_.begin(); it != streams_.end();) {
(*it++)->resetStream();
Contributor

Any particular reason why you haven't written this as (*it)->resetStream() and done the iterator increment in the for loop params?

Member Author

Added comment; it has to do with resetStream() performing a potential erase(), which would invalidate the current iterator.

Contributor

Awesome. I've fixed at least one bug of that sort in Envoy, so I appreciate both that you got it right and the comment.


void GoogleAsyncStreamImpl::deferredDelete() {
ENVOY_LOG(debug, "Deferred delete");
tls_.unregisterStream(this);
dispatcher_.deferredDelete(std::unique_ptr<GoogleAsyncStreamImpl>(this));
Contributor

std::unique_ptr<FooType>(this) looks really wrong. Are we confident nothing else can hold a pointer or reference to this object? A comment here would make it look less shocking.

@dnoe dnoe Feb 9, 2018

There seem to be cases in the Envoy code base where there is already a unique_ptr<GoogleAsyncStreamImpl>. Am I missing something or couldn't this cause a double free?

Member Author

Added comment; we only get here after cleanup(), which hands the object self-ownership of memory. After this method completes, nothing should be invoked on the stream, it's dead.

// It's an invariant that there must only be one pending post for arbitrary
// length completed_ops_, otherwise we can race in stream destruction, where
// we process multiple events in onCompletedOps() but have only partially
// consumed the posts on the dispatcher.
Contributor

The only problem I see with this approach is that it isn't compatible with limiting the number of completed ops processed in the dispatcher post before relinquishing control back to the dispatcher, which could lead to starving the silo thread from processing other events if there are a huge number of completed ops. I think that's OK for now.

Member Author

Yeah, fair point. I've added a TODO, we can be smarter about bounding completion run sizes in the future if this does manifest as an issue.

GoogleAsyncClientImpl& parent_;
GoogleAsyncClientThreadLocal& tls_;
// Latch our own version of this reference, so that completionThread() doesn't
// try and access via parent_, which might not exist in teardown.
Contributor

During tear down is the dispatcher itself guaranteed to still be valid? What protects against this case?

Member Author

Added comment; dispatchers outlive threads in the server loop.

uint32_t inflight_tags_{};
// Queue of completed (op, ok) passed from completionThread() to
// handleOpCompletion().
std::list<std::pair<GoogleAsyncTag::Operation, bool>> completed_ops_;
Contributor

Did you consider an std::deque for this?


@htuch htuch merged commit 104219c into envoyproxy:master Feb 12, 2018
@htuch htuch deleted the grpc-tls branch February 12, 2018 14:49
Shikugawa pushed a commit to Shikugawa/envoy that referenced this pull request Mar 28, 2020
* add response flag

* update stats plugin

* comment and format

* response flag

* license
jpsim added a commit that referenced this pull request Nov 28, 2022
Follow-up on envoyproxy/envoy-mobile#2526

Some instances were missed.

Signed-off-by: JP Simard <jp@jpsim.com>
jpsim added a commit that referenced this pull request Nov 29, 2022
Follow-up on envoyproxy/envoy-mobile#2526

Some instances were missed.

Signed-off-by: JP Simard <jp@jpsim.com>