@@ -819,11 +819,12 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC
819819}
820820
821821type getObjectChecksumsParams struct {
822- fullObjectChecksum func () uint32
823- finishWrite bool
824822 sendCRC32C bool
825823 disableAutoChecksum bool
826- attrs * ObjectAttrs
824+ objectAttrs * ObjectAttrs
825+ fullObjectChecksum func () uint32
826+ finishWrite bool
827+ takeoverWriter bool
827828}
828829
829830// getObjectChecksums determines what checksum information to include in the final
@@ -840,9 +841,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck
840841
841842 // send user's checksum on last write op if available
842843 if params .sendCRC32C {
843- return toProtoChecksums (params .sendCRC32C , params .attrs )
844+ return toProtoChecksums (params .sendCRC32C , params .objectAttrs )
844845 }
845- if params .disableAutoChecksum {
846+ // TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
847+ if params .disableAutoChecksum || params .takeoverWriter {
846848 return nil
847849 }
848850 return & storagepb.ObjectChecksums {
@@ -879,8 +881,11 @@ type gRPCOneshotBidiWriteBufferSender struct {
879881 firstMessage * storagepb.BidiWriteObjectRequest
880882 streamErr error
881883
882- checksumSettings func () (bool , bool , * ObjectAttrs )
883- fullObjectChecksum func () uint32
884+ // Checksum related settings.
885+ sendCRC32C bool
886+ disableAutoChecksum bool
887+ objectAttrs * ObjectAttrs
888+ fullObjectChecksum func () uint32
884889}
885890
886891func (w * gRPCWriter ) newGRPCOneshotBidiWriteBufferSender () * gRPCOneshotBidiWriteBufferSender {
@@ -894,9 +899,9 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
894899 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
895900 ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
896901 },
897- checksumSettings : func () ( bool , bool , * ObjectAttrs ) {
898- return w .sendCRC32C , w . disableAutoChecksum , w . attrs
899- } ,
902+ sendCRC32C : w . sendCRC32C ,
903+ disableAutoChecksum : w .disableAutoChecksum ,
904+ objectAttrs : w . attrs ,
900905 fullObjectChecksum : func () uint32 {
901906 return w .fullObjectChecksum
902907 },
@@ -939,17 +944,16 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
939944 continue
940945 }
941946
942- sendCrc32C , disableAutoChecksum , attrs := s .checksumSettings ()
943947 var bufChecksum * uint32
944- if ! disableAutoChecksum {
948+ if ! s . disableAutoChecksum {
945949 bufChecksum = proto .Uint32 (crc32 .Checksum (r .buf , crc32cTable ))
946950 }
947951 objectChecksums := getObjectChecksums (& getObjectChecksumsParams {
952+ sendCRC32C : s .sendCRC32C ,
953+ objectAttrs : s .objectAttrs ,
948954 fullObjectChecksum : s .fullObjectChecksum ,
955+ disableAutoChecksum : s .disableAutoChecksum ,
949956 finishWrite : r .finishWrite ,
950- sendCRC32C : sendCrc32C ,
951- disableAutoChecksum : disableAutoChecksum ,
952- attrs : attrs ,
953957 })
954958 req := bidiWriteObjectRequest (r , bufChecksum , objectChecksums )
955959
@@ -996,8 +1000,11 @@ type gRPCResumableBidiWriteBufferSender struct {
9961000 startWriteRequest * storagepb.StartResumableWriteRequest
9971001 upid string
9981002
999- checksumSettings func () (bool , bool , * ObjectAttrs )
1000- fullObjectChecksum func () uint32
1003+ // Checksum related settings.
1004+ sendCRC32C bool
1005+ disableAutoChecksum bool
1006+ objectAttrs * ObjectAttrs
1007+ fullObjectChecksum func () uint32
10011008
10021009 streamErr error
10031010}
@@ -1011,9 +1018,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
10111018 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
10121019 ObjectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
10131020 },
1014- checksumSettings : func () ( bool , bool , * ObjectAttrs ) {
1015- return w .sendCRC32C , w . disableAutoChecksum , w . attrs
1016- } ,
1021+ sendCRC32C : w . sendCRC32C ,
1022+ disableAutoChecksum : w .disableAutoChecksum ,
1023+ objectAttrs : w . attrs ,
10171024 fullObjectChecksum : func () uint32 {
10181025 return w .fullObjectChecksum
10191026 },
@@ -1076,17 +1083,16 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
10761083 continue
10771084 }
10781085
1079- sendCrc32C , disableAutoChecksum , attrs := s .checksumSettings ()
10801086 var bufChecksum * uint32
1081- if ! disableAutoChecksum {
1087+ if ! s . disableAutoChecksum {
10821088 bufChecksum = proto .Uint32 (crc32 .Checksum (r .buf , crc32cTable ))
10831089 }
10841090 objectChecksums := getObjectChecksums (& getObjectChecksumsParams {
1091+ sendCRC32C : s .sendCRC32C ,
1092+ objectAttrs : s .objectAttrs ,
10851093 fullObjectChecksum : s .fullObjectChecksum ,
1094+ disableAutoChecksum : s .disableAutoChecksum ,
10861095 finishWrite : r .finishWrite ,
1087- sendCRC32C : sendCrc32C ,
1088- disableAutoChecksum : disableAutoChecksum ,
1089- attrs : attrs ,
10901096 })
10911097 req := bidiWriteObjectRequest (r , bufChecksum , objectChecksums )
10921098
@@ -1142,12 +1148,18 @@ type gRPCAppendBidiWriteBufferSender struct {
11421148 bucket string
11431149 routingToken * string
11441150
1145- firstMessage * storagepb.BidiWriteObjectRequest
1146-
1147- objectChecksums * storagepb.ObjectChecksums
1151+ firstMessage * storagepb.BidiWriteObjectRequest
11481152 finalizeOnClose bool
11491153 objResource * storagepb.Object
11501154
1155+ // Checksum related settings.
1156+ sendCRC32C bool
1157+ disableAutoChecksum bool
1158+ objectAttrs * ObjectAttrs
1159+ fullObjectChecksum func () uint32
1160+
1161+ takeoverWriter bool
1162+
11511163 streamErr error
11521164}
11531165
@@ -1164,8 +1176,13 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB
11641176 },
11651177 CommonObjectRequestParams : toProtoCommonObjectRequestParams (w .encryptionKey ),
11661178 },
1167- objectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
1168- finalizeOnClose : w .finalizeOnClose ,
1179+ finalizeOnClose : w .finalizeOnClose ,
1180+ sendCRC32C : w .sendCRC32C ,
1181+ disableAutoChecksum : w .disableAutoChecksum ,
1182+ objectAttrs : w .attrs ,
1183+ fullObjectChecksum : func () uint32 {
1184+ return w .fullObjectChecksum
1185+ },
11691186 }
11701187}
11711188
@@ -1278,8 +1295,14 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
12781295 AppendObjectSpec : writeObjectSpecAsAppendObjectSpec (w .spec , w .appendGen ),
12791296 },
12801297 },
1281- objectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
1282- finalizeOnClose : w .finalizeOnClose ,
1298+ finalizeOnClose : w .finalizeOnClose ,
1299+ takeoverWriter : true ,
1300+ sendCRC32C : w .sendCRC32C ,
1301+ disableAutoChecksum : w .disableAutoChecksum ,
1302+ objectAttrs : w .attrs ,
1303+ fullObjectChecksum : func () uint32 {
1304+ return w .fullObjectChecksum
1305+ },
12831306 },
12841307 takeoverReported : false ,
12851308 handleTakeoverCompletion : func (c gRPCBidiWriteCompletion ) {
@@ -1409,12 +1432,20 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit
14091432 flush : flush ,
14101433 finishWrite : finalizeObject ,
14111434 }
1412- // TODO(b/453869602): implement default checksumming for appendable writes
1413- req := bidiWriteObjectRequest (r , nil , nil )
1414- if finalizeObject {
1415- // appendable objects pass checksums on the finalize message only
1416- req .ObjectChecksums = s .objectChecksums
1435+
1436+ var bufChecksum * uint32
1437+ if ! s .disableAutoChecksum {
1438+ bufChecksum = proto .Uint32 (crc32 .Checksum (r .buf , crc32cTable ))
14171439 }
1440+ objectChecksums := getObjectChecksums (& getObjectChecksumsParams {
1441+ sendCRC32C : s .sendCRC32C ,
1442+ objectAttrs : s .objectAttrs ,
1443+ fullObjectChecksum : s .fullObjectChecksum ,
1444+ disableAutoChecksum : s .disableAutoChecksum ,
1445+ finishWrite : finalizeObject ,
1446+ takeoverWriter : s .takeoverWriter ,
1447+ })
1448+ req := bidiWriteObjectRequest (r , bufChecksum , objectChecksums )
14181449 if sendFirstMessage {
14191450 proto .Merge (req , s .firstMessage )
14201451 }
0 commit comments