-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Expand file tree
/
Copy pathgoogle_async_client_impl.cc
More file actions
571 lines (522 loc) · 24.3 KB
/
google_async_client_impl.cc
File metadata and controls
571 lines (522 loc) · 24.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
#include "source/common/grpc/google_async_client_impl.h"
#include "envoy/common/time.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/http/protocol.h"
#include "envoy/stats/scope.h"
#include "source/common/common/base64.h"
#include "source/common/common/empty_string.h"
#include "source/common/common/lock_guard.h"
#include "source/common/common/utility.h"
#include "source/common/config/datasource.h"
#include "source/common/grpc/common.h"
#include "source/common/grpc/google_grpc_creds_impl.h"
#include "source/common/grpc/google_grpc_utils.h"
#include "source/common/router/header_parser.h"
#include "source/common/tracing/http_tracer_impl.h"
#include "absl/strings/str_cat.h"
#include "grpcpp/support/proto_buffer_reader.h"
namespace Envoy {
namespace Grpc {
namespace {
static constexpr int DefaultBufferLimitBytes = 1024 * 1024;
}
GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
: completion_thread_(api.threadFactory().createThread([this] { completionThread(); },
Thread::Options{"GrpcGoogClient"})) {}
GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() {
// Force streams to shutdown and invoke TryCancel() to start the drain of
// pending op. If we don't do this, Shutdown() below can jam on pending ops.
// This is also required to satisfy the contract that once Shutdown is called,
// streams no longer queue any additional tags.
for (auto it = streams_.begin(); it != streams_.end();) {
// resetStream() may result in immediate unregisterStream() and erase(),
// which would invalidate the iterator for the current element, so make sure
// we point to the next one first.
(*it++)->resetStream();
}
cq_.Shutdown();
ENVOY_LOG(debug, "Joining completionThread");
completion_thread_->join();
ENVOY_LOG(debug, "Joined completionThread");
// Ensure that we have cleaned up all orphan streams, now that CQ is gone.
while (!streams_.empty()) {
(*streams_.begin())->onCompletedOps();
}
}
void GoogleAsyncClientThreadLocal::completionThread() {
ENVOY_LOG(debug, "completionThread running");
void* tag;
bool ok;
while (cq_.Next(&tag, &ok)) {
const auto& google_async_tag = *reinterpret_cast<GoogleAsyncTag*>(tag);
const GoogleAsyncTag::Operation op = google_async_tag.op_;
GoogleAsyncStreamImpl& stream = google_async_tag.stream_;
ENVOY_LOG(trace, "completionThread CQ event {} {}", static_cast<int>(op), ok);
Thread::LockGuard lock(stream.completed_ops_lock_);
// It's an invariant that there must only be one pending post for arbitrary
// length completed_ops_, otherwise we can race in stream destruction, where
// we process multiple events in onCompletedOps() but have only partially
// consumed the posts on the dispatcher.
// TODO(htuch): This may result in unbounded processing on the silo thread
// in onCompletedOps() in extreme cases, when we emplace_back() in
// completionThread() at a high rate, consider bounding the length of such
// sequences if this behavior becomes an issue.
if (stream.completed_ops_.empty()) {
stream.dispatcher_.post([&stream] { stream.onCompletedOps(); });
}
stream.completed_ops_.emplace_back(op, ok);
}
ENVOY_LOG(debug, "completionThread exiting");
}
GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
GoogleAsyncClientThreadLocal& tls,
GoogleStubFactory& stub_factory,
Stats::ScopeSharedPtr scope,
const envoy::config::core::v3::GrpcService& config,
Server::Configuration::CommonFactoryContext& context,
const StatNames& stat_names)
: dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
target_uri_(config.google_grpc().target_uri()), scope_(scope),
per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)),
metadata_parser_(THROW_OR_RETURN_VALUE(
Router::HeaderParser::configure(
config.initial_metadata(),
envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD),
Router::HeaderParserPtr)) {
// We rebuild the channel each time we construct the channel. It appears that the gRPC library is
// smart enough to do connection pooling and reuse with identical channel args, so this should
// have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
// new connection implied.
std::shared_ptr<grpc::Channel> channel = GoogleGrpcUtils::createChannel(config, context);
// Get state with try_to_connect = true to try connection at channel creation.
// This is for initializing gRPC channel at channel creation. This GetState(true) is used to poke
// the gRPC lb at channel creation, it doesn't have any effect no matter it succeeds or fails. But
// it helps on initialization. Otherwise, the channel establishment still happens at the first
// request, no matter when we create the channel.
channel->GetState(true);
stub_ = stub_factory.createStub(channel);
scope_->counterFromStatName(stat_names.google_grpc_client_creation_).inc();
// Initialize client stats.
// TODO(jmarantz): Capture these names in async_client_manager_impl.cc and
// pass in a struct of StatName objects so we don't have to take locks here.
stats_.streams_total_ = &scope_->counterFromStatName(stat_names.streams_total_);
for (uint32_t i = 0; i <= Status::WellKnownGrpcStatus::MaximumKnown; ++i) {
stats_.streams_closed_[i] = &scope_->counterFromStatName(stat_names.streams_closed_[i]);
}
}
GoogleAsyncClientImpl::~GoogleAsyncClientImpl() {
ASSERT(isThreadSafe());
ENVOY_LOG(debug, "Client teardown, resetting streams");
while (!active_streams_.empty()) {
active_streams_.front()->resetStream();
}
}
AsyncRequest* GoogleAsyncClientImpl::sendRaw(absl::string_view service_full_name,
absl::string_view method_name,
Buffer::InstancePtr&& request,
RawAsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const Http::AsyncClient::RequestOptions& options) {
ASSERT(isThreadSafe());
auto* const async_request = new GoogleAsyncRequestImpl(
*this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
GoogleAsyncStreamImplPtr grpc_stream{async_request};
grpc_stream->initialize(true);
if (grpc_stream->callFailed()) {
return nullptr;
}
LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
return async_request;
}
RawAsyncStream* GoogleAsyncClientImpl::startRaw(absl::string_view service_full_name,
absl::string_view method_name,
RawAsyncStreamCallbacks& callbacks,
const Http::AsyncClient::StreamOptions& options) {
ASSERT(isThreadSafe());
auto grpc_stream = std::make_unique<GoogleAsyncStreamImpl>(*this, service_full_name, method_name,
callbacks, options);
grpc_stream->initialize(false);
if (grpc_stream->callFailed()) {
return nullptr;
}
LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
return active_streams_.front().get();
}
GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
absl::string_view service_full_name,
absl::string_view method_name,
RawAsyncStreamCallbacks& callbacks,
const Http::AsyncClient::StreamOptions& options)
: parent_(parent), tls_(parent_.tls_), dispatcher_(parent_.dispatcher_), stub_(parent_.stub_),
service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
Network::ConnectionInfoProviderSharedPtr{},
StreamInfo::FilterState::LifeSpan::FilterChain) {
// TODO(cainelli): add a common library for tracing tags between gRPC implementations.
if (options.parent_span_ != nullptr) {
const std::string child_span_name =
options.child_span_name_.empty()
? absl::StrCat("async ", service_full_name, ".", method_name, " egress")
: options.child_span_name_;
current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
parent.timeSource().systemTime());
current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
} else {
current_span_ = std::make_unique<Tracing::NullSpan>();
}
if (options.sampled_.has_value()) {
current_span_->setSampled(options.sampled_.value());
}
}
GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
if (options_.on_delete_callback_for_test_only) {
options_.on_delete_callback_for_test_only();
}
}
GoogleAsyncStreamImpl::PendingMessage::PendingMessage(Buffer::InstancePtr request, bool end_stream)
: buf_(GoogleGrpcUtils::makeByteBuffer(std::move(request))), end_stream_(end_stream) {}
// TODO(htuch): figure out how to propagate "this request should be buffered for
// retry" bit to Google gRPC library.
void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
parent_.stats_.streams_total_->inc();
gpr_timespec abs_deadline =
options_.timeout
? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(options_.timeout.value().count(), GPR_TIMESPAN))
: gpr_inf_future(GPR_CLOCK_REALTIME);
ctxt_.set_deadline(abs_deadline);
// Fill service-wide initial metadata.
auto initial_metadata = Http::RequestHeaderMapImpl::create();
// TODO(cpakulski): Find a better way to access requestHeaders
// request headers should not be stored in stream_info.
// Maybe put it to parent_context?
parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
Tracing::HttpTraceContext trace_context(*initial_metadata);
Tracing::UpstreamContext upstream_context(nullptr, // host_
nullptr, // cluster_
Tracing::ServiceType::GoogleGrpc, // service_type_
true // async_client_span_
);
current_span_->injectContext(trace_context, upstream_context);
callbacks_.onCreateInitialMetadata(*initial_metadata);
initial_metadata->iterate([this](const Http::HeaderEntry& header) {
ctxt_.AddMetadata(std::string(header.key().getStringView()),
std::string(header.value().getStringView()));
return Http::HeaderMap::Iterate::Continue;
});
// Invoke stub call.
rw_ = parent_.stub_->PrepareCall(&ctxt_, "/" + service_full_name_ + "/" + method_name_,
&parent_.tls_.completionQueue());
if (rw_ == nullptr) {
notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, nullptr, EMPTY_STRING);
call_failed_ = true;
return;
}
parent_.tls_.registerStream(this);
rw_->StartCall(&init_tag_);
++inflight_tags_;
}
void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
Http::ResponseTrailerMapPtr trailing_metadata,
const std::string& message) {
if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) {
ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status);
// Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range
// crash..
grpc_status = Status::WellKnownGrpcStatus::InvalidCode;
parent_.stats_.streams_closed_[Status::WellKnownGrpcStatus::Unknown]->inc();
} else {
parent_.stats_.streams_closed_[grpc_status]->inc();
}
ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message);
current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status));
if (grpc_status != Status::WellKnownGrpcStatus::Ok) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
}
current_span_->finishSpan();
if (!waiting_to_delete_on_remote_close_) {
callbacks_.onReceiveTrailingMetadata(
trailing_metadata ? std::move(trailing_metadata) : Http::ResponseTrailerMapImpl::create());
callbacks_.onRemoteClose(grpc_status, message);
}
}
void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) {
write_pending_queue_.emplace(std::move(request), end_stream);
ENVOY_LOG(trace, "Queued message to write ({} bytes)",
write_pending_queue_.back().buf_.value().Length());
bytes_in_write_pending_queue_ += write_pending_queue_.back().buf_.value().Length();
writeQueued();
}
void GoogleAsyncStreamImpl::closeStream() {
// Empty EOS write queued.
current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
current_span_->finishSpan();
write_pending_queue_.emplace();
writeQueued();
}
void GoogleAsyncStreamImpl::resetStream() {
ENVOY_LOG(debug, "resetStream");
// The gRPC API requires calling Finish() at the end of a stream, even
// if the stream is cancelled.
if (!finish_pending_) {
finish_pending_ = true;
rw_->Finish(&status_, &finish_tag_);
++inflight_tags_;
}
cleanup();
}
void GoogleAsyncStreamImpl::waitForRemoteCloseAndDelete() {
if (!waiting_to_delete_on_remote_close_) {
waiting_to_delete_on_remote_close_ = true;
remote_close_timer_ = dispatcher_.createTimer([this] { resetStream(); });
remote_close_timer_->enableTimer(options_.remote_close_timeout);
}
}
void GoogleAsyncStreamImpl::writeQueued() {
if (!call_initialized_ || finish_pending_ || write_pending_ || write_pending_queue_.empty() ||
draining_cq_) {
return;
}
write_pending_ = true;
const PendingMessage& msg = write_pending_queue_.front();
if (!msg.buf_) {
ASSERT(msg.end_stream_);
rw_->WritesDone(&write_last_tag_);
++inflight_tags_;
} else if (msg.end_stream_) {
grpc::WriteOptions write_options;
rw_->WriteLast(msg.buf_.value(), write_options, &write_last_tag_);
++inflight_tags_;
} else {
rw_->Write(msg.buf_.value(), &write_tag_);
++inflight_tags_;
}
ENVOY_LOG(trace, "Write op dispatched");
}
void GoogleAsyncStreamImpl::onCompletedOps() {
// The items in completed_ops_ execute in the order they were originally added to the queue since
// both the post callback scheduled by the completionThread and the deferred deletion of the
// GoogleAsyncClientThreadLocal happen on the dispatcher thread.
std::deque<std::pair<GoogleAsyncTag::Operation, bool>> completed_ops;
{
Thread::LockGuard lock(completed_ops_lock_);
completed_ops = std::move(completed_ops_);
// completed_ops_ should be empty after the move.
ASSERT(completed_ops_.empty());
}
while (!completed_ops.empty()) {
GoogleAsyncTag::Operation op;
bool ok;
std::tie(op, ok) = completed_ops.front();
completed_ops.pop_front();
handleOpCompletion(op, ok);
}
}
void GoogleAsyncStreamImpl::handleOpCompletion(GoogleAsyncTag::Operation op, bool ok) {
ENVOY_LOG(trace, "handleOpCompletion op={} ok={} inflight={}", static_cast<int>(op), ok,
inflight_tags_);
ASSERT(inflight_tags_ > 0);
--inflight_tags_;
if (draining_cq_) {
if (inflight_tags_ == 0) {
deferredDelete();
}
// Ignore op completions while draining CQ.
return;
}
// Consider failure cases first.
if (!ok) {
// Early fails can be just treated as Internal.
if (op == GoogleAsyncTag::Operation::Init ||
op == GoogleAsyncTag::Operation::ReadInitialMetadata) {
notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
resetStream();
return;
}
// Remote server has closed, we can pick up some meaningful status.
// TODO(htuch): We're assuming here that a failed Write/WriteLast operation will result in
// stream termination, and pick up on the failed Read here. Confirm that this assumption is
// valid.
if (op == GoogleAsyncTag::Operation::Read) {
finish_pending_ = true;
rw_->Finish(&status_, &finish_tag_);
++inflight_tags_;
}
return;
}
switch (op) {
case GoogleAsyncTag::Operation::Init: {
ASSERT(ok);
ASSERT(!call_initialized_);
call_initialized_ = true;
rw_->ReadInitialMetadata(&read_initial_metadata_tag_);
++inflight_tags_;
writeQueued();
break;
}
case GoogleAsyncTag::Operation::ReadInitialMetadata: {
ASSERT(ok);
ASSERT(call_initialized_);
rw_->Read(&read_buf_, &read_tag_);
++inflight_tags_;
Http::ResponseHeaderMapPtr initial_metadata = Http::ResponseHeaderMapImpl::create();
metadataTranslate(ctxt_.GetServerInitialMetadata(), *initial_metadata);
if (!waiting_to_delete_on_remote_close_) {
callbacks_.onReceiveInitialMetadata(std::move(initial_metadata));
}
break;
}
case GoogleAsyncTag::Operation::Write: {
ASSERT(ok);
write_pending_ = false;
bytes_in_write_pending_queue_ -= write_pending_queue_.front().buf_.value().Length();
write_pending_queue_.pop();
writeQueued();
break;
}
case GoogleAsyncTag::Operation::WriteLast: {
ASSERT(ok);
write_pending_ = false;
break;
}
case GoogleAsyncTag::Operation::Read: {
ASSERT(ok);
auto buffer = GoogleGrpcUtils::makeBufferInstance(read_buf_);
if (!buffer || (!waiting_to_delete_on_remote_close_ &&
!callbacks_.onReceiveMessageRaw(std::move(buffer)))) {
// This is basically streamError in Grpc::AsyncClientImpl.
notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
resetStream();
break;
}
rw_->Read(&read_buf_, &read_tag_);
++inflight_tags_;
break;
}
case GoogleAsyncTag::Operation::Finish: {
ASSERT(finish_pending_);
ENVOY_LOG(debug, "Finish with grpc-status code {}", static_cast<int>(status_.error_code()));
Http::ResponseTrailerMapPtr trailing_metadata = Http::ResponseTrailerMapImpl::create();
metadataTranslate(ctxt_.GetServerTrailingMetadata(), *trailing_metadata);
notifyRemoteClose(static_cast<Status::GrpcStatus>(status_.error_code()),
std::move(trailing_metadata), status_.error_message());
cleanup();
break;
}
}
}
void GoogleAsyncStreamImpl::metadataTranslate(
const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
Http::HeaderMap& header_map) {
// More painful copying, this time due to the mismatch in header
// representation data structures in Envoy and Google gRPC.
for (const auto& it : grpc_metadata) {
auto key = Http::LowerCaseString(std::string(it.first.data(), it.first.size()));
if (absl::EndsWith(key.get(), "-bin")) {
auto value = Base64::encode(it.second.data(), it.second.size());
header_map.addCopy(key, value);
continue;
}
header_map.addCopy(key, std::string(it.second.data(), it.second.size()));
}
}
void GoogleAsyncStreamImpl::deferredDelete() {
ENVOY_LOG(debug, "Deferred delete");
tls_.unregisterStream(this);
// We only get here following cleanup(), which has performed a
// remoteFromList(), resulting in self-ownership of the object's memory.
// Hence, it is safe here to create a unique_ptr to this and transfer
// ownership to dispatcher_.deferredDelete(). After this call, no further
// methods may be invoked on this object.
dispatcher_.deferredDelete(GoogleAsyncStreamImplPtr(this));
}
void GoogleAsyncStreamImpl::cleanup() {
ENVOY_LOG(debug, "Stream cleanup with {} in-flight tags", inflight_tags_);
// We can get here if the client has already issued resetStream() and, while
// this is in progress, the destructor runs.
if (draining_cq_) {
ENVOY_LOG(debug, "Cleanup already in progress");
return;
}
draining_cq_ = true;
ctxt_.TryCancel();
if (LinkedObject<GoogleAsyncStreamImpl>::inserted()) {
// We take ownership of our own memory at this point.
LinkedObject<GoogleAsyncStreamImpl>::removeFromList(parent_.active_streams_).release();
if (inflight_tags_ == 0) {
deferredDelete();
}
}
remote_close_timer_ = nullptr;
}
GoogleAsyncRequestImpl::GoogleAsyncRequestImpl(
GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
absl::string_view method_name, Buffer::InstancePtr request, RawAsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span, const Http::AsyncClient::RequestOptions& options)
: GoogleAsyncStreamImpl(parent, service_full_name, method_name, *this, options),
request_(std::move(request)), callbacks_(callbacks) {
current_span_ =
parent_span.spawnChild(Tracing::EgressConfig::get(),
absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
parent.timeSource().systemTime());
current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
}
void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) {
GoogleAsyncStreamImpl::initialize(buffer_body_for_retry);
if (callFailed()) {
return;
}
sendMessageRaw(std::move(request_), true);
}
void GoogleAsyncRequestImpl::cancel() {
current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
current_span_->finishSpan();
resetStream();
}
void GoogleAsyncRequestImpl::detach() {
// TODO(wbpcode): In most tracers the span will hold a reference to the tracer self
// and it's possible that become a dangling reference for long time async request.
// This require further PR to resolve.
options_.sidestream_watermark_callbacks = nullptr;
options_.parent_span_ = nullptr;
options_.parent_context.stream_info = nullptr;
streamInfo().clearParentStreamInfo();
}
void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
Tracing::HttpTraceContext trace_context(metadata);
Tracing::UpstreamContext upstream_context(nullptr, // host_
nullptr, // cluster_
Tracing::ServiceType::GoogleGrpc, // service_type_
true // async_client_span_
);
current_span_->injectContext(trace_context, upstream_context);
callbacks_.onCreateInitialMetadata(metadata);
}
void GoogleAsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
bool GoogleAsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
response_ = std::move(response);
return true;
}
void GoogleAsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}
void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
const std::string& message) {
current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
callbacks_.onFailure(status, message, *current_span_);
} else if (response_ == nullptr) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
} else {
callbacks_.onSuccessRaw(std::move(response_), *current_span_);
}
current_span_->finishSpan();
}
} // namespace Grpc
} // namespace Envoy