Skip to content

Commit add2dcf

Browse files
dmcgowank8s-infra-cherrypick-robot
authored andcommitted
Ensure fetcher always closes body and properly calls release
Signed-off-by: Derek McGowan <derek@mcg.dev>
1 parent 34a1cb1 commit add2dcf

1 file changed

Lines changed: 28 additions & 23 deletions

File tree

core/remotes/docker/fetcher.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,18 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
474474
return nil, err
475475
}
476476

477-
body := resp.Body
477+
body := &fnOnClose{
478+
BeforeClose: func() {
479+
r.Release(1)
480+
},
481+
ReadCloser: resp.Body,
482+
}
483+
defer func() {
484+
if retErr != nil {
485+
body.Close()
486+
}
487+
}()
488+
478489
encoding := strings.FieldsFunc(resp.Header.Get("Content-Encoding"), func(r rune) bool {
479490
return r == ' ' || r == '\t' || r == ','
480491
})
@@ -505,29 +516,33 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
505516
for i := range numChunks {
506517
readers[i], writers[i] = newPipeWriter(bufPool)
507518
}
519+
// keep reference of the initial body value to ensure it is closed
520+
ibody := body
508521
go func() {
509522
for i := range numChunks {
510523
select {
511524
case queue <- i:
512525
case <-done:
526+
if i == 0 {
527+
ibody.Close()
528+
}
513529
return // avoid leaking a goroutine if we exit early.
514530
}
515531
}
516532
close(queue)
517533
}()
518-
r.Release(1)
519534
for range parallelism {
520535
go func() {
521536
for i := range queue { // first in first out
522537
copy := func() error {
523-
if err := r.Acquire(ctx, 1); err != nil {
524-
return err
525-
}
526-
defer r.Release(1)
527538
var body io.ReadCloser
528539
if i == 0 {
529-
body = resp.Body
540+
body = ibody
530541
} else {
542+
if err := r.Acquire(ctx, 1); err != nil {
543+
return err
544+
}
545+
defer r.Release(1)
531546
reqClone := req.clone()
532547
reqClone.setOffset(offset + i*chunkSize)
533548
nresp, err := reqClone.doWithRetries(ctx, lastHost, withErrorCheck)
@@ -564,37 +579,27 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
564579
},
565580
ReadCloser: io.NopCloser(io.MultiReader(readers...)),
566581
}
567-
} else {
568-
defer func() {
569-
if retErr != nil {
570-
r.Release(1)
571-
}
572-
}()
573-
body = &fnOnClose{
574-
BeforeClose: func() {
575-
r.Release(1)
576-
},
577-
ReadCloser: body,
578-
}
579582
}
583+
580584
for i := len(encoding) - 1; i >= 0; i-- {
581585
algorithm := strings.ToLower(encoding[i])
582586
switch algorithm {
583587
case "zstd":
584-
r, err := zstd.NewReader(body,
588+
r, err := zstd.NewReader(body.ReadCloser,
585589
zstd.WithDecoderLowmem(false),
586590
)
587591
if err != nil {
588592
return nil, err
589593
}
590-
body = r.IOReadCloser()
594+
body.ReadCloser = r.IOReadCloser()
591595
case "gzip":
592-
body, err = gzip.NewReader(body)
596+
r, err := gzip.NewReader(body.ReadCloser)
593597
if err != nil {
594598
return nil, err
595599
}
600+
body.ReadCloser = r
596601
case "deflate":
597-
body = flate.NewReader(body)
602+
body.ReadCloser = flate.NewReader(body.ReadCloser)
598603
case "identity", "":
599604
// no content-encoding applied, use raw body
600605
default:

0 commit comments

Comments
 (0)