@@ -26,6 +26,7 @@ import {grpc} from 'google-gax';
2626
2727import { codec , JSONOptions , Json , Field , Value } from './codec' ;
2828import { google } from '../protos/protos' ;
29+ import * as stream from 'stream' ;
2930
3031export type ResumeToken = string | Uint8Array ;
3132
@@ -141,6 +142,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
141142 private _fields ! : google . spanner . v1 . StructType . Field [ ] ;
142143 private _options : RowOptions ;
143144 private _pendingValue ?: p . IValue ;
145+ private _pendingValueForResume ?: p . IValue ;
144146 private _values : p . IValue [ ] ;
145147 private _numPushFailed = 0 ;
146148 constructor ( options = { } ) {
@@ -240,6 +242,15 @@ export class PartialResultStream extends Transform implements ResultEvents {
240242 }
241243 }
242244
245+ _clearPendingValues ( ) {
246+ this . _values = [ ] ;
247+ if ( this . _pendingValueForResume ) {
248+ this . _pendingValue = this . _pendingValueForResume ;
249+ } else {
250+ delete this . _pendingValue ;
251+ }
252+ }
253+
243254 /**
244255 * Manages any chunked values.
245256 *
@@ -268,6 +279,11 @@ export class PartialResultStream extends Transform implements ResultEvents {
268279 // chunk to be processed.
269280 if ( chunk . chunkedValue ) {
270281 this . _pendingValue = values . pop ( ) ;
282+ if ( _hasResumeToken ( chunk ) ) {
283+ this . _pendingValueForResume = this . _pendingValue ;
284+ }
285+ } else if ( _hasResumeToken ( chunk ) ) {
286+ delete this . _pendingValueForResume ;
271287 }
272288
273289 let res = true ;
@@ -419,33 +435,51 @@ export function partialResultStream(
419435 options ?: RowOptions
420436) : PartialResultStream {
421437 const retryableCodes = [ grpc . status . UNAVAILABLE ] ;
438+ const maxQueued = 10 ;
422439 let lastResumeToken : ResumeToken ;
423- let lastRetriedErr : grpc . ServiceError | undefined ;
424440 let lastRequestStream : Readable ;
425441
426442 // mergeStream allows multiple streams to be connected into one. This is good;
427443 // if we need to retry a request and pipe more data to the user's stream.
444+ // We also add an additional stream that can be used to flush any remaining
445+ // items in the checkpoint stream that have been received, and that did not
446+ // contain a resume token.
428447 const requestsStream = mergeStream ( ) ;
448+ const flushStream = new stream . PassThrough ( { objectMode : true } ) ;
449+ requestsStream . add ( flushStream ) ;
429450 const partialRSStream = new PartialResultStream ( options ) ;
430451 const userStream = streamEvents ( partialRSStream ) ;
452+ // We keep track of the number of PartialResultSets that did not include a
453+ // resume token, as that is an indication whether it is safe to retry the
454+ // stream halfway.
455+ let withoutCheckpointCount = 0 ;
431456 const batchAndSplitOnTokenStream = checkpointStream . obj ( {
432- maxQueued : 10 ,
433- isCheckpointFn : ( row : google . spanner . v1 . PartialResultSet ) : boolean => {
434- return is . defined ( row . resumeToken ) ;
457+ maxQueued,
458+ isCheckpointFn : ( chunk : google . spanner . v1 . PartialResultSet ) : boolean => {
459+ const withCheckpoint = _hasResumeToken ( chunk ) ;
460+ if ( withCheckpoint ) {
461+ withoutCheckpointCount = 0 ;
462+ } else {
463+ withoutCheckpointCount ++ ;
464+ }
465+ return withCheckpoint ;
435466 } ,
436467 } ) ;
437468
438469 // This listener ensures that the last request that executed successfully
439470 // after one or more retries will end the requestsStream.
440471 const endListener = ( ) => {
441- if ( lastRetriedErr ) {
442- setImmediate ( ( ) => requestsStream . end ( ) ) ;
443- }
472+ setImmediate ( ( ) => {
473+ // Push a fake PartialResultSet without any values but with a resume token
474+ // into the stream to ensure that the checkpoint stream is emptied, and
475+ // then push `null` to end the stream.
476+ flushStream . push ( { resumeToken : '_' } ) ;
477+ flushStream . push ( null ) ;
478+ requestsStream . end ( ) ;
479+ } ) ;
444480 } ;
445481 const makeRequest = ( ) : void => {
446- if ( lastRequestStream ) {
447- lastRequestStream . removeListener ( 'end' , endListener ) ;
448- }
482+ partialRSStream . _clearPendingValues ( ) ;
449483 lastRequestStream = requestFn ( lastResumeToken ) ;
450484 lastRequestStream . on ( 'end' , endListener ) ;
451485 requestsStream . add ( lastRequestStream ) ;
@@ -456,30 +490,44 @@ export function partialResultStream(
456490 ! (
457491 err . code &&
458492 ( retryableCodes ! . includes ( err . code ) || isRetryableInternalError ( err ) )
459- )
493+ ) ||
494+ // If we have received too many chunks without a resume token, it is not
495+ // safe to retry.
496+ withoutCheckpointCount > maxQueued
460497 ) {
461- // This is not a retryable error, so this will flush any rows the
498+ // This is not a retryable error so this will flush any rows the
462499 // checkpoint stream has queued. After that, we will destroy the
463500 // user's stream with the same error.
464501 setImmediate ( ( ) => batchAndSplitOnTokenStream . destroy ( err ) ) ;
465502 return ;
466503 }
467504
468- // We're going to retry from where we left off.
469- // Keep track of the fact that we retried an error in order to end the
470- // merged result stream.
471- lastRetriedErr = err ;
472- // Empty queued rows on the checkpoint stream (will not emit them to user).
473- batchAndSplitOnTokenStream . reset ( ) ;
474- makeRequest ( ) ;
505+ if ( lastRequestStream ) {
506+ lastRequestStream . removeListener ( 'end' , endListener ) ;
507+ lastRequestStream . destroy ( ) ;
508+ }
509+ // Delay the retry until all the values that are already in the stream
510+ // pipeline have been handled. This ensures that the checkpoint stream is
511+ // reset to the correct point. Calling .reset() directly here could cause
512+ // any values that are currently in the pipeline and that have not been
513+ // handled yet, to be pushed twice into the entire stream.
514+ setImmediate ( ( ) => {
515+ // Empty queued rows on the checkpoint stream (will not emit them to user).
516+ batchAndSplitOnTokenStream . reset ( ) ;
517+ makeRequest ( ) ;
518+ } ) ;
475519 } ;
476520
477521 userStream . once ( 'reading' , makeRequest ) ;
478522 eventsIntercept . patch ( requestsStream ) ;
479523
480524 // need types for events-intercept
481525 // eslint-disable-next-line @typescript-eslint/no-explicit-any
482- ( requestsStream as any ) . intercept ( 'error' , retry ) ;
526+ ( requestsStream as any ) . intercept ( 'error' , err =>
527+ // Retry __after__ all pending data has been processed to ensure that the
528+ // checkpoint stream is reset at the correct position.
529+ setImmediate ( ( ) => retry ( err ) )
530+ ) ;
483531
484532 return (
485533 requestsStream
@@ -497,6 +545,10 @@ export function partialResultStream(
497545 ) ;
498546}
499547
548+ function _hasResumeToken ( chunk : google . spanner . v1 . PartialResultSet ) : boolean {
549+ return is . defined ( chunk . resumeToken ) && chunk . resumeToken . length > 0 ;
550+ }
551+
500552function isRetryableInternalError ( err : grpc . ServiceError ) : boolean {
501553 return (
502554 err . code === grpc . status . INTERNAL &&
0 commit comments