Skip to content

Commit a7db927

Browse files
authored
fix(storage): add backoff to gRPC write retries (#11200)
1 parent 364b639 commit a7db927

File tree

9 files changed

+411
-80
lines changed

9 files changed

+411
-80
lines changed

storage/bucket_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,11 +1116,11 @@ func TestBucketRetryer(t *testing.T) {
11161116
WithErrorFunc(func(err error) bool { return false }))
11171117
},
11181118
want: &retryConfig{
1119-
backoff: &gax.Backoff{
1119+
backoff: gaxBackoffFromStruct(&gax.Backoff{
11201120
Initial: 2 * time.Second,
11211121
Max: 30 * time.Second,
11221122
Multiplier: 3,
1123-
},
1123+
}),
11241124
policy: RetryAlways,
11251125
maxAttempts: expectedAttempts(5),
11261126
shouldRetry: func(err error) bool { return false },
@@ -1135,9 +1135,9 @@ func TestBucketRetryer(t *testing.T) {
11351135
}))
11361136
},
11371137
want: &retryConfig{
1138-
backoff: &gax.Backoff{
1138+
backoff: gaxBackoffFromStruct(&gax.Backoff{
11391139
Multiplier: 3,
1140-
}},
1140+
})},
11411141
},
11421142
{
11431143
name: "set policy only",

storage/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,7 +1396,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {
13961396
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
13971397
testID := createRetryTest(t, client, instructions)
13981398
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1399-
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1399+
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
14001400
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
14011401

14021402
var ae *apierror.APIError
@@ -1421,7 +1421,7 @@ func TestTimeoutErrorEmulated(t *testing.T) {
14211421
ctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
14221422
defer cancel()
14231423
time.Sleep(5 * time.Nanosecond)
1424-
config := &retryConfig{backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1424+
config := &retryConfig{backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
14251425
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
14261426

14271427
// Error may come through as a context.DeadlineExceeded (HTTP) or status.DeadlineExceeded (gRPC)
@@ -1447,7 +1447,7 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
14471447
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
14481448
testID := createRetryTest(t, client, instructions)
14491449
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
1450-
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
1450+
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
14511451
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
14521452
t.Fatalf("GetBucket: got unexpected error %v, want nil", err)
14531453
}

storage/grpc_client.go

Lines changed: 114 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"cloud.google.com/go/internal/trace"
3030
gapic "cloud.google.com/go/storage/internal/apiv2"
3131
"cloud.google.com/go/storage/internal/apiv2/storagepb"
32+
"github.com/google/uuid"
3233
"github.com/googleapis/gax-go/v2"
3334
"google.golang.org/api/googleapi"
3435
"google.golang.org/api/iterator"
@@ -1223,7 +1224,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
12231224
}
12241225
}
12251226

1226-
o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
1227+
o, off, err := gw.uploadBuffer(recvd, offset, doneReading, newUploadBufferRetryConfig(gw.settings))
12271228
if err != nil {
12281229
err = checkCanceled(err)
12291230
errorf(err)
@@ -2091,12 +2092,7 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
20912092
// completed.
20922093
//
20932094
// Returns object, persisted size, and any error that is not retriable.
2094-
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
2095-
var shouldRetry = ShouldRetry
2096-
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
2097-
shouldRetry = w.settings.retry.shouldRetry
2098-
}
2099-
2095+
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool, retryConfig *uploadBufferRetryConfig) (*storagepb.Object, int64, error) {
21002096
var err error
21012097
var lastWriteOfEntireObject bool
21022098

@@ -2143,6 +2139,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
21432139
if w.stream == nil {
21442140
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
21452141
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
2142+
ctx = setInvocationHeaders(ctx, retryConfig.invocationID, retryConfig.attempts)
21462143

21472144
w.stream, err = w.c.raw.BidiWriteObject(ctx)
21482145
if err != nil {
@@ -2188,7 +2185,11 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
21882185
// Retriable errors mean we should start over and attempt to
21892186
// resend the entire buffer via a new stream.
21902187
// If not retriable, falling through will return the error received.
2191-
if shouldRetry(err) {
2188+
err = retryConfig.retriable(w.ctx, err)
2189+
2190+
if err == nil {
2191+
retryConfig.doBackOff(w.ctx)
2192+
21922193
// TODO: Add test case for failure modes of querying progress.
21932194
writeOffset, err = w.determineOffset(start)
21942195
if err != nil {
@@ -2230,11 +2231,17 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22302231
if !lastWriteOfEntireObject {
22312232
resp, err := w.stream.Recv()
22322233

2233-
// Retriable errors mean we should start over and attempt to
2234-
// resend the entire buffer via a new stream.
2235-
// If not retriable, falling through will return the error received
2236-
// from closing the stream.
2237-
if shouldRetry(err) {
2234+
if err != nil {
2235+
// Retriable errors mean we should start over and attempt to
2236+
// resend the entire buffer via a new stream.
2237+
// If not retriable, falling through will return the error received
2238+
// from closing the stream.
2239+
err = retryConfig.retriable(w.ctx, err)
2240+
if err != nil {
2241+
return nil, 0, err
2242+
}
2243+
2244+
retryConfig.doBackOff(w.ctx)
22382245
writeOffset, err = w.determineOffset(start)
22392246
if err != nil {
22402247
return nil, 0, err
@@ -2246,9 +2253,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22462253

22472254
continue sendBytes
22482255
}
2249-
if err != nil {
2250-
return nil, 0, err
2251-
}
22522256

22532257
if resp.GetPersistedSize() != writeOffset {
22542258
// Retry if not all bytes were persisted.
@@ -2274,7 +2278,14 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22742278
var obj *storagepb.Object
22752279
for obj == nil {
22762280
resp, err := w.stream.Recv()
2277-
if shouldRetry(err) {
2281+
2282+
if err != nil {
2283+
err = retryConfig.retriable(w.ctx, err)
2284+
if err != nil {
2285+
return nil, 0, err
2286+
}
2287+
retryConfig.doBackOff(w.ctx)
2288+
22782289
writeOffset, err = w.determineOffset(start)
22792290
if err != nil {
22802291
return nil, 0, err
@@ -2283,9 +2294,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22832294
w.stream = nil
22842295
continue sendBytes
22852296
}
2286-
if err != nil {
2287-
return nil, 0, err
2288-
}
22892297

22902298
obj = resp.GetResource()
22912299
}
@@ -2370,3 +2378,89 @@ func checkCanceled(err error) error {
23702378

23712379
return err
23722380
}
2381+
2382+
// uploadBufferRetryConfig tracks the retry state of a single gRPC buffer
// upload across stream re-creations: the attempt count, the invocation ID
// shared by every attempt, the effective retry configuration, and the last
// non-context error observed.
type uploadBufferRetryConfig struct {
	attempts     int          // number of attempts so far; starts at 1
	invocationID string       // unique ID attached to every attempt of this upload
	config       *retryConfig // effective retry policy, backoff, and predicate
	lastErr      error        // most recent non-context error, surfaced on final failure
}
2388+
2389+
func newUploadBufferRetryConfig(settings *settings) *uploadBufferRetryConfig {
2390+
config := settings.retry
2391+
2392+
if config == nil {
2393+
config = defaultRetry.clone()
2394+
}
2395+
2396+
if config.shouldRetry == nil {
2397+
config.shouldRetry = ShouldRetry
2398+
}
2399+
2400+
if config.backoff == nil {
2401+
config.backoff = &gaxBackoff{}
2402+
} else {
2403+
config.backoff.SetMultiplier(settings.retry.backoff.GetMultiplier())
2404+
config.backoff.SetInitial(settings.retry.backoff.GetInitial())
2405+
config.backoff.SetMax(settings.retry.backoff.GetMax())
2406+
}
2407+
2408+
return &uploadBufferRetryConfig{
2409+
attempts: 1,
2410+
invocationID: uuid.New().String(),
2411+
config: config,
2412+
}
2413+
}
2414+
2415+
// retriable determines if a retry is necessary and if so returns a nil error;
2416+
// otherwise it returns the error to be surfaced to the user.
2417+
func (retry *uploadBufferRetryConfig) retriable(ctx context.Context, err error) error {
2418+
if err == nil {
2419+
// a nil err does not need to be retried
2420+
return nil
2421+
}
2422+
if err != context.Canceled && err != context.DeadlineExceeded {
2423+
retry.lastErr = err
2424+
}
2425+
2426+
if retry.config.policy == RetryNever {
2427+
return err
2428+
}
2429+
2430+
if retry.config.maxAttempts != nil && retry.attempts >= *retry.config.maxAttempts {
2431+
return fmt.Errorf("storage: retry failed after %v attempts; last error: %w", retry.attempts, err)
2432+
}
2433+
2434+
retry.attempts++
2435+
2436+
// Explicitly check context cancellation so that we can distinguish between a
2437+
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
2438+
// Unfortunately gRPC will codes.DeadlineExceeded (which may be retryable if it's
2439+
// sent by the server) in both cases.
2440+
ctxErr := ctx.Err()
2441+
if errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
2442+
if retry.lastErr != nil {
2443+
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
2444+
}
2445+
return ctxErr
2446+
}
2447+
2448+
if !retry.config.shouldRetry(err) {
2449+
return err
2450+
}
2451+
return nil
2452+
}
2453+
2454+
// doBackOff pauses for the appropriate amount of time; it should be called after
2455+
// encountering a retriable error.
2456+
func (retry *uploadBufferRetryConfig) doBackOff(ctx context.Context) error {
2457+
p := retry.config.backoff.Pause()
2458+
2459+
if ctxErr := gax.Sleep(ctx, p); ctxErr != nil {
2460+
if retry.lastErr != nil {
2461+
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
2462+
}
2463+
return ctxErr
2464+
}
2465+
return nil
2466+
}

0 commit comments

Comments
 (0)