@@ -102,7 +102,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
102102 setFlush (func () (int64 , error ) {
103103 return gw .flush ()
104104 })
105- gw , err := newGRPCWriter (c , s , params , pr , pw , params .setPipeWriter )
105+ gw , err := newGRPCWriter (c , s , params , pr , pr , pw , params .setPipeWriter )
106106 if err != nil {
107107 errorf (err )
108108 pr .CloseWithError (err )
@@ -188,14 +188,14 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
188188 // These calls are still valid if err is nil
189189 err = checkCanceled (err )
190190 errorf (err )
191- pr .CloseWithError (err )
191+ gw . pr .CloseWithError (err )
192192 close (params .donec )
193193 }()
194194
195195 return pw , nil
196196}
197197
198- func newGRPCWriter (c * grpcStorageClient , s * settings , params * openWriterParams , r io.Reader , pw * io.PipeWriter , setPipeWriter func (* io.PipeWriter )) (* gRPCWriter , error ) {
198+ func newGRPCWriter (c * grpcStorageClient , s * settings , params * openWriterParams , r io.Reader , pr * io. PipeReader , pw * io.PipeWriter , setPipeWriter func (* io.PipeWriter )) (* gRPCWriter , error ) {
199199 if params .attrs .Retention != nil {
200200 // TO-DO: remove once ObjectRetention is available - see b/308194853
201201 return nil , status .Errorf (codes .Unimplemented , "storage: object retention is not supported in gRPC" )
@@ -241,6 +241,7 @@ func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams,
241241 ctx : params .ctx ,
242242 reader : r ,
243243 pw : pw ,
244+ pr : pr ,
244245 bucket : params .bucket ,
245246 attrs : params .attrs ,
246247 conds : params .conds ,
@@ -266,6 +267,7 @@ type gRPCWriter struct {
266267 c * grpcStorageClient
267268 buf []byte
268269 reader io.Reader
270+ pr * io.PipeReader // Keep track of pr and pw to update post-flush
269271 pw * io.PipeWriter
270272 setPipeWriter func (* io.PipeWriter ) // used to set in parent storage.Writer
271273
@@ -628,6 +630,7 @@ func (w *gRPCWriter) read() (int, bool, error) {
628630 pr , pw := io .Pipe ()
629631 w .reader = pr
630632 w .pw = pw
633+ w .pr = pr
631634 w .setPipeWriter (pw )
632635 } else {
633636 done = true
0 commit comments