Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit da2ca7b

Browse files
olavloite and AVaksman authored
fix: reset buffered chunked value before retry (#1397)
* fix: reset buffered chunked value before retry

Retrying a stream of PartialResultSets must correctly reset to the last state that it had when it received a PartialResultSet with a resume token. That means:

1. The pending value that should be merged with the next result should be reset to the pending value of the last PartialResultSet with a resume token.
2. The stream must correctly check for the existence of a resume token.
3. The stream must ensure that all values that are already in the pipeline are handled before resetting the stream to the last resume token, to ensure that no PartialResultSet is added twice to the stream.

Fixes #1392

Co-authored-by: Alex <7764119+AVaksman@users.noreply.github.com>
1 parent 3c0de39 commit da2ca7b

3 files changed

Lines changed: 1020 additions & 30 deletions

File tree

src/partial-result-stream.ts

Lines changed: 72 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ import {grpc} from 'google-gax';
2626

2727
import {codec, JSONOptions, Json, Field, Value} from './codec';
2828
import {google} from '../protos/protos';
29+
import * as stream from 'stream';
2930

3031
export type ResumeToken = string | Uint8Array;
3132

@@ -141,6 +142,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
141142
private _fields!: google.spanner.v1.StructType.Field[];
142143
private _options: RowOptions;
143144
private _pendingValue?: p.IValue;
145+
private _pendingValueForResume?: p.IValue;
144146
private _values: p.IValue[];
145147
private _numPushFailed = 0;
146148
constructor(options = {}) {
@@ -240,6 +242,15 @@ export class PartialResultStream extends Transform implements ResultEvents {
240242
}
241243
}
242244

245+
_clearPendingValues() {
246+
this._values = [];
247+
if (this._pendingValueForResume) {
248+
this._pendingValue = this._pendingValueForResume;
249+
} else {
250+
delete this._pendingValue;
251+
}
252+
}
253+
243254
/**
244255
* Manages any chunked values.
245256
*
@@ -268,6 +279,11 @@ export class PartialResultStream extends Transform implements ResultEvents {
268279
// chunk to be processed.
269280
if (chunk.chunkedValue) {
270281
this._pendingValue = values.pop();
282+
if (_hasResumeToken(chunk)) {
283+
this._pendingValueForResume = this._pendingValue;
284+
}
285+
} else if (_hasResumeToken(chunk)) {
286+
delete this._pendingValueForResume;
271287
}
272288

273289
let res = true;
@@ -419,33 +435,51 @@ export function partialResultStream(
419435
options?: RowOptions
420436
): PartialResultStream {
421437
const retryableCodes = [grpc.status.UNAVAILABLE];
438+
const maxQueued = 10;
422439
let lastResumeToken: ResumeToken;
423-
let lastRetriedErr: grpc.ServiceError | undefined;
424440
let lastRequestStream: Readable;
425441

426442
// mergeStream allows multiple streams to be connected into one. This is good;
427443
// if we need to retry a request and pipe more data to the user's stream.
444+
// We also add an additional stream that can be used to flush any remaining
445+
// items in the checkpoint stream that have been received, and that did not
446+
// contain a resume token.
428447
const requestsStream = mergeStream();
448+
const flushStream = new stream.PassThrough({objectMode: true});
449+
requestsStream.add(flushStream);
429450
const partialRSStream = new PartialResultStream(options);
430451
const userStream = streamEvents(partialRSStream);
452+
// We keep track of the number of PartialResultSets that did not include a
453+
// resume token, as that is an indication whether it is safe to retry the
454+
// stream halfway.
455+
let withoutCheckpointCount = 0;
431456
const batchAndSplitOnTokenStream = checkpointStream.obj({
432-
maxQueued: 10,
433-
isCheckpointFn: (row: google.spanner.v1.PartialResultSet): boolean => {
434-
return is.defined(row.resumeToken);
457+
maxQueued,
458+
isCheckpointFn: (chunk: google.spanner.v1.PartialResultSet): boolean => {
459+
const withCheckpoint = _hasResumeToken(chunk);
460+
if (withCheckpoint) {
461+
withoutCheckpointCount = 0;
462+
} else {
463+
withoutCheckpointCount++;
464+
}
465+
return withCheckpoint;
435466
},
436467
});
437468

438469
// This listener ensures that the last request that executed successfully
439470
// after one or more retries will end the requestsStream.
440471
const endListener = () => {
441-
if (lastRetriedErr) {
442-
setImmediate(() => requestsStream.end());
443-
}
472+
setImmediate(() => {
473+
// Push a fake PartialResultSet without any values but with a resume token
474+
// into the stream to ensure that the checkpoint stream is emptied, and
475+
// then push `null` to end the stream.
476+
flushStream.push({resumeToken: '_'});
477+
flushStream.push(null);
478+
requestsStream.end();
479+
});
444480
};
445481
const makeRequest = (): void => {
446-
if (lastRequestStream) {
447-
lastRequestStream.removeListener('end', endListener);
448-
}
482+
partialRSStream._clearPendingValues();
449483
lastRequestStream = requestFn(lastResumeToken);
450484
lastRequestStream.on('end', endListener);
451485
requestsStream.add(lastRequestStream);
@@ -456,30 +490,44 @@ export function partialResultStream(
456490
!(
457491
err.code &&
458492
(retryableCodes!.includes(err.code) || isRetryableInternalError(err))
459-
)
493+
) ||
494+
// If we have received too many chunks without a resume token, it is not
495+
// safe to retry.
496+
withoutCheckpointCount > maxQueued
460497
) {
461-
// This is not a retryable error, so this will flush any rows the
498+
// This is not a retryable error so this will flush any rows the
462499
// checkpoint stream has queued. After that, we will destroy the
463500
// user's stream with the same error.
464501
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
465502
return;
466503
}
467504

468-
// We're going to retry from where we left off.
469-
// Keep track of the fact that we retried an error in order to end the
470-
// merged result stream.
471-
lastRetriedErr = err;
472-
// Empty queued rows on the checkpoint stream (will not emit them to user).
473-
batchAndSplitOnTokenStream.reset();
474-
makeRequest();
505+
if (lastRequestStream) {
506+
lastRequestStream.removeListener('end', endListener);
507+
lastRequestStream.destroy();
508+
}
509+
// Delay the retry until all the values that are already in the stream
510+
// pipeline have been handled. This ensures that the checkpoint stream is
511+
// reset to the correct point. Calling .reset() directly here could cause
512+
// any values that are currently in the pipeline and that have not been
513+
// handled yet, to be pushed twice into the entire stream.
514+
setImmediate(() => {
515+
// Empty queued rows on the checkpoint stream (will not emit them to user).
516+
batchAndSplitOnTokenStream.reset();
517+
makeRequest();
518+
});
475519
};
476520

477521
userStream.once('reading', makeRequest);
478522
eventsIntercept.patch(requestsStream);
479523

480524
// need types for events-intercept
481525
// eslint-disable-next-line @typescript-eslint/no-explicit-any
482-
(requestsStream as any).intercept('error', retry);
526+
(requestsStream as any).intercept('error', err =>
527+
// Retry __after__ all pending data has been processed to ensure that the
528+
// checkpoint stream is reset at the correct position.
529+
setImmediate(() => retry(err))
530+
);
483531

484532
return (
485533
requestsStream
@@ -497,6 +545,10 @@ export function partialResultStream(
497545
);
498546
}
499547

548+
function _hasResumeToken(chunk: google.spanner.v1.PartialResultSet): boolean {
549+
return is.defined(chunk.resumeToken) && chunk.resumeToken.length > 0;
550+
}
551+
500552
function isRetryableInternalError(err: grpc.ServiceError): boolean {
501553
return (
502554
err.code === grpc.status.INTERNAL &&

0 commit comments

Comments (0)