Skip to content

Commit 7dbaa6e

Browse files
authored
fix: Out of order read rows fix (#1231)
Client changes: Emits an error if the chunk transformer receives keys that are out of order. The chunk transformer is a stream that data passes through before it reaches the stream that has the handlers for errors and retries. Also pulls some comparison functions into a utility file because now those functions are used by the chunk transformer to check to see if rows are in order. They are still used by the table.ts file, but are available in both places now. Test proxy changes: Makes it so that when the client emits an error as a result of rows that are out of order, that the error is sent back to the test runner as an rpc message instead of an rpc error. This is done to stay consistent with the test proxy in Java and allows the TestReadRows_NoRetry_OutOfOrderError test case to pass.
1 parent 2d9e759 commit 7dbaa6e

File tree

4 files changed

+39
-21
lines changed

4 files changed

+39
-21
lines changed

src/chunktransformer.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import {Transform, TransformOptions} from 'stream';
1616
import {Bytes, Mutation} from './mutation';
17+
import {TableUtils} from './utils/table';
1718

1819
export type Value = string | number | boolean | Uint8Array;
1920

@@ -259,6 +260,11 @@ export class ChunkTransformer extends Transform {
259260
errorMessage = 'A new row cannot be reset';
260261
} else if (lastRowKey === newRowKey) {
261262
errorMessage = 'A commit happened but the same key followed';
263+
} else if (
264+
typeof lastRowKey !== 'undefined' &&
265+
TableUtils.lessThanOrEqualTo(newRowKey as string, lastRowKey as string)
266+
) {
267+
errorMessage = 'A row key must be strictly increasing';
262268
} else if (!chunk.familyName) {
263269
errorMessage = 'A family must be set';
264270
} else if (chunk.qualifier === null || chunk.qualifier === undefined) {

src/table.ts

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -786,18 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
786786
};
787787

788788
if (lastRowKey) {
789-
// TODO: lhs and rhs type shouldn't be string, it could be
790-
// string, number, Uint8Array, boolean. Fix the type
791-
// and clean up the casting.
792-
const lessThan = (lhs: string, rhs: string) => {
793-
const lhsBytes = Mutation.convertToBytes(lhs);
794-
const rhsBytes = Mutation.convertToBytes(rhs);
795-
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
796-
};
797-
const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs);
798-
const lessThanOrEqualTo = (lhs: string, rhs: string) =>
799-
!greaterThan(lhs, rhs);
800-
801789
// Readjust and/or remove ranges based on previous valid row reads.
802790
// Iterate backward since items may need to be removed.
803791
for (let index = ranges.length - 1; index >= 0; index--) {
@@ -810,11 +798,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
810798
: range.end;
811799
const startKeyIsRead =
812800
!startValue ||
813-
lessThanOrEqualTo(startValue as string, lastRowKey as string);
801+
TableUtils.lessThanOrEqualTo(
802+
startValue as string,
803+
lastRowKey as string
804+
);
814805
const endKeyIsNotRead =
815806
!endValue ||
816807
(endValue as Buffer).length === 0 ||
817-
lessThan(lastRowKey as string, endValue as string);
808+
TableUtils.lessThan(lastRowKey as string, endValue as string);
818809
if (startKeyIsRead) {
819810
if (endKeyIsNotRead) {
820811
// EndKey is not read, reset the range to start from lastRowKey open
@@ -831,7 +822,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
831822

832823
// Remove rowKeys already read.
833824
rowKeys = rowKeys.filter(rowKey =>
834-
greaterThan(rowKey, lastRowKey as string)
825+
TableUtils.greaterThan(rowKey, lastRowKey as string)
835826
);
836827

837828
// If there was a row limit in the original request and

src/utils/table.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
import {GetRowsOptions, PrefixRange} from '../table';
16+
import {Mutation} from '../mutation';
1617

1718
export class TableUtils {
1819
static getRanges(options: GetRowsOptions) {
@@ -49,6 +50,23 @@ export class TableUtils {
4950
return ranges;
5051
}
5152

53+
// TODO: lhs and rhs type shouldn't be string, it could be
54+
// string, number, Uint8Array, boolean. Fix the type
55+
// and clean up the casting.
56+
static lessThan(lhs: string, rhs: string) {
57+
const lhsBytes = Mutation.convertToBytes(lhs);
58+
const rhsBytes = Mutation.convertToBytes(rhs);
59+
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
60+
}
61+
62+
static greaterThan(lhs: string, rhs: string) {
63+
return this.lessThan(rhs, lhs);
64+
}
65+
66+
static lessThanOrEqualTo(lhs: string, rhs: string) {
67+
return !this.greaterThan(lhs, rhs);
68+
}
69+
5270
static createPrefixRange(start: string): PrefixRange {
5371
const prefix = start.replace(new RegExp('[\xff]+$'), '');
5472
let endKey = '';

testproxy/services/read-rows.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,15 @@ const readRows = ({clientMap}) =>
6565
const bigtable = clientMap.get(clientId);
6666
const table = getTableInfo(bigtable, tableName);
6767
const rowsOptions = getRowsOptions(readRowsRequest);
68-
const [rows] = await table.getRows(rowsOptions);
69-
70-
return {
71-
status: {code: grpc.status.OK, details: []},
72-
row: rows.map(getRowResponse),
73-
};
68+
try {
69+
const [rows] = await table.getRows(rowsOptions);
70+
return {
71+
status: {code: grpc.status.OK, details: []},
72+
row: rows.map(getRowResponse),
73+
};
74+
} catch (e) {
75+
return {status: e};
76+
}
7477
});
7578

7679
module.exports = readRows;

0 commit comments

Comments
 (0)