Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 2b97bac

Browse files
authored
fix: retry PDML on Aborted and Internal errors (#1205)
PDML statements should be retried on the following errors:
* ABORTED: Retry using a new transaction.
* INTERNAL (Received unexpected EOS on DATA frame from server): Retry using the same transaction and any resume token that was received.
* INTERNAL (RST_STREAM): Retry using the same transaction and any resume token that was received.

Fixes #1197
1 parent 445015f commit 2b97bac

5 files changed

Lines changed: 148 additions & 32 deletions

File tree

src/database.ts

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,47 +17,50 @@
1717
import {
1818
ApiError,
1919
ExistsCallback,
20+
GetConfig,
2021
Metadata,
2122
ServiceObjectConfig,
22-
GetConfig,
2323
} from '@google-cloud/common';
2424
// eslint-disable-next-line @typescript-eslint/no-var-requires
2525
const common = require('./common-grpc/service-object');
2626
import {promisify, promisifyAll} from '@google-cloud/promisify';
27-
import arrify = require('arrify');
2827
import * as extend from 'extend';
2928
import * as r from 'teeny-request';
3029
import * as streamEvents from 'stream-events';
3130
import * as through from 'through2';
32-
import {grpc, Operation as GaxOperation, CallOptions} from 'google-gax';
31+
import {CallOptions, grpc, Operation as GaxOperation} from 'google-gax';
3332
import {Backup} from './backup';
3433
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
35-
import {google as databaseAdmin} from '../protos/protos';
3634
import {
37-
Instance,
38-
CreateDatabaseOptions,
35+
google as databaseAdmin,
36+
google,
37+
google as spannerClient,
38+
} from '../protos/protos';
39+
import {
3940
CreateDatabaseCallback,
41+
CreateDatabaseOptions,
4042
GetDatabaseOperationsOptions,
4143
GetDatabaseOperationsResponse,
44+
Instance,
4245
} from './instance';
4346
import {PartialResultStream, Row} from './partial-result-stream';
4447
import {Session} from './session';
4548
import {
49+
isSessionNotFoundError,
4650
SessionPool,
47-
SessionPoolOptions,
4851
SessionPoolCloseCallback,
4952
SessionPoolInterface,
50-
isSessionNotFoundError,
53+
SessionPoolOptions,
5154
} from './session-pool';
52-
import {Table, CreateTableCallback, CreateTableResponse} from './table';
55+
import {CreateTableCallback, CreateTableResponse, Table} from './table';
5356
import {
57+
ExecuteSqlRequest,
58+
RunCallback,
59+
RunResponse,
60+
RunUpdateCallback,
5461
Snapshot,
5562
TimestampBounds,
5663
Transaction,
57-
ExecuteSqlRequest,
58-
RunUpdateCallback,
59-
RunResponse,
60-
RunCallback,
6164
} from './transaction';
6265
import {
6366
AsyncRunTransactionCallback,
@@ -66,22 +69,20 @@ import {
6669
RunTransactionOptions,
6770
TransactionRunner,
6871
} from './transaction-runner';
69-
70-
import {google} from '../protos/protos';
7172
import {
7273
IOperation,
73-
Schema,
74-
RequestCallback,
75-
ResourceCallback,
76-
PagedResponse,
77-
NormalCallback,
7874
LongRunningCallback,
75+
NormalCallback,
7976
PagedOptionsWithFilter,
77+
PagedResponse,
78+
RequestCallback,
79+
ResourceCallback,
80+
Schema,
8081
} from './common';
81-
import {Readable, Transform, Duplex} from 'stream';
82+
import {Duplex, Readable, Transform} from 'stream';
8283
import {PreciseDate} from '@google-cloud/precise-date';
83-
import {google as spannerClient} from '../protos/protos';
8484
import {EnumKey, RequestConfig, TranslateEnumKeys} from '.';
85+
import arrify = require('arrify');
8586

8687
type CreateBatchTransactionCallback = ResourceCallback<
8788
BatchTransaction,
@@ -2041,20 +2042,41 @@ class Database extends common.GrpcServiceObject {
20412042
return;
20422043
}
20432044

2044-
const transaction = session!.partitionedDml();
2045+
this._runPartitionedUpdate(session!, query, callback);
2046+
});
2047+
}
20452048

