Skip to content

Commit 4272271

Browse files
fix(storage): fix panic error in transfermanager downloads (#13815)
1 parent 57f097e commit 4272271

File tree

2 files changed

+103
-105
lines changed

2 files changed

+103
-105
lines changed

storage/transfermanager/downloader.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,9 +485,12 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, out *DownloadOutput,
485485
// If a shard errored, track the error and cancel the shared ctx.
486486
errs = append(errs, shardOut.Err)
487487
in.cancelCtx(errCancelAllShards)
488+
continue
489+
}
490+
// Collect remaining checksums starting from second shard.
491+
if shardOut.shard > 0 && shardOut.shard < shards {
492+
orderedChecksums[shardOut.shard-1] = crc32cPiece{sum: shardOut.crc32c, length: shardOut.shardLength}
488493
}
489-
490-
orderedChecksums[shardOut.shard-1] = crc32cPiece{sum: shardOut.crc32c, length: shardOut.shardLength}
491494
}
492495

493496
// All pieces gathered.

storage/transfermanager/downloader_test.go

Lines changed: 98 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -510,116 +510,111 @@ func TestCalculateRange(t *testing.T) {
510510
// This tests that gather shards works as expected and cancels other shards
511511
// without error after it encounters an error.
512512
func TestGatherShards(t *testing.T) {
513-
ctx, cancelCtx := context.WithCancelCause(context.Background())
514-
515-
// Start a downloader.
516-
d, err := NewDownloader(nil, WithWorkers(2), WithCallbacks())
517-
if err != nil {
518-
t.Fatalf("NewDownloader: %v", err)
519-
}
520-
521-
// Test that gatherShards finishes without error.
522-
object := "obj1"
523-
shards := 4
524-
downloadRange := &DownloadRange{
525-
Offset: 20,
526-
Length: 120,
527-
}
528-
firstOut := &DownloadOutput{Object: object, Range: &DownloadRange{Offset: 20, Length: 30}, shard: 0}
529-
outChan := make(chan *DownloadOutput, shards)
530-
outs := []*DownloadOutput{
531-
{Object: object, Range: &DownloadRange{Offset: 50, Length: 30}, shard: 1},
532-
{Object: object, Range: &DownloadRange{Offset: 80, Length: 30}, shard: 2},
533-
{Object: object, Range: &DownloadRange{Offset: 110, Length: 30}, shard: 3},
534-
}
513+
content := "hello world"
514+
piece1 := "hello"
515+
piece2 := " "
516+
piece3 := "world"
517+
fullCRC := crc32c([]byte(content))
518+
firstPieceCRC := crc32c([]byte(piece1))
519+
520+
for _, tc := range []struct {
521+
desc string
522+
shards int
523+
outputs []*DownloadOutput
524+
checkCRC bool
525+
wantErr bool
526+
errMsg string
527+
expectCancel bool
528+
}{
529+
{
530+
desc: "all shards succeed",
531+
shards: 3,
532+
outputs: []*DownloadOutput{
533+
{shard: 1, crc32c: crc32c([]byte(piece2)), shardLength: int64(len(piece2))},
534+
{shard: 2, crc32c: crc32c([]byte(piece3)), shardLength: int64(len(piece3))},
535+
},
536+
checkCRC: true,
537+
wantErr: false,
538+
},
539+
{
540+
desc: "one shard fails",
541+
shards: 3,
542+
outputs: []*DownloadOutput{
543+
{shard: 0, Err: errors.New("shard failure")},
544+
{shard: 2, crc32c: crc32c([]byte(piece3)), shardLength: int64(len(piece3))},
545+
},
546+
checkCRC: true,
547+
wantErr: true,
548+
expectCancel: true,
549+
errMsg: "shard failure",
550+
},
551+
{
552+
desc: "checksum mismatch",
553+
shards: 3,
554+
outputs: []*DownloadOutput{
555+
{shard: 1, crc32c: crc32c([]byte(piece2)), shardLength: int64(len(piece2))},
556+
{shard: 2, crc32c: 999, shardLength: int64(len(piece3))}, // Wrong CRC
557+
},
558+
checkCRC: true,
559+
wantErr: true,
560+
errMsg: "bad CRC on read",
561+
},
562+
} {
563+
t.Run(tc.desc, func(t *testing.T) {
564+
ctx, cancelCtx := context.WithCancelCause(context.Background())
565+
defer cancelCtx(nil)
566+
d, err := NewDownloader(nil, WithWorkers(2), WithCallbacks())
567+
if err != nil {
568+
t.Fatalf("NewDownloader: %v", err)
569+
}
535570

536-
in := &DownloadObjectInput{
537-
Callback: func(o *DownloadOutput) {
538-
if o.Err != nil {
539-
t.Errorf("unexpected error in DownloadOutput: %v", o.Err)
571+
outChan := make(chan *DownloadOutput, tc.shards)
572+
in := &DownloadObjectInput{
573+
Callback: func(o *DownloadOutput) {},
574+
ctx: ctx,
575+
cancelCtx: cancelCtx,
576+
shardOutputs: outChan,
577+
checkCRC: tc.checkCRC,
540578
}
541-
if o.Range != downloadRange {
542-
t.Errorf("mismatching download range, got: %v, want: %v", o.Range, downloadRange)
579+
out := &DownloadOutput{
580+
Attrs: &storage.ReaderObjectAttrs{CRC32C: fullCRC},
543581
}
544-
if o.Object != object {
545-
t.Errorf("mismatching object names, got: %v, want: %v", o.Object, object)
582+
var wg sync.WaitGroup
583+
wg.Add(1)
584+
d.downloadsInProgress.Add(1)
585+
go func() {
586+
d.gatherShards(in, out, outChan, tc.shards, firstPieceCRC)
587+
wg.Done()
588+
}()
589+
590+
for _, so := range tc.outputs {
591+
outChan <- so
546592
}
547-
},
548-
ctx: ctx,
549-
cancelCtx: cancelCtx,
550-
shardOutputs: outChan,
551-
Range: downloadRange,
552-
}
553593

554-
var wg sync.WaitGroup
555-
wg.Add(1)
556-
d.downloadsInProgress.Add(1)
594+
wg.Wait()
557595

558-
go func() {
559-
d.gatherShards(in, firstOut, outChan, shards, 0)
560-
wg.Done()
561-
}()
562-
563-
for _, o := range outs {
564-
outChan <- o
565-
}
566-
567-
wg.Wait()
568-
569-
// Test that an error will cancel remaining pieces correctly.
570-
shardErr := errors.New("some error")
571-
572-
in.Callback = func(o *DownloadOutput) {
573-
// Error returned should wrap the original error.
574-
if !errors.Is(o.Err, shardErr) {
575-
t.Errorf("error in DownloadOutput should wrap error %q; intead got: %v", shardErr, o.Err)
576-
}
577-
// Error returned should not wrap nor contain the sentinel error.
578-
if errors.Is(o.Err, errCancelAllShards) || strings.Contains(o.Err.Error(), errCancelAllShards.Error()) {
579-
t.Errorf("error in DownloadOutput should not contain error %q; got: %v", errCancelAllShards, o.Err)
580-
}
581-
if o.Range != downloadRange {
582-
t.Errorf("mismatching download range, got: %v, want: %v", o.Range, downloadRange)
583-
}
584-
if o.Object != object {
585-
t.Errorf("mismatching object names, got: %v, want: %v", o.Object, object)
586-
}
587-
}
588-
589-
wg.Add(1)
590-
d.downloadsInProgress.Add(1)
591-
592-
go func() {
593-
d.gatherShards(in, firstOut, outChan, shards, 0)
594-
wg.Done()
595-
}()
596-
597-
// Send a successfull shard, an errored shard, and then a cancelled shard.
598-
outs[1].Err = shardErr
599-
outs[2].Err = context.Canceled
600-
for _, o := range outs {
601-
outChan <- o
602-
}
603-
604-
// Check that the context was cancelled with the sentinel error.
605-
_, ok := <-in.ctx.Done()
606-
if ok {
607-
t.Error("context was not cancelled")
608-
}
609-
610-
if ctxErr := context.Cause(in.ctx); !errors.Is(ctxErr, errCancelAllShards) {
611-
t.Errorf("context.Cause: error should wrap %q; intead got: %v", errCancelAllShards, ctxErr)
612-
}
613-
614-
wg.Wait()
596+
// WaitAndClose will wait for downloadsInProgress, including gatherShards goroutine.
597+
if _, err := d.WaitAndClose(); err != nil && !tc.wantErr {
598+
t.Logf("WaitAndClose returned error: %v", err)
599+
}
600+
if tc.expectCancel {
601+
select {
602+
case <-ctx.Done():
603+
if ctxErr := context.Cause(ctx); !errors.Is(ctxErr, errCancelAllShards) {
604+
t.Errorf("context.Cause: error should be %q; instead got: %v", errCancelAllShards, ctxErr)
605+
}
606+
default:
607+
t.Error("context was not cancelled for shard error")
608+
}
609+
}
615610

616-
// Check that the overall error returned also wraps only the proper error.
617-
_, err = d.WaitAndClose()
618-
if !errors.Is(err, shardErr) {
619-
t.Errorf("error in DownloadOutput should wrap error %q; intead got: %v", shardErr, err)
620-
}
621-
if errors.Is(err, errCancelAllShards) || strings.Contains(err.Error(), errCancelAllShards.Error()) {
622-
t.Errorf("error in DownloadOutput should not contain error %q; got: %v", errCancelAllShards, err)
611+
if (out.Err != nil) != tc.wantErr {
612+
t.Errorf("got error %v, wantErr %v", out.Err, tc.wantErr)
613+
}
614+
if tc.wantErr && tc.errMsg != "" && (out.Err == nil || !strings.Contains(out.Err.Error(), tc.errMsg)) {
615+
t.Errorf("expected error containing %q, got %v", tc.errMsg, out.Err)
616+
}
617+
})
623618
}
624619
}
625620

0 commit comments

Comments
 (0)