Skip to content

Commit 6ffe32b

Browse files
authored
fix(bigtable): Mutate groups even after first error (#11434)
* fix(bigtable): Mutate groups even after first error * simplify range use * fix unit tests * refactor code * resolve vet failures * fix tests
1 parent 0a81f8f commit 6ffe32b

File tree

2 files changed

+176
-4
lines changed

2 files changed

+176
-4
lines changed

bigtable/bigtable.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {
10131013
return true
10141014
}
10151015

1016-
const maxMutations = 100000
1016+
// Overriden in tests
1017+
var maxMutations = 100000
10171018

10181019
// Apply mutates a row atomically. A mutation must contain at least one
10191020
// operation and at most 100000 operations.
@@ -1224,9 +1225,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.
12241225
type entryErr struct {
12251226
Entry *btpb.MutateRowsRequest_Entry
12261227
Err error
1228+
1229+
// TopLevelErr is the error received either from
1230+
// 1. client.MutateRows
1231+
// 2. stream.Recv
1232+
TopLevelErr error
12271233
}
12281234

1229-
// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
1235+
// ApplyBulk applies multiple Mutations.
12301236
// Each mutation is individually applied atomically,
12311237
// but the set of mutations may be applied in any order.
12321238
//
@@ -1254,17 +1260,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
12541260
origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
12551261
}
12561262

1257-
for _, group := range groupEntries(origEntries, maxMutations) {
1263+
var firstGroupErr error
1264+
numFailed := 0
1265+
groups := groupEntries(origEntries, maxMutations)
1266+
for _, group := range groups {
12581267
err := t.applyGroup(ctx, group, opts...)
12591268
if err != nil {
1260-
return nil, err
1269+
if firstGroupErr == nil {
1270+
firstGroupErr = err
1271+
}
1272+
numFailed++
12611273
}
12621274
}
12631275

1276+
if numFailed == len(groups) {
1277+
return nil, firstGroupErr
1278+
}
1279+
12641280
// All the errors are accumulated into an array and returned, interspersed with nils for successful
12651281
// entries. The absence of any errors means we should return nil.
12661282
var foundErr bool
12671283
for _, entry := range origEntries {
1284+
if entry.Err == nil && entry.TopLevelErr != nil {
1285+
// Populate per mutation error if top level error is not nil
1286+
entry.Err = entry.TopLevelErr
1287+
}
12681288
if entry.Err != nil {
12691289
foundErr = true
12701290
}
@@ -1289,6 +1309,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
12891309
// We want to retry the entire request with the current group
12901310
return err
12911311
}
1312+
// Get the entries that need to be retried
12921313
group = t.getApplyBulkRetries(group)
12931314
if len(group) > 0 && len(idempotentRetryCodes) > 0 {
12941315
// We have at least one mutation that needs to be retried.
@@ -1324,6 +1345,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
13241345
}
13251346
}
13261347

1348+
var topLevelErr error
1349+
defer func() {
1350+
populateTopLevelError(entryErrs, topLevelErr)
1351+
}()
1352+
13271353
entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
13281354
for i, entryErr := range entryErrs {
13291355
entries[i] = entryErr.Entry
@@ -1340,6 +1366,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
13401366

13411367
stream, err := t.c.client.MutateRows(ctx, req)
13421368
if err != nil {
1369+
_, topLevelErr = convertToGrpcStatusErr(err)
13431370
return err
13441371
}
13451372

@@ -1354,6 +1381,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
13541381
}
13551382
if err != nil {
13561383
*trailerMD = stream.Trailer()
1384+
_, topLevelErr = convertToGrpcStatusErr(err)
13571385
return err
13581386
}
13591387

@@ -1370,6 +1398,12 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
13701398
return nil
13711399
}
13721400

1401+
func populateTopLevelError(entries []*entryErr, topLevelErr error) {
1402+
for _, entry := range entries {
1403+
entry.TopLevelErr = topLevelErr
1404+
}
1405+
}
1406+
13731407
// groupEntries groups entries into groups of a specified size without breaking up
13741408
// individual entries.
13751409
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {

bigtable/bigtable_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"github.com/google/go-cmp/cmp"
2929
"google.golang.org/api/option"
3030
"google.golang.org/grpc"
31+
"google.golang.org/grpc/codes"
32+
"google.golang.org/grpc/status"
3133
)
3234