2046-
transaction.begin(err => {
2049+
_runPartitionedUpdate(
2050+
session: Session,
2051+
query: string | ExecuteSqlRequest,
2052+
callback?: RunUpdateCallback
2053+
): void | Promise<number> {
2054+
const transaction = session.partitionedDml();
2055+
2056+
transaction.begin(err => {
2057+
if (err) {
2058+
this.pool_.release(session!);
2059+
callback!(err, 0);
2060+
return;
2061+
}
2062+
2063+
transaction.runUpdate(query, (err, updateCount) => {
20472064
if (err) {
2065+
if (err.code !== grpc.status.ABORTED) {
2066+
this.pool_.release(session!);
2067+
callback!(err, 0);
2068+
return;
2069+
}
2070+
this._runPartitionedUpdate(session, query, callback);
2071+
} else {
20482072
this.pool_.release(session!);
2049-
callback!(err, 0);
2073+
callback!(null, updateCount);
20502074
return;
20512075
}
2052-
2053-
this._releaseOnEnd(session!, transaction);
2054-
transaction.runUpdate(query, callback!);
20552076
});
20562077
});
20572078
}
2079+
20582080
/**
20592081
* Create a readable object stream to receive resulting rows from a SQL
20602082
* statement.

src/partial-result-stream.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,12 @@ export function partialResultStream(
452452
};
453453

454454
const retry = (err: grpc.ServiceError): void => {
455-
if (!(err.code && retryableCodes!.includes(err.code))) {
455+
if (
456+
!(
457+
err.code &&
458+
(retryableCodes!.includes(err.code) || isRetryableInternalError(err))
459+
)
460+
) {
456461
// This is not a retryable error, so this will flush any rows the
457462
// checkpoint stream has queued. After that, we will destroy the
458463
// user's stream with the same error.
@@ -491,3 +496,13 @@ export function partialResultStream(
491496
.on('resumed', () => requestsStream.resume())
492497
);
493498
}
499+
500+
function isRetryableInternalError(err: grpc.ServiceError): boolean {
501+
return (
502+
err.code === grpc.status.INTERNAL &&
503+
(err.message.includes(
504+
'Received unexpected EOS on DATA frame from server'
505+
) ||
506+
err.message.includes('Received RST_STREAM'))
507+
);
508+
}

test/database.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2276,7 +2276,10 @@ describe('Database', () => {
22762276
'begin'
22772277
) as sinon.SinonStub).callsFake(callback => callback(null));
22782278

2279-
runUpdateStub = sandbox.stub(fakePartitionedDml, 'runUpdate');
2279+
runUpdateStub = (sandbox.stub(
2280+
fakePartitionedDml,
2281+
'runUpdate'
2282+
) as sinon.SinonStub).callsFake((_, callback) => callback(null));
22802283
});
22812284

22822285
it('should get a read only session from the pool', () => {
@@ -2330,10 +2333,10 @@ describe('Database', () => {
23302333

23312334
database.runPartitionedUpdate(QUERY, fakeCallback);
23322335

2333-
const [query, callback] = runUpdateStub.lastCall.args;
2336+
const [query] = runUpdateStub.lastCall.args;
23342337

23352338
assert.strictEqual(query, QUERY);
2336-
assert.strictEqual(callback, fakeCallback);
2339+
assert.ok(fakeCallback.calledOnce);
23372340
});
23382341

23392342
it('should release the session on transaction end', () => {

test/mockserver/mockspanner.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ export class MockSpanner {
520520
if (res) {
521521
let partialResultSets;
522522
let resumeIndex;
523+
let streamErr;
523524
switch (res.type) {
524525
case StatementResultType.RESULT_SET:
525526
partialResultSets = MockSpanner.toPartialResultSets(
@@ -549,6 +550,19 @@ export class MockSpanner {
549550
}
550551
break;
551552
case StatementResultType.UPDATE_COUNT:
553+
call.write(
554+
MockSpanner.emptyPartialResultSet(
555+
Buffer.from('1'.padStart(8, '0'))
556+
)
557+
);
558+
streamErr = this.shiftStreamError(
559+
this.executeStreamingSql.name,
560+
1
561+
);
562+
if (streamErr) {
563+
call.emit('error', streamErr);
564+
break;
565+
}
552566
call.write(MockSpanner.toPartialResultSet(res.updateCount));
553567
break;
554568
case StatementResultType.ERROR:
@@ -617,6 +631,14 @@ export class MockSpanner {
617631
return res;
618632
}
619633

634+
private static emptyPartialResultSet(
635+
resumeToken: Uint8Array
636+
): protobuf.PartialResultSet {
637+
return protobuf.PartialResultSet.create({
638+
resumeToken,
639+
});
640+
}
641+
620642
private static toPartialResultSet(
621643
rowCount: number
622644
): protobuf.PartialResultSet {

test/spanner.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2264,6 +2264,60 @@ describe('Spanner with mock server', () => {
22642264
}
22652265
await database.close();
22662266
});
2267+
2268+
describe('pdml', () => {
2269+
it('should retry on aborted error', async () => {
2270+
const database = newTestDatabase();
2271+
spannerMock.setExecutionTime(
2272+
spannerMock.executeStreamingSql,
2273+
SimulatedExecutionTime.ofError({
2274+
code: grpc.status.ABORTED,
2275+
message: 'Transaction aborted',
2276+
metadata: MockSpanner.createMinimalRetryDelayMetadata(),
2277+
streamIndex: 1,
2278+
} as MockError)
2279+
);
2280+
const [updateCount] = await database.runPartitionedUpdate(updateSql);
2281+
assert.strictEqual(updateCount, 2);
2282+
await database.close();
2283+
});
2284+
2285+
it('should retry on specific internal error', async () => {
2286+
const database = newTestDatabase();
2287+
spannerMock.setExecutionTime(
2288+
spannerMock.executeStreamingSql,
2289+
SimulatedExecutionTime.ofError({
2290+
code: grpc.status.INTERNAL,
2291+
message: 'Received unexpected EOS on DATA frame from server',
2292+
streamIndex: 1,
2293+
} as MockError)
2294+
);
2295+
const [updateCount] = await database.runPartitionedUpdate(updateSql);
2296+
assert.strictEqual(updateCount, 2);
2297+
await database.close();
2298+
});
2299+
2300+
it('should fail on generic internal error', async () => {
2301+
const database = newTestDatabase();
2302+
spannerMock.setExecutionTime(
2303+
spannerMock.executeStreamingSql,
2304+
SimulatedExecutionTime.ofError({
2305+
code: grpc.status.INTERNAL,
2306+
message: 'Generic internal error',
2307+
streamIndex: 1,
2308+
} as MockError)
2309+
);
2310+
try {
2311+
await database.runPartitionedUpdate(updateSql);
2312+
assert.fail('missing expected INTERNAL error');
2313+
} catch (err) {
2314+
assert.strictEqual(err.code, grpc.status.INTERNAL);
2315+
assert.ok(err.message.includes('Generic internal error'));
2316+
} finally {
2317+
await database.close();
2318+
}
2319+
});
2320+
});
22672321
});
22682322

22692323
describe('instanceAdmin', () => {

0 commit comments

Comments
 (0)