@@ -68,11 +68,14 @@ var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
6868//
6969// A Client is safe to use concurrently, except for its Close method.
7070type Client struct {
71- connPool gtransport.ConnPool
72- client btpb.BigtableClient
73- project , instance string
74- appProfile string
75- metricsTracerFactory * builtinMetricsTracerFactory
71+ connPool gtransport.ConnPool
72+ client btpb.BigtableClient
73+ project , instance string
74+ appProfile string
75+ metricsTracerFactory * builtinMetricsTracerFactory
76+ disableRetryInfo bool // true when the DISABLE_RETRY_INFO environment variable was "1" at client creation; server-sent RetryInfo is then not used for retry decisions
77+ retryOption gax.CallOption // retry policy handed to retried calls other than query execution (e.g. prepareStatement, readRows, applyGroup, sampleRowKeys)
78+ executeQueryRetryOption gax.CallOption // retry policy handed to BoundStatement.execute; also retries errors matched by isQueryExpiredViolation
7679}
7780
7881// ClientConfig has configurations for the client.
@@ -157,13 +160,27 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
157160 return nil , err
158161 }
159162
163+ disableRetryInfo := false
164+
165+ // If DISABLE_RETRY_INFO=1, the library ignores any server-returned RetryInfo when deciding whether to retry and how long to back off.
166+ disableRetryInfoEnv := os .Getenv ("DISABLE_RETRY_INFO" )
167+ disableRetryInfo = disableRetryInfoEnv == "1"
168+ retryOption := defaultRetryOption
169+ executeQueryRetryOption := defaultExecuteQueryRetryOption
170+ if disableRetryInfo {
171+ retryOption = clientOnlyRetryOption
172+ executeQueryRetryOption = clientOnlyExecuteQueryRetryOption
173+ }
160174 return & Client {
161- connPool : connPool ,
162- client : btpb .NewBigtableClient (connPool ),
163- project : project ,
164- instance : instance ,
165- appProfile : config .AppProfile ,
166- metricsTracerFactory : metricsTracerFactory ,
175+ connPool : connPool ,
176+ client : btpb .NewBigtableClient (connPool ),
177+ project : project ,
178+ instance : instance ,
179+ appProfile : config .AppProfile ,
180+ metricsTracerFactory : metricsTracerFactory ,
181+ disableRetryInfo : disableRetryInfo ,
182+ retryOption : retryOption ,
183+ executeQueryRetryOption : executeQueryRetryOption ,
167184 }, nil
168185}
169186
@@ -176,21 +193,8 @@ func (c *Client) Close() error {
176193}
177194
178195var (
179- idempotentRetryCodes = []codes.Code {codes .DeadlineExceeded , codes .Unavailable , codes .Aborted }
180- isIdempotentRetryCode = make (map [codes.Code ]bool )
181-
182- defaultBackoff = gax.Backoff {
183- Initial : 100 * time .Millisecond ,
184- Max : 2 * time .Second ,
185- Multiplier : 1.2 ,
186- }
187- retryOptions = []gax.CallOption {
188- gax .WithRetry (func () gax.Retryer {
189- return & bigtableRetryer {
190- Backoff : defaultBackoff ,
191- }
192- }),
193- }
196+ idempotentRetryCodes = []codes.Code {codes .DeadlineExceeded , codes .Unavailable , codes .Aborted }
197+ isIdempotentRetryCode = make (map [codes.Code ]bool )
194198 retryableInternalErrMsgs = []string {
195199 "stream terminated by RST_STREAM" , // Retry similar to spanner client. Special case due to https://github.com/googleapis/google-cloud-go/issues/6476
196200
@@ -199,18 +203,56 @@ var (
199203 "RST_STREAM closed stream" ,
200204 "Received RST_STREAM" ,
201205 }
202-
203- executeQueryRetryOptions = []gax.CallOption {
204- gax .WithRetry (func () gax.Retryer {
205- backoff := defaultBackoff
206- return & bigtableRetryer {
207- alternateRetryCondition : isQueryExpiredViolation ,
208- Backoff : backoff ,
209- }
210- }),
206+ defaultBackoff = gax.Backoff {
207+ Initial : 100 * time .Millisecond ,
208+ Max : 2 * time .Second ,
209+ Multiplier : 1.2 ,
211210 }
211+ clientOnlyRetryOption = newRetryOption (clientOnlyRetry , true )
212+ clientOnlyExecuteQueryRetryOption = newRetryOption (clientOnlyExecuteQueryRetry , true )
213+ defaultRetryOption = newRetryOption (clientOnlyRetry , false )
214+ defaultExecuteQueryRetryOption = newRetryOption (clientOnlyExecuteQueryRetry , false )
212215)
213216
217+ func newRetryOption (retryFn func (* gax.Backoff , error ) (time.Duration , bool ), disableRetryInfo bool ) gax.CallOption { // newRetryOption wraps retryFn in a gax.WithRetry option whose factory returns a new bigtableRetryer on every call; disableRetryInfo is copied onto each retryer
218+ return gax .WithRetry (func () gax.Retryer {
219+ // Build a fresh Backoff from defaultBackoff's Initial/Max/Multiplier for each retryer so that retryers never share backoff state.
220+ newBackoffInstance := gax.Backoff {
221+ Initial : defaultBackoff .Initial ,
222+ Max : defaultBackoff .Max ,
223+ Multiplier : defaultBackoff .Multiplier ,
224+ }
225+ return & bigtableRetryer {
226+ baseRetryFn : retryFn ,
227+ backoff : newBackoffInstance ,
228+ disableRetryInfo : disableRetryInfo ,
229+ }
230+ })
231+ }
232+
233+ func clientOnlyRetry (backoff * gax.Backoff , err error ) (time.Duration , bool ) { // clientOnlyRetry decides from the error alone: it retries idempotent status codes and Internal errors whose message matches retryableInternalErrMsgs, pausing per backoff
234+ // Similar to gax.OnCodes, but the same backoff is also used for the INTERNAL error-message check.
235+ st , ok := status .FromError (err )
236+ if ! ok { // err could not be converted to a status: do not retry
237+ return 0 , false
238+ }
239+ c := st .Code ()
240+ _ , isIdempotent := isIdempotentRetryCode [c ] // key presence (not the stored bool) is what is tested; init only inserts codes from idempotentRetryCodes
241+ if isIdempotent ||
242+ (status .Code (err ) == codes .Internal && containsAny (err .Error (), retryableInternalErrMsgs )) {
243+ pause := backoff .Pause ()
244+ return pause , true
245+ }
246+ return 0 , false // any other error is not retried
247+ }
248+
249+ func clientOnlyExecuteQueryRetry (backoff * gax.Backoff , err error ) (time.Duration , bool ) { // clientOnlyExecuteQueryRetry is clientOnlyRetry plus one extra retryable case: errors matched by isQueryExpiredViolation
250+ if isQueryExpiredViolation (err ) { // checked first; uses the same backoff as the generic conditions below
251+ return backoff .Pause (), true
252+ }
253+ return clientOnlyRetry (backoff , err ) // otherwise fall back to the generic client-side rules
254+ }
255+
214256func isQueryExpiredViolation (err error ) bool {
215257 apiErr , ok := apierror .FromError (err )
216258 if ok && apiErr != nil && apiErr .Details ().PreconditionFailure != nil && status .Code (err ) == codes .FailedPrecondition {
@@ -223,16 +265,39 @@ func isQueryExpiredViolation(err error) bool {
223265 return false
224266}
225267
226- // bigtableRetryer extends the generic gax Retryer, but also checks
227- // error messages to check if operation can be retried
228- //
229- // Retry is made if :
230- // - error code is one of the `idempotentRetryCodes` OR
231- // - error code is internal and error message is one of the `retryableInternalErrMsgs` OR
232- // - alternateRetryCondition returns true.
268+ // bigtableRetryer implements the gax.Retryer interface. It manages retry decisions,
269+ // incorporating server-sent RetryInfo unless disabled, and client-side exponential backoff.
270+ // It specifically handles resetting the client-side backoff to its initial state if
271+ // RetryInfo was previously used for an operation and then stops being provided.
233272type bigtableRetryer struct {
234- alternateRetryCondition func (error ) bool
235- gax.Backoff
273+ baseRetryFn func (* gax.Backoff , error ) (time.Duration , bool ) // client-side retry decision; called with a pointer to backoff whenever RetryInfo is not used
274+ backoff gax.Backoff // client-side backoff state owned by this retryer
275+ disableRetryInfo bool // If true, this retryer ignores server-sent RetryInfo and always defers to baseRetryFn.
276+ wasLastDelayFromRetryInfo bool // true if the previous retry delay for this operation was from RetryInfo.
277+
278+ }
279+
280+ // Retry determines if an operation should be retried and for how long to wait. Unless disableRetryInfo is set, an error carrying RetryInfo is always retried after the server-provided delay and baseRetryFn is not consulted; in every other case the decision is delegated to baseRetryFn.
281+ func (r * bigtableRetryer ) Retry (err error ) (time.Duration , bool ) {
282+ if ! r .disableRetryInfo {
283+ apiErr , ok := apierror .FromError (err )
284+ if ok && apiErr != nil && apiErr .Details ().RetryInfo != nil {
285+ // RetryInfo is present in the current error. Use its delay.
286+ r .wasLastDelayFromRetryInfo = true
287+ return apiErr .Details ().RetryInfo .GetRetryDelay ().AsDuration (), true
288+ }
289+
290+ if r .wasLastDelayFromRetryInfo { // RetryInfo was used for the previous attempt but is absent now: rebuild the backoff, keeping only its Initial/Max/Multiplier configuration
291+ r .backoff = gax.Backoff {
292+ Initial : r .backoff .Initial ,
293+ Max : r .backoff .Max ,
294+ Multiplier : r .backoff .Multiplier ,
295+ }
296+ }
297+ r .wasLastDelayFromRetryInfo = false
298+ }
299+
300+ return r .baseRetryFn (& r .backoff , err ) // client-side decision, advancing this retryer's own backoff
236301}
237302
238303func containsAny (str string , substrs []string ) bool {
@@ -244,23 +309,6 @@ func containsAny(str string, substrs []string) bool {
244309 return false
245310}
246311
247- func (r * bigtableRetryer ) Retry (err error ) (time.Duration , bool ) {
248- // Similar to gax.OnCodes but shares the backoff with INTERNAL retry messages check
249- st , ok := status .FromError (err )
250- if ! ok {
251- return 0 , false
252- }
253- c := st .Code ()
254- _ , isIdempotent := isIdempotentRetryCode [c ]
255- if isIdempotent ||
256- (status .Code (err ) == codes .Internal && containsAny (err .Error (), retryableInternalErrMsgs )) ||
257- (r .alternateRetryCondition != nil && r .alternateRetryCondition (err )) {
258- pause := r .Backoff .Pause ()
259- return pause , true
260- }
261- return 0 , false
262- }
263-
264312func init () {
265313 for _ , code := range idempotentRetryCodes {
266314 isIdempotentRetryCode [code ] = true
@@ -354,6 +402,7 @@ func (c *Client) newFeatureFlags() metadata.MD {
354402 ReverseScans : true ,
355403 LastScannedRowResponses : true ,
356404 ClientSideMetricsEnabled : c .metricsTracerFactory .enabled ,
405+ RetryInfo : ! c .disableRetryInfo ,
357406 }
358407
359408 val := ""
@@ -517,7 +566,7 @@ func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer,
517566 var err error
518567 res , err = c .client .PrepareQuery (ctx , req , grpc .Header (headerMD ), grpc .Trailer (trailerMD ))
519568 return err
520- }, retryOptions ... )
569+ }, c . retryOption )
521570 if err != nil {
522571 return nil , err
523572 }
@@ -575,12 +624,12 @@ func (ps *PreparedStatement) Bind(values map[string]any) (*BoundStatement, error
575624
576625func (ps * PreparedStatement ) refreshIfInvalid (ctx context.Context ) error {
577626 /*
578- | valid | validEarly | behaviour |
579- |-------|------------|----------------------|
580- | true | true | nil |
581- | false | true | impossible condition |
582- | true | false | async refresh token |
583- | false | false | sync refresh token |
627+ | valid | validEarly | behaviour |
628+ |-------|------------|----------------------|
629+ | true | true | nil |
630+ | false | true | impossible condition |
631+ | true | false | async refresh token |
632+ | false | false | sync refresh token |
584633 */
585634 valid , validEarly := ps .valid ()
586635 if validEarly {
@@ -859,7 +908,7 @@ func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool, m
859908 }
860909 }
861910 }
862- }, executeQueryRetryOptions ... )
911+ }, bs . ps . c . executeQueryRetryOption )
863912 if err != nil {
864913 return err
865914 }
@@ -1063,7 +1112,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10631112 }
10641113 }
10651114 return err
1066- }, retryOptions ... )
1115+ }, t . c . retryOption )
10671116
10681117 return err
10691118}
@@ -1616,7 +1665,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
16161665 req .AuthorizedViewName = t .c .fullAuthorizedViewName (t .table , t .authorizedView )
16171666 }
16181667 if mutationsAreRetryable (m .ops ) {
1619- callOptions = retryOptions
1668+ callOptions = append ( callOptions , t . c . retryOption )
16201669 }
16211670 var res * btpb.MutateRowResponse
16221671 err := gaxInvokeWithRecorder (ctx , mt , "MutateRow" , func (ctx context.Context , headerMD , trailerMD * metadata.MD , _ gax.CallSettings ) error {
@@ -1884,7 +1933,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
18841933 return status .Errorf (idempotentRetryCodes [0 ], "Synthetic error: partial failure of ApplyBulk" )
18851934 }
18861935 return nil
1887- }, retryOptions ... )
1936+ }, t . c . retryOption )
18881937
18891938 statusCode , statusErr := convertToGrpcStatusErr (err )
18901939 mt .currOp .setStatus (statusCode .String ())
@@ -2161,7 +2210,7 @@ func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]
21612210 sampledRowKeys = append (sampledRowKeys , key )
21622211 }
21632212 return nil
2164- }, retryOptions ... )
2213+ }, t . c . retryOption )
21652214
21662215 return sampledRowKeys , err
21672216}
0 commit comments