Skip to content

Commit 0510711

Browse files
authored
feat(bigtable): Use delay from RetryInfo (#11955)
* feat(bigtable): Use delay from RetryInfo * refactor * refactor * rename * fix apidiff * correct the names * rename
1 parent 5a89e14 commit 0510711

File tree

3 files changed

+155
-78
lines changed

3 files changed

+155
-78
lines changed

bigtable/bigtable.go

Lines changed: 122 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,14 @@ var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
6868
//
6969
// A Client is safe to use concurrently, except for its Close method.
7070
type Client struct {
71-
connPool gtransport.ConnPool
72-
client btpb.BigtableClient
73-
project, instance string
74-
appProfile string
75-
metricsTracerFactory *builtinMetricsTracerFactory
71+
connPool gtransport.ConnPool
72+
client btpb.BigtableClient
73+
project, instance string
74+
appProfile string
75+
metricsTracerFactory *builtinMetricsTracerFactory
76+
disableRetryInfo bool
77+
retryOption gax.CallOption
78+
executeQueryRetryOption gax.CallOption
7679
}
7780

7881
// ClientConfig has configurations for the client.
@@ -157,13 +160,27 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
157160
return nil, err
158161
}
159162

163+
disableRetryInfo := false
164+
165+
// If DISABLE_RETRY_INFO=1, library does not base retry decision and back off time on server returned RetryInfo value.
166+
disableRetryInfoEnv := os.Getenv("DISABLE_RETRY_INFO")
167+
disableRetryInfo = disableRetryInfoEnv == "1"
168+
retryOption := defaultRetryOption
169+
executeQueryRetryOption := defaultExecuteQueryRetryOption
170+
if disableRetryInfo {
171+
retryOption = clientOnlyRetryOption
172+
executeQueryRetryOption = clientOnlyExecuteQueryRetryOption
173+
}
160174
return &Client{
161-
connPool: connPool,
162-
client: btpb.NewBigtableClient(connPool),
163-
project: project,
164-
instance: instance,
165-
appProfile: config.AppProfile,
166-
metricsTracerFactory: metricsTracerFactory,
175+
connPool: connPool,
176+
client: btpb.NewBigtableClient(connPool),
177+
project: project,
178+
instance: instance,
179+
appProfile: config.AppProfile,
180+
metricsTracerFactory: metricsTracerFactory,
181+
disableRetryInfo: disableRetryInfo,
182+
retryOption: retryOption,
183+
executeQueryRetryOption: executeQueryRetryOption,
167184
}, nil
168185
}
169186

@@ -176,21 +193,8 @@ func (c *Client) Close() error {
176193
}
177194

