-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Expand file tree
/
Copy pathasync_client_impl.h
More file actions
174 lines (142 loc) · 6.46 KB
/
async_client_impl.h
File metadata and controls
174 lines (142 loc) · 6.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#pragma once
#include <memory>
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/config/route/v3/route_components.pb.h"
#include "envoy/grpc/async_client.h"
#include "envoy/stream_info/stream_info.h"
#include "source/common/common/linked_object.h"
#include "source/common/grpc/codec.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/http/async_client_impl.h"
#include "source/common/router/header_parser.h"
namespace Envoy {
namespace Grpc {
// Forward declarations: AsyncClientImpl below names both classes as friends and stores
// AsyncStreamImplPtr values before either class is defined.
class AsyncRequestImpl;
class AsyncStreamImpl;
// Owning handle to a stream; this is the element type of AsyncClientImpl::active_streams_.
using AsyncStreamImplPtr = std::unique_ptr<AsyncStreamImpl>;
/**
 * Final implementation of RawAsyncClient built from a GrpcService config.
 *
 * Instances are obtained through create(); the constructor is protected and reports failure
 * through an absl::Status out-parameter rather than by throwing.
 */
class AsyncClientImpl final : public RawAsyncClient {
public:
  /**
   * Factory for the client.
   * @param config the gRPC service configuration to build the client from.
   * @param context the factory context forwarded to construction.
   * @return the newly created client, or an error status. Presumably the error is the
   *         constructor's creation_status when it is not OK -- confirm against the definition.
   */
  static absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
  create(const envoy::config::core::v3::GrpcService& config,
         Server::Configuration::CommonFactoryContext& context);
  ~AsyncClientImpl() override;

  // Grpc::AsyncClient
  AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
                        Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
                        Tracing::Span& parent_span,
                        const Http::AsyncClient::RequestOptions& options) override;
  RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
                           RawAsyncStreamCallbacks& callbacks,
                           const Http::AsyncClient::StreamOptions& options) override;
  // Returns a view of the remote cluster name held by this client. The view refers to a member
  // string, so it must not be used after the client is destroyed.
  absl::string_view destination() override { return remote_cluster_name_; }

  // Read-only accessor for the default retry policy. It is const-qualified so that it can also
  // be called through a const reference to the client; it does not modify any state.
  const Router::RetryPolicyConstSharedPtr& retryPolicy() const { return retry_policy_; }

protected:
  /**
   * @param config the gRPC service configuration.
   * @param context the factory context.
   * @param creation_status out-parameter used to report whether construction succeeded.
   */
  AsyncClientImpl(const envoy::config::core::v3::GrpcService& config,
                  Server::Configuration::CommonFactoryContext& context,
                  absl::Status& creation_status);

private:
  // NOTE: members are initialized in declaration order; keep this order in sync with the
  // constructor's initializer list.
  const uint32_t max_recv_message_length_;
  const bool skip_envoy_headers_;
  Upstream::ClusterManager& cm_;
  const std::string remote_cluster_name_;
  // The host header value in the http transport.
  const std::string host_name_;
  // Streams owned by this client.
  std::list<AsyncStreamImplPtr> active_streams_;
  TimeSource& time_source_;
  Router::HeaderParserPtr metadata_parser_;
  // Default per service retry policy.
  Router::RetryPolicyConstSharedPtr retry_policy_;

  // The request and stream implementations access the private state above directly.
  friend class AsyncRequestImpl;
  friend class AsyncStreamImpl;
};
/**
 * RawAsyncStream implementation owned by an AsyncClientImpl.
 *
 * The class also implements Http::AsyncClient::StreamCallbacks (as a private base), is
 * deferred-deletable, and is a LinkedObject so that it can be kept in a list of streams.
 */
