Skip to content

Commit c58299d

Browse files
authored
feat(GCS+gRPC): more efficient WriteObject() stall timeouts (#9576)
This changes the implementation of `WriteObject()` to use the blocking API and a watchdog timer to implement stall timeouts. If the timer expires before the `Write()` operation completes a background thread cancels the RPC. If the operation completes, we cancel the timer.
1 parent 3994860 commit c58299d

3 files changed

Lines changed: 116 additions & 154 deletions

File tree

google/cloud/storage/internal/grpc_client.cc

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,6 @@ StatusOr<ObjectAccessControl> FindDefaultObjectAccessControl(
9191
"> in object id " + response->id());
9292
}
9393

94-
std::chrono::milliseconds DefaultTransferStallTimeout(
95-
std::chrono::milliseconds value) {
96-
if (value != std::chrono::milliseconds(0)) return value;
97-
// We need a large value for `wait_for()`, but not so large that it can easily
98-
// overflow. Fortunately, uploads automatically cancel (server side) after
99-
// 7 days, so waiting for 14 days will not create spurious timeouts.
100-
return std::chrono::milliseconds(std::chrono::hours(24) * 14);
101-
}
102-
10394
// If this is the last `Write()` call of the last `UploadChunk()` set the flags
10495
// to finalize the request
10596
void MaybeFinalize(google::storage::v2::WriteObjectRequest& write_request,
@@ -129,39 +120,34 @@ Status TimeoutError(std::chrono::milliseconds timeout, std::string const& op) {
129120
"] while waiting for " + op);
130121
}
131122

