Skip to content

Commit 647baf3

Browse files
feat: add default checksums for appendable writer (excludes appendable takeover writer) (#13379)
1 parent 86c6edd commit 647baf3

File tree

5 files changed

+243
-46
lines changed

5 files changed

+243
-46
lines changed

storage/doc.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ roles which must be enabled in order to do the export successfully. To
407407
disable this export, you can use the [WithDisabledClientMetrics] client
408408
option.
409409
410+
The gRPC client automatically computes and sends CRC32C checksums for uploads using [Writer],
411+
which provides an additional layer of data integrity validation when compared to the HTTP client.
412+
This behavior can optionally be disabled by using [Writer.DisableAutoChecksum].
413+
410414
# Storage Control API
411415
412416
Certain control plane and long-running operations for Cloud Storage (including Folder

storage/grpc_writer.go

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -819,11 +819,12 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC
819819
}
820820

821821
type getObjectChecksumsParams struct {
822-
fullObjectChecksum func() uint32
823-
finishWrite bool
824822
sendCRC32C bool
825823
disableAutoChecksum bool
826-
attrs *ObjectAttrs
824+
objectAttrs *ObjectAttrs
825+
fullObjectChecksum func() uint32
826+
finishWrite bool
827+
takeoverWriter bool
827828
}
828829

829830
// getObjectChecksums determines what checksum information to include in the final
@@ -840,9 +841,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck
840841

841842
// send user's checksum on last write op if available
842843
if params.sendCRC32C {
843-
return toProtoChecksums(params.sendCRC32C, params.attrs)
844+
return toProtoChecksums(params.sendCRC32C, params.objectAttrs)
844845
}
845-
if params.disableAutoChecksum {
846+
// TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
847+
if params.disableAutoChecksum || params.takeoverWriter {
846848
return nil
847849
}
848850
return &storagepb.ObjectChecksums{
@@ -879,8 +881,11 @@ type gRPCOneshotBidiWriteBufferSender struct {
879881
firstMessage *storagepb.BidiWriteObjectRequest
880882
streamErr error
881883

882-
checksumSettings func() (bool, bool, *ObjectAttrs)
883-
fullObjectChecksum func() uint32
884+
// Checksum related settings.
885+
sendCRC32C bool
886+
disableAutoChecksum bool
887+
objectAttrs *ObjectAttrs
888+
fullObjectChecksum func() uint32
884889
}
885890

886891
func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender {
@@ -894,9 +899,9 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
894899
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
895900
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
896901
},
897-
checksumSettings: func() (bool, bool, *ObjectAttrs) {
898-
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
899-
},
902+
sendCRC32C: w.sendCRC32C,
903+
disableAutoChecksum: w.disableAutoChecksum,
904+
objectAttrs: w.attrs,
900905
fullObjectChecksum: func() uint32 {
901906
return w.fullObjectChecksum
902907
},
@@ -939,17 +944,16 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
939944
continue
940945
}
941946

942-
sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
943947
var bufChecksum *uint32
944-
if !disableAutoChecksum {
948+
if !s.disableAutoChecksum {
945949
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
946950
}
947951
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
952+
sendCRC32C: s.sendCRC32C,
953+
objectAttrs: s.objectAttrs,
948954
fullObjectChecksum: s.fullObjectChecksum,
955+
disableAutoChecksum: s.disableAutoChecksum,
949956
finishWrite: r.finishWrite,
950-
sendCRC32C: sendCrc32C,
951-
disableAutoChecksum: disableAutoChecksum,
952-
attrs: attrs,
953957
})
954958
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
955959

@@ -996,8 +1000,11 @@ type gRPCResumableBidiWriteBufferSender struct {
9961000
startWriteRequest *storagepb.StartResumableWriteRequest
9971001
upid string
9981002

999-
checksumSettings func() (bool, bool, *ObjectAttrs)
1000-
fullObjectChecksum func() uint32
1003+
// Checksum related settings.
1004+
sendCRC32C bool
1005+
disableAutoChecksum bool
1006+
objectAttrs *ObjectAttrs
1007+
fullObjectChecksum func() uint32
10011008

10021009
streamErr error
10031010
}
@@ -1011,9 +1018,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
10111018
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
10121019
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
10131020
},
1014-
checksumSettings: func() (bool, bool, *ObjectAttrs) {
1015-
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
1016-
},
1021+
sendCRC32C: w.sendCRC32C,
1022+
disableAutoChecksum: w.disableAutoChecksum,
1023+
objectAttrs: w.attrs,
10171024
fullObjectChecksum: func() uint32 {
10181025
return w.fullObjectChecksum
10191026
},
@@ -1076,17 +1083,16 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
10761083
continue
10771084
}
10781085

