5959import io .grpc .netty .shaded .io .grpc .netty .GrpcSslContexts ;
6060import io .grpc .netty .shaded .io .grpc .netty .NettyChannelBuilder ;
6161import io .grpc .netty .shaded .io .netty .handler .ssl .SslContext ;
62+ import io .grpc .protobuf .StatusProto ;
6263import io .grpc .stub .StreamObserver ;
6364import java .io .ByteArrayInputStream ;
6465import java .io .Closeable ;
@@ -300,6 +301,11 @@ public void mutateRow(
300301 .build ());
301302 responseObserver .onCompleted ();
302303 return ;
304+ } catch (StatusRuntimeException e ) {
305+ responseObserver .onNext (
306+ MutateRowResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
307+ responseObserver .onCompleted ();
308+ return ;
303309 }
304310
305311 responseObserver .onNext (
@@ -354,10 +360,16 @@ public void bulkMutateRows(
354360 .build ());
355361 responseObserver .onCompleted ();
356362 return ;
363+ } catch (StatusRuntimeException e ) {
364+ responseObserver .onNext (
365+ MutateRowsResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
366+ responseObserver .onCompleted ();
367+ return ;
357368 }
358369
359370 responseObserver .onNext (
360371 MutateRowsResult .newBuilder ()
372+ // Note that the default instance == OK
361373 .setStatus (com .google .rpc .Status .getDefaultInstance ())
362374 .build ());
363375 responseObserver .onCompleted ();
@@ -388,6 +400,14 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
388400 client
389401 .dataClient ()
390402 .readRow (tableId , request .getRowKey (), FILTERS .fromProto (request .getFilter ()));
403+ if (row != null ) {
404+ RowResult .Builder resultBuilder = convertRowResult (row );
405+ responseObserver .onNext (
406+ // Note that the default instance == OK
407+ resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
408+ } else {
409+ logger .info (String .format ("readRow() did not find row: %s" , request .getRowKey ()));
410+ }
391411 } catch (ApiException e ) {
392412 responseObserver .onNext (
393413 RowResult .newBuilder ()
@@ -399,30 +419,23 @@ public void readRow(ReadRowRequest request, StreamObserver<RowResult> responseOb
399419 .build ());
400420 responseObserver .onCompleted ();
401421 return ;
402- }
403-
404- if (row != null ) {
405- try {
406- RowResult .Builder resultBuilder = convertRowResult (row );
407- responseObserver .onNext (
408- resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
409- } catch (RuntimeException e ) {
410- // If client encounters problem, don't return any row result.
411- responseObserver .onNext (
412- RowResult .newBuilder ()
413- .setStatus (
414- com .google .rpc .Status .newBuilder ()
415- .setCode (Code .INTERNAL .getNumber ())
416- .setMessage (e .getMessage ())
417- .build ())
418- .build ());
419- responseObserver .onCompleted ();
420- return ;
421- }
422- } else {
423- logger .info (String .format ("readRow() did not find row: %s" , request .getRowKey ()));
422+ } catch (StatusRuntimeException e ) {
424423 responseObserver .onNext (
425- RowResult .newBuilder ().setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
424+ RowResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
425+ responseObserver .onCompleted ();
426+ return ;
427+ } catch (RuntimeException e ) {
428+ // If client encounters problem, don't return any row result.
429+ responseObserver .onNext (
430+ RowResult .newBuilder ()
431+ .setStatus (
432+ com .google .rpc .Status .newBuilder ()
433+ .setCode (Code .INTERNAL .getNumber ())
434+ .setMessage (e .getMessage ())
435+ .build ())
436+ .build ());
437+ responseObserver .onCompleted ();
438+ return ;
426439 }
427440 responseObserver .onCompleted ();
428441 }
@@ -441,6 +454,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
441454 Query query = Query .fromProto (request .getRequest ());
442455 try {
443456 rows = client .dataClient ().readRows (query );
457+ int cancelAfterRows = request .getCancelAfterRows ();
458+ RowsResult .Builder resultBuilder = convertRowsResult (rows , cancelAfterRows );
459+ responseObserver .onNext (
460+ // Note that the default instance == OK
461+ resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
444462 } catch (ApiException e ) {
445463 responseObserver .onNext (
446464 RowsResult .newBuilder ()
@@ -452,13 +470,11 @@ public void readRows(ReadRowsRequest request, StreamObserver<RowsResult> respons
452470 .build ());
453471 responseObserver .onCompleted ();
454472 return ;
455- }
456-
457- int cancelAfterRows = request .getCancelAfterRows ();
458- try {
459- RowsResult .Builder resultBuilder = convertRowsResult (rows , cancelAfterRows );
473+ } catch (StatusRuntimeException e ) {
460474 responseObserver .onNext (
461- resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
475+ RowsResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
476+ responseObserver .onCompleted ();
477+ return ;
462478 } catch (RuntimeException e ) {
463479 // If client encounters problem, don't return any row result.
464480 responseObserver .onNext (
@@ -578,6 +594,11 @@ public void sampleRowKeys(
578594 .build ());
579595 responseObserver .onCompleted ();
580596 return ;
597+ } catch (StatusRuntimeException e ) {
598+ responseObserver .onNext (
599+ SampleRowKeysResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
600+ responseObserver .onCompleted ();
601+ return ;
581602 }
582603
583604 SampleRowKeysResult .Builder resultBuilder = SampleRowKeysResult .newBuilder ();
@@ -588,6 +609,7 @@ public void sampleRowKeys(
588609 .setOffsetBytes (keyOffset .getOffsetBytes ());
589610 }
590611 responseObserver .onNext (
612+ // Note that the default instance == OK
591613 resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
592614 responseObserver .onCompleted ();
593615 }
@@ -618,11 +640,17 @@ public void checkAndMutateRow(
618640 .build ());
619641 responseObserver .onCompleted ();
620642 return ;
643+ } catch (StatusRuntimeException e ) {
644+ responseObserver .onNext (
645+ CheckAndMutateRowResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
646+ responseObserver .onCompleted ();
647+ return ;
621648 }
622649
623650 CheckAndMutateRowResult .Builder resultBuilder = CheckAndMutateRowResult .newBuilder ();
624651 resultBuilder .getResultBuilder ().setPredicateMatched (matched );
625652 responseObserver .onNext (
653+ // Note that the default instance == OK
626654 resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
627655 responseObserver .onCompleted ();
628656 }
@@ -642,6 +670,16 @@ public void readModifyWriteRow(
642670 ReadModifyWriteRow mutation = ReadModifyWriteRow .fromProto (request .getRequest ());
643671 try {
644672 row = client .dataClient ().readModifyWriteRow (mutation );
673+ if (row != null ) {
674+ RowResult .Builder resultBuilder = convertRowResult (row );
675+ responseObserver .onNext (
676+ // Note that the default instance == OK
677+ resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
678+ } else {
679+ logger .info (
680+ String .format (
681+ "readModifyWriteRow() did not find row: %s" , request .getRequest ().getRowKey ()));
682+ }
645683 } catch (ApiException e ) {
646684 responseObserver .onNext (
647685 RowResult .newBuilder ()
@@ -653,32 +691,23 @@ public void readModifyWriteRow(
653691 .build ());
654692 responseObserver .onCompleted ();
655693 return ;
656- }
657-
658- if (row != null ) {
659- try {
660- RowResult .Builder resultBuilder = convertRowResult (row );
661- responseObserver .onNext (
662- resultBuilder .setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
663- } catch (RuntimeException e ) {
664- // If client encounters problem, fail the whole operation.
665- responseObserver .onNext (
666- RowResult .newBuilder ()
667- .setStatus (
668- com .google .rpc .Status .newBuilder ()
669- .setCode (Code .INTERNAL .getNumber ())
670- .setMessage (e .getMessage ())
671- .build ())
672- .build ());
673- responseObserver .onCompleted ();
674- return ;
675- }
676- } else {
677- logger .info (
678- String .format (
679- "readModifyWriteRow() did not find row: %s" , request .getRequest ().getRowKey ()));
694+ } catch (StatusRuntimeException e ) {
680695 responseObserver .onNext (
681- RowResult .newBuilder ().setStatus (com .google .rpc .Status .getDefaultInstance ()).build ());
696+ RowResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
697+ responseObserver .onCompleted ();
698+ return ;
699+ } catch (RuntimeException e ) {
700+ // If client encounters problem, fail the whole operation.
701+ responseObserver .onNext (
702+ RowResult .newBuilder ()
703+ .setStatus (
704+ com .google .rpc .Status .newBuilder ()
705+ .setCode (Code .INTERNAL .getNumber ())
706+ .setMessage (e .getMessage ())
707+ .build ())
708+ .build ());
709+ responseObserver .onCompleted ();
710+ return ;
682711 }
683712 responseObserver .onCompleted ();
684713 }
@@ -727,13 +756,7 @@ public void executeQuery(
727756 return ;
728757 } catch (StatusRuntimeException e ) {
729758 responseObserver .onNext (
730- ExecuteQueryResult .newBuilder ()
731- .setStatus (
732- com .google .rpc .Status .newBuilder ()
733- .setCode (e .getStatus ().getCode ().value ())
734- .setMessage (e .getStatus ().getDescription ())
735- .build ())
736- .build ());
759+ ExecuteQueryResult .newBuilder ().setStatus (StatusProto .fromThrowable (e )).build ());
737760 responseObserver .onCompleted ();
738761 return ;
739762 } catch (RuntimeException e ) {
0 commit comments