@@ -28,6 +28,8 @@ import (
2828 "github.com/google/go-cmp/cmp"
2929 "google.golang.org/api/option"
3030 "google.golang.org/grpc"
31+ "google.golang.org/grpc/codes"
32+ "google.golang.org/grpc/status"
3133)
3234
3335var disableMetricsConfig = ClientConfig {MetricsProvider : NoopMetricsProvider {}}
@@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
875877 t .Error ()
876878 }
877879}
880+
881+ type rowKeyCheckingInterceptor struct {
882+ grpc.ClientStream
883+ failRow string
884+ failErr error // error to use while sending failed reponse for fail row
885+ requestCounter * int
886+ }
887+
888+ func (i * rowKeyCheckingInterceptor ) SendMsg (m interface {}) error {
889+ * i .requestCounter = * i .requestCounter + 1
890+ if req , ok := m .(* btpb.MutateRowsRequest ); ok {
891+ for _ , entry := range req .Entries {
892+ if string (entry .RowKey ) == i .failRow {
893+ return i .failErr
894+ }
895+ }
896+ }
897+ return i .ClientStream .SendMsg (m )
898+ }
899+
900+ func (i * rowKeyCheckingInterceptor ) RecvMsg (m interface {}) error {
901+ return i .ClientStream .RecvMsg (m )
902+ }
903+
904+ // Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service
905+ // This test validates that even if one of the group receives error, requests are sent for further groups
906+ func TestApplyBulk_MutationsSucceedAfterGroupError (t * testing.T ) {
907+ testEnv , gotErr := NewEmulatedEnv (IntegrationTestConfig {})
908+ if gotErr != nil {
909+ t .Fatalf ("NewEmulatedEnv failed: %v" , gotErr )
910+ }
911+
912+ // Add interceptor to fail rows
913+ failedRow := "row2"
914+ failErr := status .Error (codes .InvalidArgument , "Invalid row key" )
915+ reqCount := 0
916+ conn , gotErr := grpc .Dial (testEnv .server .Addr , grpc .WithInsecure (), grpc .WithBlock (),
917+ grpc .WithDefaultCallOptions (grpc .MaxCallSendMsgSize (100 << 20 ), grpc .MaxCallRecvMsgSize (100 << 20 )),
918+ grpc .WithStreamInterceptor (func (ctx context.Context , desc * grpc.StreamDesc , cc * grpc.ClientConn , method string , streamer grpc.Streamer , opts ... grpc.CallOption ) (grpc.ClientStream , error ) {
919+ clientStream , err := streamer (ctx , desc , cc , method , opts ... )
920+ return & rowKeyCheckingInterceptor {
921+ ClientStream : clientStream ,
922+ failRow : failedRow ,
923+ requestCounter : & reqCount ,
924+ failErr : failErr ,
925+ }, err
926+ }),
927+ )
928+ if gotErr != nil {
929+ t .Fatalf ("grpc.Dial failed: %v" , gotErr )
930+ }
931+
932+ // Create client and table
933+ ctx , cancel := context .WithTimeout (context .Background (), 20 * time .Second )
934+ defer cancel ()
935+ adminClient , gotErr := NewAdminClient (ctx , testEnv .config .Project , testEnv .config .Instance , option .WithGRPCConn (conn ))
936+ if gotErr != nil {
937+ t .Fatalf ("NewClient failed: %v" , gotErr )
938+ }
939+ defer adminClient .Close ()
940+ tableConf := & TableConf {
941+ TableID : testEnv .config .Table ,
942+ ColumnFamilies : map [string ]Family {
943+ "f" : {
944+ ValueType : AggregateType {
945+ Input : Int64Type {},
946+ Aggregator : SumAggregator {},
947+ },
948+ },
949+ },
950+ }
951+ if err := adminClient .CreateTableFromConf (ctx , tableConf ); err != nil {
952+ t .Fatalf ("CreateTable(%v) failed: %v" , testEnv .config .Table , err )
953+ }
954+ client , gotErr := NewClientWithConfig (ctx , testEnv .config .Project , testEnv .config .Instance , disableMetricsConfig , option .WithGRPCConn (conn ))
955+ if gotErr != nil {
956+ t .Fatalf ("NewClientWithConfig failed: %v" , gotErr )
957+ }
958+ defer client .Close ()
959+ table := client .Open (testEnv .config .Table )
960+
961+ // Override maxMutations to break mutations into smaller groups
962+ origMaxMutations := maxMutations
963+ t .Cleanup (func () {
964+ maxMutations = origMaxMutations
965+ })
966+ maxMutations = 2
967+
968+ // Create mutations
969+ m1 := NewMutation ()
970+ m1 .AddIntToCell ("f" , "q" , 0 , 1000 )
971+ m2 := NewMutation ()
972+ m2 .AddIntToCell ("f" , "q" , 0 , 2000 )
973+
974+ // Perform ApplyBulk operation and compare errors
975+ rowKeys := []string {"row1" , "row1" , failedRow , failedRow , "row3" , "row3" }
976+ var wantErr error
977+ wantErrs := []error {nil , nil , failErr , failErr , nil , nil }
978+ gotErrs , gotErr := table .ApplyBulk (ctx , rowKeys , []* Mutation {m1 , m2 , m1 , m2 , m1 , m2 })
979+
980+ // Assert overall error
981+ if ! equalErrs (gotErr , wantErr ) {
982+ t .Fatalf ("ApplyBulk err got: %v, want: %v" , gotErr , wantErr )
983+ }
984+
985+ // Assert individual muation errors
986+ if len (gotErrs ) != len (wantErrs ) {
987+ t .Fatalf ("ApplyBulk errs got: %v, want: %v" , gotErrs , wantErrs )
988+ }
989+ for i := range gotErrs {
990+ if ! equalErrs (gotErrs [i ], wantErrs [i ]) {
991+ t .Errorf ("#%d ApplyBulk err got: %v, want: %v" , i , gotErrs [i ], wantErrs [i ])
992+ }
993+ }
994+
995+ // Assert number of requests sent
996+ wantReqCount := len (rowKeys ) / maxMutations
997+ if reqCount != wantReqCount {
998+ t .Errorf ("Number of requests got: %v, want: %v" , reqCount , wantReqCount )
999+ }
1000+
1001+ // Assert individual mutation apply success/failure by reading rows
1002+ gotErr = table .ReadRows (ctx , RowList {"row1" , failedRow , "row3" }, func (row Row ) bool {
1003+ rowMutated := bytes .Equal (row ["f" ][0 ].Value , binary .BigEndian .AppendUint64 ([]byte {}, 3000 ))
1004+ if rowMutated && row .Key () == failedRow {
1005+ t .Error ("Expected mutation to fail for row " + row .Key ())
1006+ }
1007+ if ! rowMutated && row .Key () != failedRow {
1008+ t .Error ("Expected mutation to succeed for row " + row .Key ())
1009+ }
1010+ return true
1011+ })
1012+ if gotErr != nil {
1013+ t .Fatalf ("ReadRows failed: %v" , gotErr )
1014+ }
1015+ }
0 commit comments