3335
var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}}
@@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
875877
t.Error()
876878
}
877879
}
880+
881+
type rowKeyCheckingInterceptor struct {
882+
grpc.ClientStream
883+
failRow string
884+
failErr error // error to use while sending failed reponse for fail row
885+
requestCounter *int
886+
}
887+
888+
func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error {
889+
*i.requestCounter = *i.requestCounter + 1
890+
if req, ok := m.(*btpb.MutateRowsRequest); ok {
891+
for _, entry := range req.Entries {
892+
if string(entry.RowKey) == i.failRow {
893+
return i.failErr
894+
}
895+
}
896+
}
897+
return i.ClientStream.SendMsg(m)
898+
}
899+
900+
func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error {
901+
return i.ClientStream.RecvMsg(m)
902+
}
903+
904+
// Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service
905+
// This test validates that even if one of the group receives error, requests are sent for further groups
906+
func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) {
907+
testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{})
908+
if gotErr != nil {
909+
t.Fatalf("NewEmulatedEnv failed: %v", gotErr)
910+
}
911+
912+
// Add interceptor to fail rows
913+
failedRow := "row2"
914+
failErr := status.Error(codes.InvalidArgument, "Invalid row key")
915+
reqCount := 0
916+
conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
917+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
918+
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
919+
clientStream, err := streamer(ctx, desc, cc, method, opts...)
920+
return &rowKeyCheckingInterceptor{
921+
ClientStream: clientStream,
922+
failRow: failedRow,
923+
requestCounter: &reqCount,
924+
failErr: failErr,
925+
}, err
926+
}),
927+
)
928+
if gotErr != nil {
929+
t.Fatalf("grpc.Dial failed: %v", gotErr)
930+
}
931+
932+
// Create client and table
933+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
934+
defer cancel()
935+
adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
936+
if gotErr != nil {
937+
t.Fatalf("NewClient failed: %v", gotErr)
938+
}
939+
defer adminClient.Close()
940+
tableConf := &TableConf{
941+
TableID: testEnv.config.Table,
942+
ColumnFamilies: map[string]Family{
943+
"f": {
944+
ValueType: AggregateType{
945+
Input: Int64Type{},
946+
Aggregator: SumAggregator{},
947+
},
948+
},
949+
},
950+
}
951+
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
952+
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
953+
}
954+
client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
955+
if gotErr != nil {
956+
t.Fatalf("NewClientWithConfig failed: %v", gotErr)
957+
}
958+
defer client.Close()
959+
table := client.Open(testEnv.config.Table)
960+
961+
// Override maxMutations to break mutations into smaller groups
962+
origMaxMutations := maxMutations
963+
t.Cleanup(func() {
964+
maxMutations = origMaxMutations
965+
})
966+
maxMutations = 2
967+
968+
// Create mutations
969+
m1 := NewMutation()
970+
m1.AddIntToCell("f", "q", 0, 1000)
971+
m2 := NewMutation()
972+
m2.AddIntToCell("f", "q", 0, 2000)
973+
974+
// Perform ApplyBulk operation and compare errors
975+
rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"}
976+
var wantErr error
977+
wantErrs := []error{nil, nil, failErr, failErr, nil, nil}
978+
gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2})
979+
980+
// Assert overall error
981+
if !equalErrs(gotErr, wantErr) {
982+
t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr)
983+
}
984+
985+
// Assert individual muation errors
986+
if len(gotErrs) != len(wantErrs) {
987+
t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs)
988+
}
989+
for i := range gotErrs {
990+
if !equalErrs(gotErrs[i], wantErrs[i]) {
991+
t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i])
992+
}
993+
}
994+
995+
// Assert number of requests sent
996+
wantReqCount := len(rowKeys) / maxMutations
997+
if reqCount != wantReqCount {
998+
t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount)
999+
}
1000+
1001+
// Assert individual mutation apply success/failure by reading rows
1002+
gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool {
1003+
rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000))
1004+
if rowMutated && row.Key() == failedRow {
1005+
t.Error("Expected mutation to fail for row " + row.Key())
1006+
}
1007+
if !rowMutated && row.Key() != failedRow {
1008+
t.Error("Expected mutation to succeed for row " + row.Key())
1009+
}
1010+
return true
1011+
})
1012+
if gotErr != nil {
1013+
t.Fatalf("ReadRows failed: %v", gotErr)
1014+
}
1015+
}

0 commit comments

Comments
 (0)