@@ -242,10 +242,76 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
242242 }
243243}
244244
245- // append handles the details of adding sending an append request on a stream. Appends are sent on a long
245+ // lockingAppend handles a single append attempt. When successful, it returns the number of rows
246+ // in the request for metrics tracking.
247+ func (ms * ManagedStream ) lockingAppend (requestCtx context.Context , pw * pendingWrite ) (int64 , error ) {
248+
249+ // Don't both calling/retrying if this append's context is already expired.
250+ if err := requestCtx .Err (); err != nil {
251+ return 0 , err
252+ }
253+
254+ // critical section: Things that need to happen inside the critical section:
255+ //
256+ // * Getting the stream connection (in case of reconnects)
257+ // * Issuing the append request
258+ // * Adding the pending write to the channel to keep ordering correct on response
259+ ms .mu .Lock ()
260+ defer ms .mu .Unlock ()
261+
262+ var arc * storagepb.BigQueryWrite_AppendRowsClient
263+ var ch chan * pendingWrite
264+ var err error
265+
266+ // If an updated schema is present, we need to reconnect the stream and update the reference
267+ // schema for the stream.
268+ reconnect := false
269+ if pw .newSchema != nil && ! proto .Equal (pw .newSchema , ms .schemaDescriptor ) {
270+ reconnect = true
271+ ms .schemaDescriptor = proto .Clone (pw .newSchema ).(* descriptorpb.DescriptorProto )
272+ }
273+ arc , ch , err = ms .getStream (arc , reconnect )
274+ if err != nil {
275+ return 0 , err
276+ }
277+
278+ // Resolve the special work for the first append on a stream.
279+ var req * storagepb.AppendRowsRequest
280+ ms .streamSetup .Do (func () {
281+ reqCopy := proto .Clone (pw .request ).(* storagepb.AppendRowsRequest )
282+ reqCopy .WriteStream = ms .streamSettings .streamID
283+ reqCopy .GetProtoRows ().WriterSchema = & storagepb.ProtoSchema {
284+ ProtoDescriptor : ms .schemaDescriptor ,
285+ }
286+ if ms .streamSettings .TraceID != "" {
287+ reqCopy .TraceId = ms .streamSettings .TraceID
288+ }
289+ req = reqCopy
290+ })
291+
292+ if req != nil {
293+ // First append in a new connection needs properties like schema and stream name set.
294+ err = (* arc ).Send (req )
295+ } else {
296+ // Subsequent requests need no modification.
297+ err = (* arc ).Send (pw .request )
298+ }
299+ if err != nil {
300+ return 0 , err
301+ }
302+ // Compute numRows, once we pass ownership to the channel the request may be
303+ // cleared.
304+ numRows := int64 (len (pw .request .GetProtoRows ().Rows .GetSerializedRows ()))
305+ ch <- pw
306+ return numRows , nil
307+ }
308+
309+ // appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
246310// lived bidirectional network stream, with it's own managed context (ms.ctx). requestCtx is checked
247311// for expiry to enable faster failures, it is not propagated more deeply.
248- func (ms * ManagedStream ) append (requestCtx context.Context , pw * pendingWrite , opts ... gax.CallOption ) error {
312+ func (ms * ManagedStream ) appendWithRetry (requestCtx context.Context , pw * pendingWrite , opts ... gax.CallOption ) error {
313+
314+ // Resolve retry settings.
249315 var settings gax.CallSettings
250316 for _ , opt := range opts {
251317 opt .Resolve (& settings )
@@ -255,104 +321,43 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
255321 r = settings .Retry ()
256322 }
257323
258- var arc * storagepb.BigQueryWrite_AppendRowsClient
259- var ch chan * pendingWrite
260- var err error
261-
262324 for {
263- // critical section: Things that need to happen inside the critical section:
264- //
265- // * Getting the stream connection (in case of reconnects)
266- // * Issuing the append request
267- // * Adding the pending write to the channel to keep ordering correct on response
268- ms .mu .Lock ()
269-
270- // Don't both calling/retrying if this append's context is already expired.
271- if err = requestCtx .Err (); err != nil {
272- return err
273- }
274-
275- // If an updated schema is present, we need to reconnect the stream and update the reference
276- // schema for the stream.
277- reconnect := false
278- if pw .newSchema != nil && ! proto .Equal (pw .newSchema , ms .schemaDescriptor ) {
279- reconnect = true
280- ms .schemaDescriptor = proto .Clone (pw .newSchema ).(* descriptorpb.DescriptorProto )
281- }
282- arc , ch , err = ms .getStream (arc , reconnect )
283- if err != nil {
284- return err
285- }
286-
287- // Resolve the special work for the first append on a stream.
288- var req * storagepb.AppendRowsRequest
289- ms .streamSetup .Do (func () {
290- reqCopy := proto .Clone (pw .request ).(* storagepb.AppendRowsRequest )
291- reqCopy .WriteStream = ms .streamSettings .streamID
292- reqCopy .GetProtoRows ().WriterSchema = & storagepb.ProtoSchema {
293- ProtoDescriptor : ms .schemaDescriptor ,
325+ numRows , appendErr := ms .lockingAppend (requestCtx , pw )
326+ if appendErr != nil {
327+ // Append yielded an error. Retry by continuing or return.
328+ status := grpcstatus .Convert (appendErr )
329+ if status != nil {
330+ ctx , _ := tag .New (ms .ctx , tag .Insert (keyError , status .Code ().String ()))
331+ recordStat (ctx , AppendRequestErrors , 1 )
294332 }
295- if ms .streamSettings .TraceID != "" {
296- reqCopy .TraceId = ms .streamSettings .TraceID
333+ bo , shouldRetry := r .Retry (appendErr )
334+ if shouldRetry {
335+ if err := gax .Sleep (ms .ctx , bo ); err != nil {
336+ return err
337+ }
338+ continue
297339 }
298- req = reqCopy
299- })
300-
301- if req != nil {
302- // First append in a new connection needs properties like schema and stream name set.
303- err = (* arc ).Send (req )
304- } else {
305- // Subsequent requests need no modification.
306- err = (* arc ).Send (pw .request )
307- }
308- if err == nil {
309- // Compute numRows, once we pass ownership to the channel the request may be
310- // cleared.
311- numRows := int64 (len (pw .request .GetProtoRows ().Rows .GetSerializedRows ()))
312- ch <- pw
313- // We've passed ownership of the pending write to the channel.
314- // It's now responsible for marking the request done, we're done
315- // with the critical section.
340+ // We've got a non-retriable error, so propagate that up. and mark the write done.
341+ ms .mu .Lock ()
342+ ms .err = appendErr
343+ pw .markDone (NoStreamOffset , appendErr , ms .fc )
316344 ms .mu .Unlock ()
317-
318- // Record stats and return.
319- recordStat (ms .ctx , AppendRequests , 1 )
320- recordStat (ms .ctx , AppendRequestBytes , int64 (pw .reqSize ))
321- recordStat (ms .ctx , AppendRequestRows , numRows )
322- return nil
323- }
324- // Unlock the mutex for error cases.
325- ms .mu .Unlock ()
326-
327- // Append yielded an error. Retry by continuing or return.
328- status := grpcstatus .Convert (err )
329- if status != nil {
330- ctx , _ := tag .New (ms .ctx , tag .Insert (keyError , status .Code ().String ()))
331- recordStat (ctx , AppendRequestErrors , 1 )
332- }
333- bo , shouldRetry := r .Retry (err )
334- if shouldRetry {
335- if err := gax .Sleep (ms .ctx , bo ); err != nil {
336- return err
337- }
338- continue
345+ return appendErr
339346 }
340- // We've got a non-retriable error, so propagate that up. and mark the write done.
341- ms .mu .Lock ()
342- ms .err = err
343- pw .markDone (NoStreamOffset , err , ms .fc )
344- ms .mu .Unlock ()
345- return err
347+ recordStat (ms .ctx , AppendRequests , 1 )
348+ recordStat (ms .ctx , AppendRequestBytes , int64 (pw .reqSize ))
349+ recordStat (ms .ctx , AppendRequestRows , numRows )
350+ return nil
346351 }
347352}
348353
349354// Close closes a managed stream.
350355func (ms * ManagedStream ) Close () error {
351-
352- var arc * storagepb.BigQueryWrite_AppendRowsClient
353-
354356 // Critical section: get connection, close, mark closed.
355357 ms .mu .Lock ()
358+ defer ms .mu .Unlock ()
359+
360+ var arc * storagepb.BigQueryWrite_AppendRowsClient
356361 arc , ch , err := ms .getStream (arc , false )
357362 if err != nil {
358363 return err
@@ -361,18 +366,22 @@ func (ms *ManagedStream) Close() error {
361366 return fmt .Errorf ("no stream exists" )
362367 }
363368 err = (* arc ).CloseSend ()
364- if err == nil {
365- close (ch )
366- }
367- ms .err = io .EOF
368-
369- // Done with the critical section.
370- ms .mu .Unlock ()
371- // Propagate cancellation.
369+ // Regardless of the outcome of CloseSend(), we're done with this channel.
370+ close (ch )
371+ // Additionally, cancel the underlying context for the stream, we don't allow re-open.
372372 if ms .cancel != nil {
373373 ms .cancel ()
374+ ms .cancel = nil
374375 }
375- return err
376+
377+ if err != nil {
378+ // For error on CloseSend, save that as the stream error and return.
379+ ms .err = err
380+ return err
381+ }
382+ // For normal operation, mark the stream error as io.EOF and return.
383+ ms .err = io .EOF
384+ return nil
376385}
377386
378387// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
@@ -401,7 +410,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
401410 var appendErr error
402411 go func () {
403412 select {
404- case errCh <- ms .append (ctx , pw ):
413+ case errCh <- ms .appendWithRetry (ctx , pw ):
405414 case <- ctx .Done ():
406415 }
407416 close (errCh )
0 commit comments