178195
var (
179-
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
180-
isIdempotentRetryCode = make(map[codes.Code]bool)
181-
182-
defaultBackoff = gax.Backoff{
183-
Initial: 100 * time.Millisecond,
184-
Max: 2 * time.Second,
185-
Multiplier: 1.2,
186-
}
187-
retryOptions = []gax.CallOption{
188-
gax.WithRetry(func() gax.Retryer {
189-
return &bigtableRetryer{
190-
Backoff: defaultBackoff,
191-
}
192-
}),
193-
}
196+
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
197+
isIdempotentRetryCode = make(map[codes.Code]bool)
194198
retryableInternalErrMsgs = []string{
195199
"stream terminated by RST_STREAM", // Retry similar to spanner client. Special case due to https://github.com/googleapis/google-cloud-go/issues/6476
196200

@@ -199,18 +203,56 @@ var (
199203
"RST_STREAM closed stream",
200204
"Received RST_STREAM",
201205
}
202-
203-
executeQueryRetryOptions = []gax.CallOption{
204-
gax.WithRetry(func() gax.Retryer {
205-
backoff := defaultBackoff
206-
return &bigtableRetryer{
207-
alternateRetryCondition: isQueryExpiredViolation,
208-
Backoff: backoff,
209-
}
210-
}),
206+
defaultBackoff = gax.Backoff{
207+
Initial: 100 * time.Millisecond,
208+
Max: 2 * time.Second,
209+
Multiplier: 1.2,
211210
}
211+
clientOnlyRetryOption = newRetryOption(clientOnlyRetry, true)
212+
clientOnlyExecuteQueryRetryOption = newRetryOption(clientOnlyExecuteQueryRetry, true)
213+
defaultRetryOption = newRetryOption(clientOnlyRetry, false)
214+
defaultExecuteQueryRetryOption = newRetryOption(clientOnlyExecuteQueryRetry, false)
212215
)
213216

217+
func newRetryOption(retryFn func(*gax.Backoff, error) (time.Duration, bool), disableRetryInfo bool) gax.CallOption {
218+
return gax.WithRetry(func() gax.Retryer {
219+
// Create a new Backoff instance for each retryer to ensure independent state.
220+
newBackoffInstance := gax.Backoff{
221+
Initial: defaultBackoff.Initial,
222+
Max: defaultBackoff.Max,
223+
Multiplier: defaultBackoff.Multiplier,
224+
}
225+
return &bigtableRetryer{
226+
baseRetryFn: retryFn,
227+
backoff: newBackoffInstance,
228+
disableRetryInfo: disableRetryInfo,
229+
}
230+
})
231+
}
232+
233+
func clientOnlyRetry(backoff *gax.Backoff, err error) (time.Duration, bool) {
234+
// Similar to gax.OnCodes but shares the backoff with INTERNAL retry messages check
235+
st, ok := status.FromError(err)
236+
if !ok {
237+
return 0, false
238+
}
239+
c := st.Code()
240+
_, isIdempotent := isIdempotentRetryCode[c]
241+
if isIdempotent ||
242+
(status.Code(err) == codes.Internal && containsAny(err.Error(), retryableInternalErrMsgs)) {
243+
pause := backoff.Pause()
244+
return pause, true
245+
}
246+
return 0, false
247+
}
248+
249+
func clientOnlyExecuteQueryRetry(backoff *gax.Backoff, err error) (time.Duration, bool) {
250+
if isQueryExpiredViolation(err) {
251+
return backoff.Pause(), true
252+
}
253+
return clientOnlyRetry(backoff, err)
254+
}
255+
214256
func isQueryExpiredViolation(err error) bool {
215257
apiErr, ok := apierror.FromError(err)
216258
if ok && apiErr != nil && apiErr.Details().PreconditionFailure != nil && status.Code(err) == codes.FailedPrecondition {
@@ -223,16 +265,39 @@ func isQueryExpiredViolation(err error) bool {
223265
return false
224266
}
225267

226-
// bigtableRetryer extends the generic gax Retryer, but also checks
227-
// error messages to check if operation can be retried
228-
//
229-
// Retry is made if :
230-
// - error code is one of the `idempotentRetryCodes` OR
231-
// - error code is internal and error message is one of the `retryableInternalErrMsgs` OR
232-
// - alternateRetryCondition returns true.
268+
// bigtableRetryer implements the gax.Retryer interface. It manages retry decisions,
269+
// incorporating server-sent RetryInfo if enabled, and client-side exponential backoff.
270+
// It specifically handles reseting the client-side backoff to its initial state if
271+
// RetryInfo was previously used for an operation and then stops being provided.
233272
type bigtableRetryer struct {
234-
alternateRetryCondition func(error) bool
235-
gax.Backoff
273+
baseRetryFn func(*gax.Backoff, error) (time.Duration, bool)
274+
backoff gax.Backoff
275+
disableRetryInfo bool // If true, this retryer will process server-sent RetryInfo.
276+
wasLastDelayFromRetryInfo bool // true if the previous retry delay for this operation was from RetryInfo.
277+
278+
}
279+
280+
// Retry determines if an operation should be retried and for how long to wait.
281+
func (r *bigtableRetryer) Retry(err error) (time.Duration, bool) {
282+
if !r.disableRetryInfo {
283+
apiErr, ok := apierror.FromError(err)
284+
if ok && apiErr != nil && apiErr.Details().RetryInfo != nil {
285+
// RetryInfo is present in the current error. Use its delay.
286+
r.wasLastDelayFromRetryInfo = true
287+
return apiErr.Details().RetryInfo.GetRetryDelay().AsDuration(), true
288+
}
289+
290+
if r.wasLastDelayFromRetryInfo {
291+
r.backoff = gax.Backoff{
292+
Initial: r.backoff.Initial,
293+
Max: r.backoff.Max,
294+
Multiplier: r.backoff.Multiplier,
295+
}
296+
}
297+
r.wasLastDelayFromRetryInfo = false
298+
}
299+
300+
return r.baseRetryFn(&r.backoff, err)
236301
}
237302

238303
func containsAny(str string, substrs []string) bool {
@@ -244,23 +309,6 @@ func containsAny(str string, substrs []string) bool {
244309
return false
245310
}
246311

247-
func (r *bigtableRetryer) Retry(err error) (time.Duration, bool) {
248-
// Similar to gax.OnCodes but shares the backoff with INTERNAL retry messages check
249-
st, ok := status.FromError(err)
250-
if !ok {
251-
return 0, false
252-
}
253-
c := st.Code()
254-
_, isIdempotent := isIdempotentRetryCode[c]
255-
if isIdempotent ||
256-
(status.Code(err) == codes.Internal && containsAny(err.Error(), retryableInternalErrMsgs)) ||
257-
(r.alternateRetryCondition != nil && r.alternateRetryCondition(err)) {
258-
pause := r.Backoff.Pause()
259-
return pause, true
260-
}
261-
return 0, false
262-
}
263-
264312
func init() {
265313
for _, code := range idempotentRetryCodes {
266314
isIdempotentRetryCode[code] = true
@@ -354,6 +402,7 @@ func (c *Client) newFeatureFlags() metadata.MD {
354402
ReverseScans: true,
355403
LastScannedRowResponses: true,
356404
ClientSideMetricsEnabled: c.metricsTracerFactory.enabled,
405+
RetryInfo: !c.disableRetryInfo,
357406
}
358407

359408
val := ""
@@ -517,7 +566,7 @@ func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer,
517566
var err error
518567
res, err = c.client.PrepareQuery(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
519568
return err
520-
}, retryOptions...)
569+
}, c.retryOption)
521570
if err != nil {
522571
return nil, err
523572
}
@@ -575,12 +624,12 @@ func (ps *PreparedStatement) Bind(values map[string]any) (*BoundStatement, error
575624

576625
func (ps *PreparedStatement) refreshIfInvalid(ctx context.Context) error {
577626
/*
578-
| valid | validEarly | behaviour |
579-
|-------|------------|----------------------|
580-
| true | true | nil |
581-
| false | true | impossible condition |
582-
| true | false | async refresh token |
583-
| false | false | sync refresh token |
627+
| valid | validEarly | behaviour |
628+
|-------|------------|----------------------|
629+
| true | true | nil |
630+
| false | true | impossible condition |
631+
| true | false | async refresh token |
632+
| false | false | sync refresh token |
584633
*/
585634
valid, validEarly := ps.valid()
586635
if validEarly {
@@ -859,7 +908,7 @@ func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool, m
859908
}
860909
}
861910
}
862-
}, executeQueryRetryOptions...)
911+
}, bs.ps.c.executeQueryRetryOption)
863912
if err != nil {
864913
return err
865914
}
@@ -1063,7 +1112,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10631112
}
10641113
}
10651114
return err
1066-
}, retryOptions...)
1115+
}, t.c.retryOption)
10671116

