Stream packs in check --read-data and during repacking #3484

fd0 merged 13 commits into restic:master

Conversation
be1826d to a7bede4 (force-push)
```go
bufRd.Reset(hrd)

// skip to start of first blob, offset == 0 for correct pack files
_, err := bufRd.Discard(int(offset))
```
Is `Discard` the only reason for using a `bufio.Reader` here? Because then this could also be implemented as `io.CopyN(ioutil.Discard, hrd, offset)`.
`repository.StreamPack` also uses a `bufio.Reader`. The reader in the `hashingLoader` gets reused by `StreamPack`, which is essential for being able to extract the pack file header later on. Without using a `bufio.Reader` here, we'd end up in a situation where the reader in `StreamPack` might already have read and buffered a part of the pack file header.
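A minimal sketch of the reader-sharing idea (the `hashingReader` type and function names here are illustrative, not restic's actual code): the caller's `bufio.Reader` is `Reset` onto the hashing reader and handed onward, so any bytes it buffers beyond the blobs stay available when the pack header is parsed.

```go
import (
	"bufio"
	"crypto/sha256"
	"hash"
	"io"
)

// hashingReader hashes everything that passes through it, so the pack
// hash can be verified while the pack is streamed.
type hashingReader struct {
	rd io.Reader
	h  hash.Hash
}

func (r *hashingReader) Read(p []byte) (int, error) {
	n, err := r.rd.Read(p)
	r.h.Write(p[:n])
	return n, err
}

// streamWithSharedBuffer reuses the caller's bufio.Reader instead of
// allocating a second one; a second buffered reader would swallow bytes
// that a later header parser still needs.
func streamWithSharedBuffer(src io.Reader, offset int64, bufRd *bufio.Reader) error {
	hrd := &hashingReader{rd: src, h: sha256.New()}
	bufRd.Reset(hrd)

	// skip to start of first blob, offset == 0 for correct pack files
	if _, err := bufRd.Discard(int(offset)); err != nil {
		return err
	}
	// ... blob processing continues reading from bufRd ...
	return nil
}
```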
```go
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error

func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
```
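For context, a hedged sketch of how a caller might use this callback-based API, assuming a repository whose `Backend().Load` matches `BackendLoadFn`; `process` is a hypothetical stand-in for the caller's per-blob handling:

```go
import (
	"context"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

// streamOnePack is an illustrative caller, not code from this PR.
func streamOnePack(ctx context.Context, repo restic.Repository, packID restic.ID, blobs []restic.Blob) error {
	return repository.StreamPack(ctx, repo.Backend().Load, repo.Key(), packID, blobs,
		func(blob restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				// an error here (e.g. a hash mismatch) aborts the whole pack
				return err
			}
			// buf may be reused for the next blob, so consume it before returning
			return process(blob, buf)
		})
}
```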
Why not use goroutines instead of a callback? Like,

```go
type BlobContent struct {
	restic.BlobHandle
	Plaintext []byte
	Err       error
}

func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob) <-chan *BlobContent
```
The `handleBlobFn` can currently return an error to force `StreamPack` to try loading the blobs again, which won't work with goroutines. My idea here was to allow retrying broken downloads or temporary decryption errors. That functionality would currently only be used in `Repack`, in the first `if err != nil`. However, I've just noticed that the error handling for `SaveBlob` in `Repack` was broken, as that error is one which cannot be retried.

So I guess I'll remove at least the return value of `handleBlobFn`. The downside of letting `StreamPack` return a channel is that we'd also have to make sure the goroutine is shut down properly, which would probably require using an `errgroup` as for `StreamTrees`.
Hmm, looks like I can't get rid of the return value of `handleBlobFn`. If the filerestorer or repack encounters an error, then `StreamPack` should just abort with an error. I've changed `StreamPack` to interpret any error returned by `handleBlobFn` as a permanent error, to prevent retries. Something similar would be possible using errgroups, but in a quick test I ended up with really messy error handling in the filerestorer.

Another complication of returning a channel is that the plaintext buffer could no longer be reused easily.
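To illustrate the trade-off, here is a rough sketch (not restic code) of what the channel-based variant would look like, using the hypothetical `BlobContent` type from the suggestion above: the producer goroutine needs errgroup-style shutdown, and because buffer ownership passes through the channel, each blob needs a fresh buffer.

```go
import (
	"context"

	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"
)

func consumeViaChannel(ctx context.Context, blobs []restic.Blob) error {
	g, wgCtx := errgroup.WithContext(ctx)
	ch := make(chan *BlobContent)

	g.Go(func() error {
		defer close(ch)
		for _, blob := range blobs {
			buf := make([]byte, int(blob.Length)) // fresh buffer per blob: no reuse
			// ... load and decrypt into buf ...
			select {
			case ch <- &BlobContent{BlobHandle: blob.BlobHandle, Plaintext: buf}:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		return nil
	})

	var firstErr error
	for bc := range ch {
		if bc.Err != nil && firstErr == nil {
			firstErr = bc.Err
			continue // keep draining so the producer goroutine can exit
		}
		_ = bc.Plaintext // the consumer now owns this buffer
	}
	if err := g.Wait(); err != nil {
		return err
	}
	return firstErr
}
```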
ef2006e to a066c78 (force-push)
The function supports efficiently loading a specified list of blobs from a single pack in a streaming fashion. That is, there is no need for temporary files, regardless of the pack size.

When storing a blob fails, this is a fatal error which must not be retried.
a066c78 to bba8ba7 (force-push)
fd0 left a comment:

Very impressive, I like it a lot! I've read through the code and found no issues with it, so in principle we can merge it.

The only thing that bugs me is that `StreamPack()` has no dedicated tests. Do you think it would be a good idea to add tests for it?
I've pushed a commit adding a few tests, please have a look!
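The actual tests live in the pushed commit; as a hedged sketch of the general shape such a test can take (`buildTestPack` is a hypothetical helper), one can serve a synthetic pack through a fake `BackendLoadFn` and check that the callback sees every blob:

```go
import (
	"bytes"
	"context"
	"io"
	"testing"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

func TestStreamPack(t *testing.T) {
	// buildTestPack is a hypothetical helper returning an encrypted pack,
	// its ID, the contained blobs, and the key used to build it.
	packData, packID, blobs, key := buildTestPack(t)

	// fake backend: serve the requested byte range from the in-memory pack
	load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(io.Reader) error) error {
		end := int64(len(packData))
		if length > 0 && offset+int64(length) < end {
			end = offset + int64(length)
		}
		return fn(bytes.NewReader(packData[offset:end]))
	}

	got := 0
	err := repository.StreamPack(context.TODO(), load, key, packID, blobs,
		func(blob restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				return err
			}
			got++
			return nil
		})
	if err != nil || got != len(blobs) {
		t.Fatalf("streamed %d of %d blobs, err: %v", got, len(blobs), err)
	}
}
```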
5966b32 to e682f7c (force-push)
The test looks fine. Thanks for adding it. Do you want to merge the PR or should I?
Let's do the release first, then merge this PR.
What does this PR change? What problem does it solve?

`check --read-data` and `prune` keep downloaded packs in temporary files, which can end up being written to disk. This can cause a large amount of data to be written to disk. The PR converts both operations to stream the pack files directly from the backend without using temporary files. Note that for pack uploads during `prune` and `backup`, temporary files are still used.

The streaming operation is implemented using `StreamPack`, which loads the requested blobs of a pack file, and `ListPacks` in the MasterIndex, which returns all blobs contained in a set of pack files, with the result grouped by pack file.

The repack step of `prune` used `DownloadAndHash` to verify the pack and then extract the blobs that should be kept. Now only the relevant part of the pack file is loaded, which reduces the amount of data downloaded during `prune`. Verifying the pack hash previously provided the assurance that the pack contains the list of blobs it is expected to have according to the repository index. This assurance is now provided by directly accessing the pack file based on the index. As before, each individual blob is checked to match the expected hash before any further processing.
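As an illustration of "only the relevant part of the pack file is loaded" (a sketch, not the actual restic code): the byte range to download can be derived from the kept blobs' offsets and lengths, so bytes that lie entirely outside that range are never fetched.

```go
import "github.com/restic/restic/internal/restic"

// blobByteRange computes the smallest byte range of a pack file that
// covers all blobs which should be kept; only this range needs to be
// requested from the backend.
func blobByteRange(blobs []restic.Blob) (offset int64, length int) {
	if len(blobs) == 0 {
		return 0, 0
	}
	start, end := blobs[0].Offset, blobs[0].Offset+blobs[0].Length
	for _, b := range blobs[1:] {
		if b.Offset < start {
			start = b.Offset
		}
		if b.Offset+b.Length > end {
			end = b.Offset + b.Length
		}
	}
	return int64(start), int(end - start)
}
```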
The `check --read-data` implementation injects a special `Load` method into `StreamPack` to calculate the pack hash, verify individual blobs, and extract the pack header for further processing, all in one pass.

In addition, the filerestorer code is switched to use `StreamPack`, which removes the duplicate blob decryption implementation outside of `repository`. `ListPacks` in the MasterIndex iterates multiple times over the repository index to keep the memory overhead low.

Was the change previously discussed in an issue or on the forum?
Fixes #3375
Related to #3465
Checklist

- [ ] I have added documentation for relevant changes (in the manual).
- There's a new file in `changelog/unreleased/` that describes the changes for our users (see template).
- I have run `gofmt` on the code in all commits.