132-
struct WaitForFinish {
133-
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
134-
void operator()(future<StatusOr<google::storage::v2::WriteObjectResponse>>) {}
135-
};
136-
137-
struct WaitForIdle {
138-
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
139-
void operator()(future<bool>) {
140-
auto finish = stream->Finish();
141-
(void)finish.then(WaitForFinish{std::move(stream)});
142-
}
143-
};
144-
145123
StatusOr<QueryResumableUploadResponse> CloseWriteObjectStream(
146124
std::chrono::milliseconds timeout,
125+
std::function<future<bool>()> const& create_watchdog,
147126
std::unique_ptr<GrpcClient::WriteObjectStream> writer,
148127
bool sent_last_message, google::cloud::Options const& options) {
149128
if (!writer) return TimeoutError(timeout, "Write()");
150129
if (!sent_last_message) {
151-
auto pending = writer->WritesDone();
152-
if (pending.wait_for(timeout) == std::future_status::timeout) {
130+
auto watchdog = create_watchdog().then([&writer](auto f) {
131+
if (!f.get()) return false;
153132
writer->Cancel();
154-
pending.then(WaitForIdle{std::move(writer)});
133+
return true;
134+
});
135+
(void)writer->Write(google::storage::v2::WriteObjectRequest{},
136+
grpc::WriteOptions().set_last_message());
137+
watchdog.cancel();
138+
if (watchdog.get()) {
139+
writer->Close();
155140
return TimeoutError(timeout, "WritesDone()");
156141
}
157142
}
158-
auto pending = writer->Finish();
159-
if (pending.wait_for(timeout) == std::future_status::timeout) {
143+
auto watchdog = create_watchdog().then([&writer](auto f) {
144+
if (!f.get()) return false;
160145
writer->Cancel();
161-
pending.then(WaitForFinish{std::move(writer)});
162-
return TimeoutError(timeout, "Finish()");
163-
}
164-
auto response = pending.get();
146+
return true;
147+
});
148+
auto response = writer->Close();
149+
watchdog.cancel();
150+
if (watchdog.get()) return TimeoutError(timeout, "Close()");
165151
if (!response) return std::move(response).status();
166152
return GrpcObjectRequestParser::FromProto(*std::move(response), options);
167153
}
@@ -619,20 +605,19 @@ StatusOr<EmptyResponse> GrpcClient::DeleteResumableUpload(
619605

620606
StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
621607
UploadChunkRequest const& request) {
622-
auto const timeout =
623-
DefaultTransferStallTimeout(google::cloud::internal::CurrentOptions()
624-
.get<TransferStallTimeoutOption>());
608+
auto const timeout = google::cloud::internal::CurrentOptions()
609+
.get<TransferStallTimeoutOption>();
610+
auto create_watchdog = [cq = background_->cq(), timeout]() mutable {
611+
if (timeout == std::chrono::seconds(0)) {
612+
return make_ready_future(false);
613+
}
614+
return cq.MakeRelativeTimer(timeout).then(
615+
[](auto f) { return f.get().ok(); });
616+
};
625617

626618
auto context = absl::make_unique<grpc::ClientContext>();
627619
ApplyQueryParameters(*context, request, "resource");
628-
auto writer = stub_->AsyncWriteObject(background_->cq(), std::move(context));
629-
630-
auto pending_start = writer->Start();
631-
if (pending_start.wait_for(timeout) == std::future_status::timeout) {
632-
writer->Cancel();
633-
pending_start.then(WaitForIdle{std::move(writer)});
634-
return TimeoutError(timeout, "Start()");
635-
}
620+
auto writer = stub_->WriteObject(std::move(context));
636621

637622
std::size_t const maximum_chunk_size =
638623
google::storage::v2::ServiceConstants::MAX_WRITE_CHUNK_BYTES;
@@ -660,15 +645,22 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
660645
auto options = grpc::WriteOptions();
661646
MaybeFinalize(write_request, options, request, has_more);
662647

663-
auto pending = writer->Write(write_request, options);
664-
if (pending.wait_for(timeout) == std::future_status::timeout) {
648+
auto watchdog = create_watchdog().then([&writer](auto f) {
649+
if (!f.get()) return false;
665650
writer->Cancel();
666-
pending.then(WaitForIdle{std::move(writer)});
651+
return true;
652+
});
653+
auto success = writer->Write(write_request, options);
654+
watchdog.cancel();
655+
if (watchdog.get()) {
656+
// The stream is cancelled, but we need to close it explicitly.
657+
writer->Close();
658+
writer.reset();
667659
return false;
668660
}
669661

670662
sent_last_message = options.is_last_message();
671-
if (!pending.get()) return false;
663+
if (!success) return false;
672664
// After the first message, clear the object specification and checksums,
673665
// there is no need to resend it.
674666
write_request.clear_write_object_spec();
@@ -679,8 +671,8 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
679671
};
680672

681673
auto close_writer = [&]() -> StatusOr<QueryResumableUploadResponse> {
682-
return CloseWriteObjectStream(timeout, std::move(writer), sent_last_message,
683-
options());
674+
return CloseWriteObjectStream(timeout, create_watchdog, std::move(writer),
675+
sent_last_message, options());
684676
};
685677

686678
auto buffers = request.payload();

google/cloud/storage/internal/grpc_client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#include "google/cloud/storage/internal/raw_client.h"
1919
#include "google/cloud/storage/version.h"
2020
#include "google/cloud/background_threads.h"
21-
#include "google/cloud/internal/async_streaming_write_rpc.h"
21+
#include "google/cloud/internal/streaming_write_rpc.h"
2222
#include <google/storage/v2/storage.pb.h>
2323
#include <functional>
2424
#include <memory>
@@ -56,7 +56,7 @@ class GrpcClient : public RawClient,
5656

5757
~GrpcClient() override = default;
5858

59-
using WriteObjectStream = ::google::cloud::internal::AsyncStreamingWriteRpc<
59+
using WriteObjectStream = ::google::cloud::internal::StreamingWriteRpc<
6060
google::storage::v2::WriteObjectRequest,
6161
google::storage::v2::WriteObjectResponse>;
6262

google/cloud/storage/internal/grpc_client_upload_chunk_test.cc

Lines changed: 74 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "google/cloud/grpc_options.h"
1818
#include "google/cloud/options.h"
1919
#include "google/cloud/testing_util/is_proto_equal.h"
20+
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
2021
#include "google/cloud/testing_util/status_matchers.h"
2122
#include "absl/memory/memory.h"
2223
#include <gmock/gmock.h>
@@ -28,76 +29,42 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2829
namespace internal {
2930
namespace {
3031

31-
using ::google::cloud::storage::testing::MockAsyncInsertStream;
32+
using ::google::cloud::storage::testing::MockInsertStream;
3233
using ::google::cloud::storage::testing::MockStorageStub;
34+
using ::google::cloud::testing_util::MockCompletionQueueImpl;
3335
using ::google::cloud::testing_util::StatusIs;
3436
using ::testing::ByMove;
3537
using ::testing::HasSubstr;
3638
using ::testing::Return;
3739

38-
/// @verify that stall timeouts are reported correctly.
39-
TEST(GrpcClientUploadChunkTest, StallTimeoutStart) {
40-
// The mock will satisfy this promise when `Cancel()` is called.
41-
promise<void> hold_response;
42-
43-
auto mock = std::make_shared<MockStorageStub>();
44-
EXPECT_CALL(*mock, AsyncWriteObject)
45-
.WillOnce([&](google::cloud::CompletionQueue const&,
46-
std::unique_ptr<grpc::ClientContext>) {
47-
::testing::InSequence sequence;
48-
auto stream = absl::make_unique<MockAsyncInsertStream>();
49-
EXPECT_CALL(*stream, Start).WillOnce([&] {
50-
return hold_response.get_future().then(
51-
[](future<void>) { return false; });
52-
});
53-
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
54-
hold_response.set_value();
55-
});
56-
EXPECT_CALL(*stream, Finish)
57-
.WillOnce(Return(ByMove(make_ready_future(
58-
make_status_or(google::storage::v2::WriteObjectResponse{})))));
59-
return stream;
60-
});
61-
62-
auto client = GrpcClient::CreateMock(mock);
63-
google::cloud::internal::OptionsSpan const span(
64-
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
65-
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
66-
auto response = client->UploadChunk(UploadChunkRequest(
67-
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
68-
EXPECT_THAT(response,
69-
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Start()")));
70-
}
71-
7240
/// @verify that stall timeouts are reported correctly.
7341
TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {
74-
// The mock will satisfy this promise when `Cancel()` is called.
75-
promise<void> hold_response;
76-
7742
auto mock = std::make_shared<MockStorageStub>();
78-
EXPECT_CALL(*mock, AsyncWriteObject)
79-
.WillOnce([&](google::cloud::CompletionQueue const&,
80-
std::unique_ptr<grpc::ClientContext>) {
43+
EXPECT_CALL(*mock, WriteObject)
44+
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
8145
::testing::InSequence sequence;
82-
auto stream = absl::make_unique<MockAsyncInsertStream>();
83-
EXPECT_CALL(*stream, Start)
84-
.WillOnce(Return(ByMove(make_ready_future(true))));
85-
EXPECT_CALL(*stream, Write).WillOnce([&] {
86-
return hold_response.get_future().then(
87-
[](future<void>) { return false; });
88-
});
89-
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
90-
hold_response.set_value();
91-
});
92-
EXPECT_CALL(*stream, Finish)
93-
.WillOnce(Return(ByMove(make_ready_future(
94-
make_status_or(google::storage::v2::WriteObjectResponse{})))));
46+
auto stream = absl::make_unique<MockInsertStream>();
47+
EXPECT_CALL(*stream, Cancel).Times(1);
48+
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
49+
EXPECT_CALL(*stream, Close)
50+
.WillOnce(Return(
51+
make_status_or(google::storage::v2::WriteObjectResponse{})));
9552
return stream;
9653
});
9754

98-
auto client = GrpcClient::CreateMock(mock);
55+
auto const expected = std::chrono::seconds(42);
56+
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
57+
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
58+
.WillOnce(Return(ByMove(make_ready_future(
59+
make_status_or(std::chrono::system_clock::now())))));
60+
auto cq = CompletionQueue(mock_cq);
61+
62+
auto client = GrpcClient::CreateMock(
63+
mock, Options{}
64+
.set<TransferStallTimeoutOption>(expected)
65+
.set<GrpcCompletionQueueOption>(cq));
9966
google::cloud::internal::OptionsSpan const span(
100-
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
67+
Options{}.set<TransferStallTimeoutOption>(expected));
10168
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
10269
auto response = client->UploadChunk(UploadChunkRequest(
10370
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
@@ -107,35 +74,35 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {
10774

10875
/// @verify that stall timeouts are reported correctly.
10976
TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
110-
// The mock will satisfy this promise when `Cancel()` is called.
111-
promise<void> hold_response;
112-
11377
auto mock = std::make_shared<MockStorageStub>();
114-
EXPECT_CALL(*mock, AsyncWriteObject)
115-
.WillOnce([&](google::cloud::CompletionQueue const&,
116-
std::unique_ptr<grpc::ClientContext>) {
78+
EXPECT_CALL(*mock, WriteObject)
79+
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
11780
::testing::InSequence sequence;
118-
auto stream = absl::make_unique<MockAsyncInsertStream>();
119-
EXPECT_CALL(*stream, Start)
120-
.WillOnce(Return(ByMove(make_ready_future(true))));
121-
EXPECT_CALL(*stream, Write)
122-
.WillOnce(Return(ByMove(make_ready_future(true))));
123-
EXPECT_CALL(*stream, WritesDone).WillOnce([&] {
124-
return hold_response.get_future().then(
125-
[](future<void>) { return false; });
126-
});
127-
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
128-
hold_response.set_value();
129-
});
130-
EXPECT_CALL(*stream, Finish)
131-
.WillOnce(Return(ByMove(make_ready_future(
132-
make_status_or(google::storage::v2::WriteObjectResponse{})))));
81+
auto stream = absl::make_unique<MockInsertStream>();
82+
EXPECT_CALL(*stream, Write).WillOnce(Return(true));
83+
EXPECT_CALL(*stream, Cancel).Times(1);
84+
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
85+
EXPECT_CALL(*stream, Close)
86+
.WillOnce(Return(google::storage::v2::WriteObjectResponse{}));
13387
return stream;
13488
});
13589

136-
auto client = GrpcClient::CreateMock(mock);
90+
auto const expected = std::chrono::seconds(42);
91+
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
92+
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
93+
.WillOnce(Return(ByMove(
94+
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
95+
Status{StatusCode::kCancelled, "test-only"})))))
96+
.WillOnce(Return(ByMove(make_ready_future(
97+
make_status_or(std::chrono::system_clock::now())))));
98+
auto cq = CompletionQueue(mock_cq);
99+
100+
auto client = GrpcClient::CreateMock(
101+
mock, Options{}
102+
.set<TransferStallTimeoutOption>(expected)
103+
.set<GrpcCompletionQueueOption>(cq));
137104
google::cloud::internal::OptionsSpan const span(
138-
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
105+
Options{}.set<TransferStallTimeoutOption>(expected));
139106
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
140107
auto response = client->UploadChunk(UploadChunkRequest(
141108
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
@@ -144,41 +111,44 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
144111
}
145112

146113
/// @verify that stall timeouts are reported correctly.
147-
TEST(GrpcClientUploadChunkTest, StallTimeoutFinish) {
148-
// The mock will satisfy this promise when `Cancel()` is called.
149-
promise<void> hold_response;
150-
114+
TEST(GrpcClientUploadChunkTest, StallTimeoutClose) {
151115
auto mock = std::make_shared<MockStorageStub>();
152-
EXPECT_CALL(*mock, AsyncWriteObject)
153-
.WillOnce([&](google::cloud::CompletionQueue const&,
154-
std::unique_ptr<grpc::ClientContext>) {
116+
EXPECT_CALL(*mock, WriteObject)
117+
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
155118
::testing::InSequence sequence;
156-
auto stream = absl::make_unique<MockAsyncInsertStream>();
157-
EXPECT_CALL(*stream, Start)
158-
.WillOnce(Return(ByMove(make_ready_future(true))));
159-
EXPECT_CALL(*stream, Write)
160-
.WillOnce(Return(ByMove(make_ready_future(true))));
161-
EXPECT_CALL(*stream, WritesDone)
162-
.WillOnce(Return(ByMove(make_ready_future(true))));
163-
EXPECT_CALL(*stream, Finish).WillOnce([&] {
164-
return hold_response.get_future().then([](future<void>) {
165-
return make_status_or(google::storage::v2::WriteObjectResponse{});
166-
});
167-
});
168-
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
169-
hold_response.set_value();
170-
});
119+
auto stream = absl::make_unique<MockInsertStream>();
120+
EXPECT_CALL(*stream, Write).Times(2).WillRepeatedly(Return(true));
121+
EXPECT_CALL(*stream, Cancel).Times(1);
122+
EXPECT_CALL(*stream, Close)
123+
.WillOnce(Return(
124+
make_status_or(google::storage::v2::WriteObjectResponse{})));
171125
return stream;
172126
});
173127

174-
auto client = GrpcClient::CreateMock(mock);
128+
auto const expected = std::chrono::seconds(42);
129+
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
130+
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
131+
.WillOnce(Return(ByMove(
132+
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
133+
Status{StatusCode::kCancelled, "test-only"})))))
134+
.WillOnce(Return(ByMove(
135+
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
136+
Status{StatusCode::kCancelled, "test-only"})))))
137+
.WillOnce(Return(ByMove(make_ready_future(
138+
make_status_or(std::chrono::system_clock::now())))));
139+
auto cq = CompletionQueue(mock_cq);
140+
141+
auto client = GrpcClient::CreateMock(
142+
mock, Options{}
143+
.set<TransferStallTimeoutOption>(expected)
144+
.set<GrpcCompletionQueueOption>(cq));
175145
google::cloud::internal::OptionsSpan const span(
176-
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
146+
Options{}.set<TransferStallTimeoutOption>(expected));
177147
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
178148
auto response = client->UploadChunk(UploadChunkRequest(
179149
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
180150
EXPECT_THAT(response,
181-
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Finish()")));
151+
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Close()")));
182152
}
183153

184154
} // namespace

0 commit comments

Comments
 (0)