@@ -29,6 +29,7 @@ import (
2929 "cloud.google.com/go/internal/trace"
3030 gapic "cloud.google.com/go/storage/internal/apiv2"
3131 "cloud.google.com/go/storage/internal/apiv2/storagepb"
32+ "github.com/google/uuid"
3233 "github.com/googleapis/gax-go/v2"
3334 "google.golang.org/api/googleapi"
3435 "google.golang.org/api/iterator"
@@ -1223,7 +1224,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
12231224 }
12241225 }
12251226
1226- o , off , err := gw .uploadBuffer (recvd , offset , doneReading )
1227+ o , off , err := gw .uploadBuffer (recvd , offset , doneReading , newUploadBufferRetryConfig ( gw . settings ) )
12271228 if err != nil {
12281229 err = checkCanceled (err )
12291230 errorf (err )
@@ -2091,12 +2092,7 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
20912092// completed.
20922093//
20932094// Returns object, persisted size, and any error that is not retriable.
2094- func (w * gRPCWriter ) uploadBuffer (recvd int , start int64 , doneReading bool ) (* storagepb.Object , int64 , error ) {
2095- var shouldRetry = ShouldRetry
2096- if w .settings .retry != nil && w .settings .retry .shouldRetry != nil {
2097- shouldRetry = w .settings .retry .shouldRetry
2098- }
2099-
2095+ func (w * gRPCWriter ) uploadBuffer (recvd int , start int64 , doneReading bool , retryConfig * uploadBufferRetryConfig ) (* storagepb.Object , int64 , error ) {
21002096 var err error
21012097 var lastWriteOfEntireObject bool
21022098
@@ -2143,6 +2139,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
21432139 if w .stream == nil {
21442140 hds := []string {"x-goog-request-params" , fmt .Sprintf ("bucket=projects/_/buckets/%s" , url .QueryEscape (w .bucket ))}
21452141 ctx := gax .InsertMetadataIntoOutgoingContext (w .ctx , hds ... )
2142+ ctx = setInvocationHeaders (ctx , retryConfig .invocationID , retryConfig .attempts )
21462143
21472144 w .stream , err = w .c .raw .BidiWriteObject (ctx )
21482145 if err != nil {
@@ -2188,7 +2185,11 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
21882185 // Retriable errors mean we should start over and attempt to
21892186 // resend the entire buffer via a new stream.
21902187 // If not retriable, falling through will return the error received.
2191- if shouldRetry (err ) {
2188+ err = retryConfig .retriable (w .ctx , err )
2189+
2190+ if err == nil {
2191+ retryConfig .doBackOff (w .ctx )
2192+
21922193 // TODO: Add test case for failure modes of querying progress.
21932194 writeOffset , err = w .determineOffset (start )
21942195 if err != nil {
@@ -2230,11 +2231,17 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22302231 if ! lastWriteOfEntireObject {
22312232 resp , err := w .stream .Recv ()
22322233
2233- // Retriable errors mean we should start over and attempt to
2234- // resend the entire buffer via a new stream.
2235- // If not retriable, falling through will return the error received
2236- // from closing the stream.
2237- if shouldRetry (err ) {
2234+ if err != nil {
2235+ // Retriable errors mean we should start over and attempt to
2236+ // resend the entire buffer via a new stream.
2237+ // If not retriable, falling through will return the error received
2238+ // from closing the stream.
2239+ err = retryConfig .retriable (w .ctx , err )
2240+ if err != nil {
2241+ return nil , 0 , err
2242+ }
2243+
2244+ retryConfig .doBackOff (w .ctx )
22382245 writeOffset , err = w .determineOffset (start )
22392246 if err != nil {
22402247 return nil , 0 , err
@@ -2246,9 +2253,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22462253
22472254 continue sendBytes
22482255 }
2249- if err != nil {
2250- return nil , 0 , err
2251- }
22522256
22532257 if resp .GetPersistedSize () != writeOffset {
22542258 // Retry if not all bytes were persisted.
@@ -2274,7 +2278,14 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22742278 var obj * storagepb.Object
22752279 for obj == nil {
22762280 resp , err := w .stream .Recv ()
2277- if shouldRetry (err ) {
2281+
2282+ if err != nil {
2283+ err = retryConfig .retriable (w .ctx , err )
2284+ if err != nil {
2285+ return nil , 0 , err
2286+ }
2287+ retryConfig .doBackOff (w .ctx )
2288+
22782289 writeOffset , err = w .determineOffset (start )
22792290 if err != nil {
22802291 return nil , 0 , err
@@ -2283,9 +2294,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
22832294 w .stream = nil
22842295 continue sendBytes
22852296 }
2286- if err != nil {
2287- return nil , 0 , err
2288- }
22892297
22902298 obj = resp .GetResource ()
22912299 }
@@ -2370,3 +2378,89 @@ func checkCanceled(err error) error {
23702378
23712379 return err
23722380}
2381+
2382+ type uploadBufferRetryConfig struct {
2383+ attempts int
2384+ invocationID string
2385+ config * retryConfig
2386+ lastErr error
2387+ }
2388+
2389+ func newUploadBufferRetryConfig (settings * settings ) * uploadBufferRetryConfig {
2390+ config := settings .retry
2391+
2392+ if config == nil {
2393+ config = defaultRetry .clone ()
2394+ }
2395+
2396+ if config .shouldRetry == nil {
2397+ config .shouldRetry = ShouldRetry
2398+ }
2399+
2400+ if config .backoff == nil {
2401+ config .backoff = & gaxBackoff {}
2402+ } else {
2403+ config .backoff .SetMultiplier (settings .retry .backoff .GetMultiplier ())
2404+ config .backoff .SetInitial (settings .retry .backoff .GetInitial ())
2405+ config .backoff .SetMax (settings .retry .backoff .GetMax ())
2406+ }
2407+
2408+ return & uploadBufferRetryConfig {
2409+ attempts : 1 ,
2410+ invocationID : uuid .New ().String (),
2411+ config : config ,
2412+ }
2413+ }
2414+
2415+ // retriable determines if a retry is necessary and if so returns a nil error;
2416+ // otherwise it returns the error to be surfaced to the user.
2417+ func (retry * uploadBufferRetryConfig ) retriable (ctx context.Context , err error ) error {
2418+ if err == nil {
2419+ // a nil err does not need to be retried
2420+ return nil
2421+ }
2422+ if err != context .Canceled && err != context .DeadlineExceeded {
2423+ retry .lastErr = err
2424+ }
2425+
2426+ if retry .config .policy == RetryNever {
2427+ return err
2428+ }
2429+
2430+ if retry .config .maxAttempts != nil && retry .attempts >= * retry .config .maxAttempts {
2431+ return fmt .Errorf ("storage: retry failed after %v attempts; last error: %w" , retry .attempts , err )
2432+ }
2433+
2434+ retry .attempts ++
2435+
2436+ // Explicitly check context cancellation so that we can distinguish between a
2437+ // DEADLINE_EXCEEDED error from the server and a user-set context deadline.
2438+ // Unfortunately gRPC will codes.DeadlineExceeded (which may be retryable if it's
2439+ // sent by the server) in both cases.
2440+ ctxErr := ctx .Err ()
2441+ if errors .Is (ctxErr , context .Canceled ) || errors .Is (ctxErr , context .DeadlineExceeded ) {
2442+ if retry .lastErr != nil {
2443+ return fmt .Errorf ("retry failed with %v; last error: %w" , ctxErr , retry .lastErr )
2444+ }
2445+ return ctxErr
2446+ }
2447+
2448+ if ! retry .config .shouldRetry (err ) {
2449+ return err
2450+ }
2451+ return nil
2452+ }
2453+
2454+ // doBackOff pauses for the appropriate amount of time; it should be called after
2455+ // encountering a retriable error.
2456+ func (retry * uploadBufferRetryConfig ) doBackOff (ctx context.Context ) error {
2457+ p := retry .config .backoff .Pause ()
2458+
2459+ if ctxErr := gax .Sleep (ctx , p ); ctxErr != nil {
2460+ if retry .lastErr != nil {
2461+ return fmt .Errorf ("retry failed with %v; last error: %w" , ctxErr , retry .lastErr )
2462+ }
2463+ return ctxErr
2464+ }
2465+ return nil
2466+ }
0 commit comments