Skip to content

Commit a2925ce

Browse files
authored
fix(bigquery/storage/managedstorage): improve internal locking (#6304)
* fix(bigquery/storage/managedwriter): address potential deadlocks. This PR addresses some potential deadlocks introduced as part of a refactor of critical section code. It adds a test to help isolate deadlock code, and makes a small change to the retry predicate to disallow retries of context-based errors (cancelations/expirations).
1 parent c2eb4d9 commit a2925ce

3 files changed

Lines changed: 206 additions & 99 deletions

File tree

bigquery/storage/managedwriter/managed_stream.go

Lines changed: 107 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,76 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
242242
}
243243
}
244244

245-
// append handles the details of adding sending an append request on a stream. Appends are sent on a long
245+
// lockingAppend handles a single append attempt. When successful, it returns the number of rows
246+
// in the request for metrics tracking.
247+
func (ms *ManagedStream) lockingAppend(requestCtx context.Context, pw *pendingWrite) (int64, error) {
248+
249+
// Don't bother calling/retrying if this append's context is already expired.
250+
if err := requestCtx.Err(); err != nil {
251+
return 0, err
252+
}
253+
254+
// critical section: Things that need to happen inside the critical section:
255+
//
256+
// * Getting the stream connection (in case of reconnects)
257+
// * Issuing the append request
258+
// * Adding the pending write to the channel to keep ordering correct on response
259+
ms.mu.Lock()
260+
defer ms.mu.Unlock()
261+
262+
var arc *storagepb.BigQueryWrite_AppendRowsClient
263+
var ch chan *pendingWrite
264+
var err error
265+
266+
// If an updated schema is present, we need to reconnect the stream and update the reference
267+
// schema for the stream.
268+
reconnect := false
269+
if pw.newSchema != nil && !proto.Equal(pw.newSchema, ms.schemaDescriptor) {
270+
reconnect = true
271+
ms.schemaDescriptor = proto.Clone(pw.newSchema).(*descriptorpb.DescriptorProto)
272+
}
273+
arc, ch, err = ms.getStream(arc, reconnect)
274+
if err != nil {
275+
return 0, err
276+
}
277+
278+
// Resolve the special work for the first append on a stream.
279+
var req *storagepb.AppendRowsRequest
280+
ms.streamSetup.Do(func() {
281+
reqCopy := proto.Clone(pw.request).(*storagepb.AppendRowsRequest)
282+
reqCopy.WriteStream = ms.streamSettings.streamID
283+
reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
284+
ProtoDescriptor: ms.schemaDescriptor,
285+
}
286+
if ms.streamSettings.TraceID != "" {
287+
reqCopy.TraceId = ms.streamSettings.TraceID
288+
}
289+
req = reqCopy
290+
})
291+
292+
if req != nil {
293+
// First append in a new connection needs properties like schema and stream name set.
294+
err = (*arc).Send(req)
295+
} else {
296+
// Subsequent requests need no modification.
297+
err = (*arc).Send(pw.request)
298+
}
299+
if err != nil {
300+
return 0, err
301+
}
302+
// Compute numRows, once we pass ownership to the channel the request may be
303+
// cleared.
304+
numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
305+
ch <- pw
306+
return numRows, nil
307+
}
308+
309+
// appendWithRetry handles the details of sending an append request on a stream. Appends are sent on a long
246310
// lived bidirectional network stream, with its own managed context (ms.ctx). requestCtx is checked
247311
// for expiry to enable faster failures, it is not propagated more deeply.
248-
func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, opts ...gax.CallOption) error {
312+
func (ms *ManagedStream) appendWithRetry(requestCtx context.Context, pw *pendingWrite, opts ...gax.CallOption) error {
313+
314+
// Resolve retry settings.
249315
var settings gax.CallSettings
250316
for _, opt := range opts {
251317
opt.Resolve(&settings)
@@ -255,104 +321,43 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
255321
r = settings.Retry()
256322
}
257323

258-
var arc *storagepb.BigQueryWrite_AppendRowsClient
259-
var ch chan *pendingWrite
260-
var err error
261-
262324
for {
263-
// critical section: Things that need to happen inside the critical section:
264-
//
265-
// * Getting the stream connection (in case of reconnects)
266-
// * Issuing the append request
267-
// * Adding the pending write to the channel to keep ordering correct on response
268-
ms.mu.Lock()
269-
270-
// Don't both calling/retrying if this append's context is already expired.
271-
if err = requestCtx.Err(); err != nil {
272-
return err
273-
}
274-
275-
// If an updated schema is present, we need to reconnect the stream and update the reference
276-
// schema for the stream.
277-
reconnect := false
278-
if pw.newSchema != nil && !proto.Equal(pw.newSchema, ms.schemaDescriptor) {
279-
reconnect = true
280-
ms.schemaDescriptor = proto.Clone(pw.newSchema).(*descriptorpb.DescriptorProto)
281-
}
282-
arc, ch, err = ms.getStream(arc, reconnect)
283-
if err != nil {
284-
return err
285-
}
286-
287-
// Resolve the special work for the first append on a stream.
288-
var req *storagepb.AppendRowsRequest
289-
ms.streamSetup.Do(func() {
290-
reqCopy := proto.Clone(pw.request).(*storagepb.AppendRowsRequest)
291-
reqCopy.WriteStream = ms.streamSettings.streamID
292-
reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
293-
ProtoDescriptor: ms.schemaDescriptor,
325+
numRows, appendErr := ms.lockingAppend(requestCtx, pw)
326+
if appendErr != nil {
327+
// Append yielded an error. Retry by continuing or return.
328+
status := grpcstatus.Convert(appendErr)
329+
if status != nil {
330+
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
331+
recordStat(ctx, AppendRequestErrors, 1)
294332
}
295-
if ms.streamSettings.TraceID != "" {
296-
reqCopy.TraceId = ms.streamSettings.TraceID
333+
bo, shouldRetry := r.Retry(appendErr)
334+
if shouldRetry {
335+
if err := gax.Sleep(ms.ctx, bo); err != nil {
336+
return err
337+
}
338+
continue
297339
}
298-
req = reqCopy
299-
})
300-
301-
if req != nil {
302-
// First append in a new connection needs properties like schema and stream name set.
303-
err = (*arc).Send(req)
304-
} else {
305-
// Subsequent requests need no modification.
306-
err = (*arc).Send(pw.request)
307-
}
308-
if err == nil {
309-
// Compute numRows, once we pass ownership to the channel the request may be
310-
// cleared.
311-
numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
312-
ch <- pw
313-
// We've passed ownership of the pending write to the channel.
314-
// It's now responsible for marking the request done, we're done
315-
// with the critical section.
340+
// We've got a non-retriable error, so propagate that up and mark the write done.
341+
ms.mu.Lock()
342+
ms.err = appendErr
343+
pw.markDone(NoStreamOffset, appendErr, ms.fc)
316344
ms.mu.Unlock()
317-
318-
// Record stats and return.
319-
recordStat(ms.ctx, AppendRequests, 1)
320-
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
321-
recordStat(ms.ctx, AppendRequestRows, numRows)
322-
return nil
323-
}
324-
// Unlock the mutex for error cases.
325-
ms.mu.Unlock()
326-
327-
// Append yielded an error. Retry by continuing or return.
328-
status := grpcstatus.Convert(err)
329-
if status != nil {
330-
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
331-
recordStat(ctx, AppendRequestErrors, 1)
332-
}
333-
bo, shouldRetry := r.Retry(err)
334-
if shouldRetry {
335-
if err := gax.Sleep(ms.ctx, bo); err != nil {
336-
return err
337-
}
338-
continue
345+
return appendErr
339346
}
340-
// We've got a non-retriable error, so propagate that up. and mark the write done.
341-
ms.mu.Lock()
342-
ms.err = err
343-
pw.markDone(NoStreamOffset, err, ms.fc)
344-
ms.mu.Unlock()
345-
return err
347+
recordStat(ms.ctx, AppendRequests, 1)
348+
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
349+
recordStat(ms.ctx, AppendRequestRows, numRows)
350+
return nil
346351
}
347352
}
348353

349354
// Close closes a managed stream.
350355
func (ms *ManagedStream) Close() error {
351-
352-
var arc *storagepb.BigQueryWrite_AppendRowsClient
353-
354356
// Critical section: get connection, close, mark closed.
355357
ms.mu.Lock()
358+
defer ms.mu.Unlock()
359+
360+
var arc *storagepb.BigQueryWrite_AppendRowsClient
356361
arc, ch, err := ms.getStream(arc, false)
357362
if err != nil {
358363
return err
@@ -361,18 +366,22 @@ func (ms *ManagedStream) Close() error {
361366
return fmt.Errorf("no stream exists")
362367
}
363368
err = (*arc).CloseSend()
364-
if err == nil {
365-
close(ch)
366-
}
367-
ms.err = io.EOF
368-
369-
// Done with the critical section.
370-
ms.mu.Unlock()
371-
// Propagate cancellation.
369+
// Regardless of the outcome of CloseSend(), we're done with this channel.
370+
close(ch)
371+
// Additionally, cancel the underlying context for the stream, we don't allow re-open.
372372
if ms.cancel != nil {
373373
ms.cancel()
374+
ms.cancel = nil
374375
}
375-
return err
376+
377+
if err != nil {
378+
// For error on CloseSend, save that as the stream error and return.
379+
ms.err = err
380+
return err
381+
}
382+
// For normal operation, mark the stream error as io.EOF and return.
383+
ms.err = io.EOF
384+
return nil
376385
}
377386

378387
// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
@@ -401,7 +410,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
401410
var appendErr error
402411
go func() {
403412
select {
404-
case errCh <- ms.append(ctx, pw):
413+
case errCh <- ms.appendWithRetry(ctx, pw):
405414
case <-ctx.Done():
406415
}
407416
close(errCh)

bigquery/storage/managedwriter/managed_stream_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package managedwriter
1616

1717
import (
1818
"context"
19+
"errors"
1920
"runtime"
2021
"testing"
2122
"time"
@@ -94,6 +95,7 @@ type testAppendRowsClient struct {
9495
requests []*storagepb.AppendRowsRequest
9596
sendF func(*storagepb.AppendRowsRequest) error
9697
recvF func() (*storagepb.AppendRowsResponse, error)
98+
closeF func() error
9799
}
98100

99101
func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error {
@@ -104,6 +106,10 @@ func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error)
104106
return tarc.recvF()
105107
}
106108

109+
func (tarc *testAppendRowsClient) CloseSend() error {
110+
return tarc.closeF()
111+
}
112+
107113
// openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function.
108114
func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
109115
sF := func(req *storagepb.AppendRowsRequest) error {
@@ -123,6 +129,9 @@ func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.Append
123129
}
124130
testARC.sendF = sF
125131
testARC.recvF = rF
132+
testARC.closeF = func() error {
133+
return nil
134+
}
126135
return func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
127136
testARC.openCount = testARC.openCount + 1
128137
return testARC, nil
@@ -291,6 +300,89 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) {
291300

292301
}
293302

303+
func TestManagedStream_AppendDeadlocks(t *testing.T) {
304+
// Ensure we don't deadlock by issuing two appends.
305+
testCases := []struct {
306+
desc string
307+
openErrors []error
308+
ctx context.Context
309+
respErr error
310+
}{
311+
{
312+
desc: "no errors",
313+
openErrors: []error{nil, nil},
314+
ctx: context.Background(),
315+
respErr: nil,
316+
},
317+
{
318+
desc: "cancelled caller context",
319+
openErrors: []error{nil, nil},
320+
ctx: func() context.Context {
321+
cctx, cancel := context.WithCancel(context.Background())
322+
cancel()
323+
return cctx
324+
}(),
325+
respErr: context.Canceled,
326+
},
327+
{
328+
desc: "expired caller context",
329+
openErrors: []error{nil, nil},
330+
ctx: func() context.Context {
331+
cctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
332+
defer cancel()
333+
time.Sleep(2 * time.Millisecond)
334+
return cctx
335+
}(),
336+
respErr: context.DeadlineExceeded,
337+
},
338+
{
339+
desc: "errored getstream",
340+
openErrors: []error{status.Errorf(codes.ResourceExhausted, "some error"), status.Errorf(codes.ResourceExhausted, "some error")},
341+
ctx: context.Background(),
342+
respErr: status.Errorf(codes.ResourceExhausted, "some error"),
343+
},
344+
}
345+
346+
for _, tc := range testCases {
347+
openF := openTestArc(&testAppendRowsClient{}, nil, nil)
348+
ms := &ManagedStream{
349+
ctx: context.Background(),
350+
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
351+
if len(tc.openErrors) == 0 {
352+
panic("out of open errors")
353+
}
354+
curErr := tc.openErrors[0]
355+
tc.openErrors = tc.openErrors[1:]
356+
if curErr == nil {
357+
return openF(s, opts...)
358+
}
359+
return nil, curErr
360+
},
361+
streamSettings: &streamSettings{
362+
streamID: "foo",
363+
},
364+
}
365+
366+
// first append
367+
pw := newPendingWrite([][]byte{[]byte("foo")})
368+
gotErr := ms.appendWithRetry(tc.ctx, pw)
369+
if !errors.Is(gotErr, tc.respErr) {
370+
t.Errorf("%s first response: got %v, want %v", tc.desc, gotErr, tc.respErr)
371+
}
372+
// second append
373+
pw = newPendingWrite([][]byte{[]byte("bar")})
374+
gotErr = ms.appendWithRetry(tc.ctx, pw)
375+
if !errors.Is(gotErr, tc.respErr) {
376+
t.Errorf("%s second response: got %v, want %v", tc.desc, gotErr, tc.respErr)
377+
}
378+
379+
// Issue two closes, to ensure we're not deadlocking there either.
380+
ms.Close()
381+
ms.Close()
382+
}
383+
384+
}
385+
294386
func TestManagedStream_LeakingGoroutines(t *testing.T) {
295387
ctx := context.Background()
296388

bigquery/storage/managedwriter/retry.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package managedwriter
1616

1717
import (
18+
"context"
19+
"errors"
1820
"time"
1921

2022
"github.com/googleapis/gax-go/v2"
@@ -31,7 +33,11 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool
3133
// retry predicates in addition to statuscode-based.
3234
s, ok := status.FromError(err)
3335
if !ok {
34-
// non-status based errors as retryable
36+
// Treat context errors as non-retriable.
37+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
38+
return r.bo.Pause(), false
39+
}
40+
// Any other non-status based errors treated as retryable.
3541
return r.bo.Pause(), true
3642
}
3743
switch s.Code() {

0 commit comments

Comments
 (0)