10681117
return err
10691118
}
@@ -1616,7 +1665,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
16161665
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
16171666
}
16181667
if mutationsAreRetryable(m.ops) {
1619-
callOptions = retryOptions
1668+
callOptions = append(callOptions, t.c.retryOption)
16201669
}
16211670
var res *btpb.MutateRowResponse
16221671
err := gaxInvokeWithRecorder(ctx, mt, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
@@ -1884,7 +1933,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
18841933
return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
18851934
}
18861935
return nil
1887-
}, retryOptions...)
1936+
}, t.c.retryOption)
18881937

18891938
statusCode, statusErr := convertToGrpcStatusErr(err)
18901939
mt.currOp.setStatus(statusCode.String())
@@ -2161,7 +2210,7 @@ func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]
21612210
sampledRowKeys = append(sampledRowKeys, key)
21622211
}
21632212
return nil
2164-
}, retryOptions...)
2213+
}, t.c.retryOption)
21652214

21662215
return sampledRowKeys, err
21672216
}
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
TestMutateRows_Retry_WithRoutingCookie\|
2-
TestMutateRows_Retry_WithRetryInfo\|
32
TestReadRow_Retry_WithRoutingCookie\|
4-
TestReadRow_Retry_WithRetryInfo\|
53
TestReadRows_Retry_WithRoutingCookie\|
64
TestReadRows_Retry_WithRoutingCookie_MultipleErrorResponses\|
7-
TestReadRows_Retry_WithRetryInfo\|
85
TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse\|
9-
TestSampleRowKeys_Retry_WithRoutingCookie\|
10-
TestSampleRowKeys_Retry_WithRetryInfo
6+
TestSampleRowKeys_Retry_WithRoutingCookie\|

bigtable/retry_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,38 @@ func TestRetryReverseReadRows(t *testing.T) {
592592
}
593593
}
594594

595+
func TestRetryOptionSelection(t *testing.T) {
596+
ctx := context.Background()
597+
project := "test-project"
598+
instance := "test-instance"
599+
600+
t.Run("DefaultRetryLogic", func(t *testing.T) {
601+
client, err := NewClientWithConfig(ctx, project, instance, disableMetricsConfig)
602+
if err != nil {
603+
t.Fatalf("NewClientWithConfig: %v", err)
604+
}
605+
defer client.Close()
606+
607+
if client.disableRetryInfo {
608+
t.Errorf("client.disableRetryInfo got: true, want: false")
609+
}
610+
})
611+
612+
t.Run("ClientOnlyRetryLogic", func(t *testing.T) {
613+
t.Setenv("DISABLE_RETRY_INFO", "1")
614+
615+
client, err := NewClientWithConfig(ctx, project, instance, disableMetricsConfig)
616+
if err != nil {
617+
t.Fatalf("NewClientWithConfig: %v", err)
618+
}
619+
defer client.Close()
620+
621+
if !client.disableRetryInfo {
622+
t.Errorf("client.disableRetryInfo got: false, want: true")
623+
}
624+
})
625+
}
626+
595627
func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error {
596628
var chunks []*btpb.ReadRowsResponse_CellChunk
597629
for _, key := range rowKeys {

0 commit comments

Comments
 (0)