class AsyncStreamImpl : public RawAsyncStream,
                        private Http::AsyncClient::StreamCallbacks,
                        public Event::DeferredDeletable,
                        public LinkedObject<AsyncStreamImpl> {
public:
  AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
                  absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
                  const Http::AsyncClient::StreamOptions& options);
  ~AsyncStreamImpl() override;

  // Virtual so that derived classes (AsyncRequestImpl in this file) can override it.
  virtual void initialize(bool buffer_body_for_retry);

  // Http::AsyncClient::StreamCallbacks
  void onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
  void onData(Buffer::Instance& data, bool end_stream) override;
  void onTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
  void onComplete() override;
  void onReset() override;
  void waitForRemoteCloseAndDelete() override;

  // Grpc::AsyncStream
  void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
  void closeStream() override;
  void resetStream() override;

  // Reports false when there is no underlying HTTP stream; otherwise defers to that stream.
  bool isAboveWriteBufferHighWatermark() const override {
    if (stream_ == nullptr) {
      return false;
    }
    return stream_->isAboveWriteBufferHighWatermark();
  }

  // Returns the current value of the http_reset_ flag.
  bool hasResetStream() const { return http_reset_; }

  // NOTE(review): both overloads dereference stream_ without a null check, so callers
  // presumably may only use them while an underlying HTTP stream exists -- confirm.
  const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }
  StreamInfo::StreamInfo& streamInfo() override { return stream_->streamInfo(); }

  // Forwards the callbacks to the underlying HTTP stream (stream_ is not null-checked here).
  void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
    stream_->setWatermarkCallbacks(callbacks);
  }

  // Detaches the watermark callbacks recorded in options_, if any, and clears the recorded
  // pointer so that a repeated call is a no-op.
  void removeWatermarkCallbacks() override {
    if (options_.sidestream_watermark_callbacks == nullptr) {
      return;
    }
    stream_->removeWatermarkCallbacks();
    options_.sidestream_watermark_callbacks = nullptr;
  }

protected:
  Upstream::ClusterInfoConstSharedPtr cluster_info_;

private:
  void streamError(Status::GrpcStatus grpc_status, const std::string& message);
  // Convenience overload that reports the error with an empty message.
  void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); }

  void cleanup();
  void trailerResponse(absl::optional<Status::GrpcStatus> grpc_status,
                       const std::string& grpc_message);

  // Deliver notification and update span when the connection closes.
  void notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message);

protected:
  // NOTE: data members are initialized in declaration order; do not reorder them without
  // checking the constructor's initializer list.
  Event::Dispatcher* dispatcher_{nullptr};
  Http::RequestMessagePtr headers_message_;
  AsyncClientImpl& parent_;
  std::string service_full_name_;
  std::string method_name_;
  Tracing::SpanPtr current_span_;
  RawAsyncStreamCallbacks& callbacks_;
  Http::AsyncClient::StreamOptions options_;
  bool http_reset_{false};
  bool waiting_to_delete_on_remote_close_{false};
  // Underlying HTTP stream; null until one has been assigned.
  Http::AsyncClient::Stream* stream_{nullptr};
  Decoder decoder_;
  // This is a member to avoid reallocation on every onData().
  std::vector<Frame> decoded_frames_;
  Event::TimerPtr remote_close_timer_;

  friend class AsyncClientImpl;
};
// AsyncRequest implementation layered on AsyncStreamImpl. It also derives privately from
// RawAsyncStreamCallbacks -- presumably so the request can act as the callback sink of its own
// underlying stream and relay events to the RawAsyncRequestCallbacks it holds; confirm against
// the out-of-line definitions.
class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncStreamCallbacks {
public:
// Takes ownership of the request buffer (passed as an rvalue) and keeps a reference to the
// caller's callbacks. parent_span is taken by reference and is not owned by this object.
AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
absl::string_view method_name, Buffer::InstancePtr&& request,
RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
const Http::AsyncClient::RequestOptions& options);
// Overrides AsyncStreamImpl::initialize().
void initialize(bool buffer_body_for_retry) override;
// Grpc::AsyncRequest
void cancel() override;
const StreamInfo::StreamInfo& streamInfo() const override;
void detach() override;
private:
// Re-exposes the base-class streamInfo() overloads inside this class; without it, the
// streamInfo() declaration above would hide them during name lookup.
using AsyncStreamImpl::streamInfo;
// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
// Request buffer handed to the constructor.
Buffer::InstancePtr request_;
// NOTE(review): callbacks_ and current_span_ have the same names as protected members of
// AsyncStreamImpl and therefore hide them inside this class; callbacks_ here refers to the
// request-level callbacks, not the stream-level ones.
RawAsyncRequestCallbacks& callbacks_;
Tracing::SpanPtr current_span_;
// Presumably holds the response message until the request completes -- confirm in the .cc.
Buffer::InstancePtr response_;
};
} // namespace Grpc
} // namespace Envoy