1079-
sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
10801086
var bufChecksum *uint32
1081-
if !disableAutoChecksum {
1087+
if !s.disableAutoChecksum {
10821088
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
10831089
}
10841090
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
1091+
sendCRC32C: s.sendCRC32C,
1092+
objectAttrs: s.objectAttrs,
10851093
fullObjectChecksum: s.fullObjectChecksum,
1094+
disableAutoChecksum: s.disableAutoChecksum,
10861095
finishWrite: r.finishWrite,
1087-
sendCRC32C: sendCrc32C,
1088-
disableAutoChecksum: disableAutoChecksum,
1089-
attrs: attrs,
10901096
})
10911097
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
10921098

@@ -1142,12 +1148,18 @@ type gRPCAppendBidiWriteBufferSender struct {
11421148
bucket string
11431149
routingToken *string
11441150

1145-
firstMessage *storagepb.BidiWriteObjectRequest
1146-
1147-
objectChecksums *storagepb.ObjectChecksums
1151+
firstMessage *storagepb.BidiWriteObjectRequest
11481152
finalizeOnClose bool
11491153
objResource *storagepb.Object
11501154

1155+
// Checksum related settings.
1156+
sendCRC32C bool
1157+
disableAutoChecksum bool
1158+
objectAttrs *ObjectAttrs
1159+
fullObjectChecksum func() uint32
1160+
1161+
takeoverWriter bool
1162+
11511163
streamErr error
11521164
}
11531165

@@ -1164,8 +1176,13 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB
11641176
},
11651177
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
11661178
},
1167-
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
1168-
finalizeOnClose: w.finalizeOnClose,
1179+
finalizeOnClose: w.finalizeOnClose,
1180+
sendCRC32C: w.sendCRC32C,
1181+
disableAutoChecksum: w.disableAutoChecksum,
1182+
objectAttrs: w.attrs,
1183+
fullObjectChecksum: func() uint32 {
1184+
return w.fullObjectChecksum
1185+
},
11691186
}
11701187
}
11711188

@@ -1278,8 +1295,14 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
12781295
AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen),
12791296
},
12801297
},
1281-
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
1282-
finalizeOnClose: w.finalizeOnClose,
1298+
finalizeOnClose: w.finalizeOnClose,
1299+
takeoverWriter: true,
1300+
sendCRC32C: w.sendCRC32C,
1301+
disableAutoChecksum: w.disableAutoChecksum,
1302+
objectAttrs: w.attrs,
1303+
fullObjectChecksum: func() uint32 {
1304+
return w.fullObjectChecksum
1305+
},
12831306
},
12841307
takeoverReported: false,
12851308
handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
@@ -1409,12 +1432,20 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit
14091432
flush: flush,
14101433
finishWrite: finalizeObject,
14111434
}
1412-
// TODO(b/453869602): implement default checksumming for appendable writes
1413-
req := bidiWriteObjectRequest(r, nil, nil)
1414-
if finalizeObject {
1415-
// appendable objects pass checksums on the finalize message only
1416-
req.ObjectChecksums = s.objectChecksums
1435+
1436+
var bufChecksum *uint32
1437+
if !s.disableAutoChecksum {
1438+
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
14171439
}
1440+
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
1441+
sendCRC32C: s.sendCRC32C,
1442+
objectAttrs: s.objectAttrs,
1443+
fullObjectChecksum: s.fullObjectChecksum,
1444+
disableAutoChecksum: s.disableAutoChecksum,
1445+
finishWrite: finalizeObject,
1446+
takeoverWriter: s.takeoverWriter,
1447+
})
1448+
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
14181449
if sendFirstMessage {
14191450
proto.Merge(req, s.firstMessage)
14201451
}

storage/grpc_writer_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func TestGetObjectChecksums(t *testing.T) {
2727
fullObjectChecksum func() uint32
2828
finishWrite bool
2929
sendCRC32C bool
30+
takeoverWriter bool
3031
disableAutoChecksum bool
3132
attrs *ObjectAttrs
3233
want *storagepb.ObjectChecksums
@@ -73,16 +74,24 @@ func TestGetObjectChecksums(t *testing.T) {
7374
Crc32C: proto.Uint32(456),
7475
},
7576
},
77+
// TODO(b/461982277): remove this testcase once checksums for takeover writer is implemented
78+
{
79+
name: "takeover writer should return nil",
80+
finishWrite: true,
81+
takeoverWriter: true,
82+
want: nil,
83+
},
7684
}
7785

7886
for _, tt := range tests {
7987
t.Run(tt.name, func(t *testing.T) {
8088
got := getObjectChecksums(&getObjectChecksumsParams{
89+
disableAutoChecksum: tt.disableAutoChecksum,
90+
sendCRC32C: tt.sendCRC32C,
91+
objectAttrs: tt.attrs,
8192
fullObjectChecksum: tt.fullObjectChecksum,
8293
finishWrite: tt.finishWrite,
83-
sendCRC32C: tt.sendCRC32C,
84-
disableAutoChecksum: tt.disableAutoChecksum,
85-
attrs: tt.attrs,
94+
takeoverWriter: tt.takeoverWriter,
8695
})
8796
if !proto.Equal(got, tt.want) {
8897
t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want)

0 commit comments

Comments
 (0)