Skip to content

Commit 3e27d28

Browse files
authored
feat: Handle StatusRuntimeException in CbtTestProxy, increase inbound message / metadata size (#2763)
1 parent ca24007 commit 3e27d28

File tree

2 files changed

+91
-62
lines changed

2 files changed

+91
-62
lines changed

test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java

Lines changed: 84 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
6060
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
6161
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
62+
import io.grpc.protobuf.StatusProto;
6263
import io.grpc.stub.StreamObserver;
6364
import java.io.ByteArrayInputStream;
6465
import java.io.Closeable;
@@ -300,6 +301,11 @@ public void mutateRow(
300301
.build());
301302
responseObserver.onCompleted();
302303
return;
304+
} catch (StatusRuntimeException e) {
305+
responseObserver.onNext(
306+
MutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
307+
responseObserver.onCompleted();
308+
return;
303309
}
304310

305311
responseObserver.onNext(
@@ -354,10 +360,16 @@ public void bulkMutateRows(
354360
.build());
355361
responseObserver.onCompleted();
356362
return;
363+
} catch (StatusRuntimeException e) {
364+
responseObserver.onNext(
365+
MutateRowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
366+
responseObserver.onCompleted();
367+
return;
357368
}
358369

359370
responseObserver.onNext(
360371
MutateRowsResult.newBuilder()
372+
// Note that the default instance == OK
361373
.setStatus(com.google.rpc.Status.getDefaultInstance())
362374
.build());
363375
responseObserver.onCompleted();
@@ -388,6 +400,14 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
388400
client
389401
.dataClient()
390402
.readRow(tableId, request.getRowKey(), FILTERS.fromProto(request.getFilter()));
403+
if (row != null) {
404+
RowResult.Builder resultBuilder = convertRowResult(row);
405+
responseObserver.onNext(
406+
// Note that the default instance == OK
407+
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
408+
} else {
409+
logger.info(String.format("readRow() did not find row: %s", request.getRowKey()));
410+
}
391411
} catch (ApiException e) {
392412
responseObserver.onNext(
393413
RowResult.newBuilder()
@@ -399,30 +419,23 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
399419
.build());
400420
responseObserver.onCompleted();
401421
return;
402-
}
403-
404-
if (row != null) {
405-
try {
406-
RowResult.Builder resultBuilder = convertRowResult(row);
407-
responseObserver.onNext(
408-
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
409-
} catch (RuntimeException e) {
410-
// If client encounters problem, don't return any row result.
411-
responseObserver.onNext(
412-
RowResult.newBuilder()
413-
.setStatus(
414-
com.google.rpc.Status.newBuilder()
415-
.setCode(Code.INTERNAL.getNumber())
416-
.setMessage(e.getMessage())
417-
.build())
418-
.build());
419-
responseObserver.onCompleted();
420-
return;
421-
}
422-
} else {
423-
logger.info(String.format("readRow() did not find row: %s", request.getRowKey()));
422+
} catch (StatusRuntimeException e) {
424423
responseObserver.onNext(
425-
RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build());
424+
RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
425+
responseObserver.onCompleted();
426+
return;
427+
} catch (RuntimeException e) {
428+
// If client encounters problem, don't return any row result.
429+
responseObserver.onNext(
430+
RowResult.newBuilder()
431+
.setStatus(
432+
com.google.rpc.Status.newBuilder()
433+
.setCode(Code.INTERNAL.getNumber())
434+
.setMessage(e.getMessage())
435+
.build())
436+
.build());
437+
responseObserver.onCompleted();
438+
return;
426439
}
427440
responseObserver.onCompleted();
428441
}
@@ -441,6 +454,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
441454
Query query = Query.fromProto(request.getRequest());
442455
try {
443456
rows = client.dataClient().readRows(query);
457+
int cancelAfterRows = request.getCancelAfterRows();
458+
RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows);
459+
responseObserver.onNext(
460+
// Note that the default instance == OK
461+
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
444462
} catch (ApiException e) {
445463
responseObserver.onNext(
446464
RowsResult.newBuilder()
@@ -452,13 +470,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
452470
.build());
453471
responseObserver.onCompleted();
454472
return;
455-
}
456-
457-
int cancelAfterRows = request.getCancelAfterRows();
458-
try {
459-
RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows);
473+
} catch (StatusRuntimeException e) {
460474
responseObserver.onNext(
461-
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
475+
RowsResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
476+
responseObserver.onCompleted();
477+
return;
462478
} catch (RuntimeException e) {
463479
// If client encounters problem, don't return any row result.
464480
responseObserver.onNext(
@@ -578,6 +594,11 @@ public void sampleRowKeys(
578594
.build());
579595
responseObserver.onCompleted();
580596
return;
597+
} catch (StatusRuntimeException e) {
598+
responseObserver.onNext(
599+
SampleRowKeysResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
600+
responseObserver.onCompleted();
601+
return;
581602
}
582603

583604
SampleRowKeysResult.Builder resultBuilder = SampleRowKeysResult.newBuilder();
@@ -588,6 +609,7 @@ public void sampleRowKeys(
588609
.setOffsetBytes(keyOffset.getOffsetBytes());
589610
}
590611
responseObserver.onNext(
612+
// Note that the default instance == OK
591613
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
592614
responseObserver.onCompleted();
593615
}
@@ -618,11 +640,17 @@ public void checkAndMutateRow(
618640
.build());
619641
responseObserver.onCompleted();
620642
return;
643+
} catch (StatusRuntimeException e) {
644+
responseObserver.onNext(
645+
CheckAndMutateRowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
646+
responseObserver.onCompleted();
647+
return;
621648
}
622649

623650
CheckAndMutateRowResult.Builder resultBuilder = CheckAndMutateRowResult.newBuilder();
624651
resultBuilder.getResultBuilder().setPredicateMatched(matched);
625652
responseObserver.onNext(
653+
// Note that the default instance == OK
626654
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
627655
responseObserver.onCompleted();
628656
}
@@ -642,6 +670,16 @@ public void readModifyWriteRow(
642670
ReadModifyWriteRow mutation = ReadModifyWriteRow.fromProto(request.getRequest());
643671
try {
644672
row = client.dataClient().readModifyWriteRow(mutation);
673+
if (row != null) {
674+
RowResult.Builder resultBuilder = convertRowResult(row);
675+
responseObserver.onNext(
676+
// Note that the default instance == OK
677+
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
678+
} else {
679+
logger.info(
680+
String.format(
681+
"readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey()));
682+
}
645683
} catch (ApiException e) {
646684
responseObserver.onNext(
647685
RowResult.newBuilder()
@@ -653,32 +691,23 @@ public void readModifyWriteRow(
653691
.build());
654692
responseObserver.onCompleted();
655693
return;
656-
}
657-
658-
if (row != null) {
659-
try {
660-
RowResult.Builder resultBuilder = convertRowResult(row);
661-
responseObserver.onNext(
662-
resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build());
663-
} catch (RuntimeException e) {
664-
// If client encounters problem, fail the whole operation.
665-
responseObserver.onNext(
666-
RowResult.newBuilder()
667-
.setStatus(
668-
com.google.rpc.Status.newBuilder()
669-
.setCode(Code.INTERNAL.getNumber())
670-
.setMessage(e.getMessage())
671-
.build())
672-
.build());
673-
responseObserver.onCompleted();
674-
return;
675-
}
676-
} else {
677-
logger.info(
678-
String.format(
679-
"readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey()));
694+
} catch (StatusRuntimeException e) {
680695
responseObserver.onNext(
681-
RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build());
696+
RowResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
697+
responseObserver.onCompleted();
698+
return;
699+
} catch (RuntimeException e) {
700+
// If client encounters problem, fail the whole operation.
701+
responseObserver.onNext(
702+
RowResult.newBuilder()
703+
.setStatus(
704+
com.google.rpc.Status.newBuilder()
705+
.setCode(Code.INTERNAL.getNumber())
706+
.setMessage(e.getMessage())
707+
.build())
708+
.build());
709+
responseObserver.onCompleted();
710+
return;
682711
}
683712
responseObserver.onCompleted();
684713
}
@@ -727,13 +756,7 @@ public void executeQuery(
727756
return;
728757
} catch (StatusRuntimeException e) {
729758
responseObserver.onNext(
730-
ExecuteQueryResult.newBuilder()
731-
.setStatus(
732-
com.google.rpc.Status.newBuilder()
733-
.setCode(e.getStatus().getCode().value())
734-
.setMessage(e.getStatus().getDescription())
735-
.build())
736-
.build());
759+
ExecuteQueryResult.newBuilder().setStatus(StatusProto.fromThrowable(e)).build());
737760
responseObserver.onCompleted();
738761
return;
739762
} catch (RuntimeException e) {

test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ public static void main(String[] args) throws InterruptedException, IOException
3434

3535
CbtTestProxy cbtTestProxy = CbtTestProxy.create();
3636
logger.info(String.format("Test proxy starting on %d", port));
37-
ServerBuilder.forPort(port).addService(cbtTestProxy).build().start().awaitTermination();
37+
ServerBuilder.forPort(port)
38+
.addService(cbtTestProxy)
39+
.maxInboundMessageSize(Integer.MAX_VALUE)
40+
.maxInboundMetadataSize(Integer.MAX_VALUE)
41+
.build()
42+
.start()
43+
.awaitTermination();
3844
}
3945
}

0 commit comments

Comments
 (0)