@@ -474,7 +474,18 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
474474 return nil , err
475475 }
476476
477- body := resp .Body
477+ body := & fnOnClose {
478+ BeforeClose : func () {
479+ r .Release (1 )
480+ },
481+ ReadCloser : resp .Body ,
482+ }
483+ defer func () {
484+ if retErr != nil {
485+ body .Close ()
486+ }
487+ }()
488+
478489 encoding := strings .FieldsFunc (resp .Header .Get ("Content-Encoding" ), func (r rune ) bool {
479490 return r == ' ' || r == '\t' || r == ','
480491 })
@@ -505,29 +516,33 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
505516 for i := range numChunks {
506517 readers [i ], writers [i ] = newPipeWriter (bufPool )
507518 }
519+ // keep reference of the initial body value to ensure it is closed
520+ ibody := body
508521 go func () {
509522 for i := range numChunks {
510523 select {
511524 case queue <- i :
512525 case <- done :
526+ if i == 0 {
527+ ibody .Close ()
528+ }
513529 return // avoid leaking a goroutine if we exit early.
514530 }
515531 }
516532 close (queue )
517533 }()
518- r .Release (1 )
519534 for range parallelism {
520535 go func () {
521536 for i := range queue { // first in first out
522537 copy := func () error {
523- if err := r .Acquire (ctx , 1 ); err != nil {
524- return err
525- }
526- defer r .Release (1 )
527538 var body io.ReadCloser
528539 if i == 0 {
529- body = resp . Body
540+ body = ibody
530541 } else {
542+ if err := r .Acquire (ctx , 1 ); err != nil {
543+ return err
544+ }
545+ defer r .Release (1 )
531546 reqClone := req .clone ()
532547 reqClone .setOffset (offset + i * chunkSize )
533548 nresp , err := reqClone .doWithRetries (ctx , lastHost , withErrorCheck )
@@ -564,37 +579,27 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
564579 },
565580 ReadCloser : io .NopCloser (io .MultiReader (readers ... )),
566581 }
567- } else {
568- defer func () {
569- if retErr != nil {
570- r .Release (1 )
571- }
572- }()
573- body = & fnOnClose {
574- BeforeClose : func () {
575- r .Release (1 )
576- },
577- ReadCloser : body ,
578- }
579582 }
583+
580584 for i := len (encoding ) - 1 ; i >= 0 ; i -- {
581585 algorithm := strings .ToLower (encoding [i ])
582586 switch algorithm {
583587 case "zstd" :
584- r , err := zstd .NewReader (body ,
588+ r , err := zstd .NewReader (body . ReadCloser ,
585589 zstd .WithDecoderLowmem (false ),
586590 )
587591 if err != nil {
588592 return nil , err
589593 }
590- body = r .IOReadCloser ()
594+ body . ReadCloser = r .IOReadCloser ()
591595 case "gzip" :
592- body , err = gzip .NewReader (body )
596+ r , err : = gzip .NewReader (body . ReadCloser )
593597 if err != nil {
594598 return nil , err
595599 }
600+ body .ReadCloser = r
596601 case "deflate" :
597- body = flate .NewReader (body )
602+ body . ReadCloser = flate .NewReader (body . ReadCloser )
598603 case "identity" , "" :
599604 // no content-encoding applied, use raw body
600605 default :
0 commit comments