Merge pull request #2941 from MichaelEischer/parallel-repack

prune: Parallelize repack step
This commit is contained in:
Alexander Neumann 2020-11-05 11:00:41 +01:00 committed by GitHub
commit 636b2f2e94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 157 additions and 70 deletions

View file

@ -0,0 +1,8 @@
Enhancement: Speed up repacking step of prune command
The repack step of the prune command, which moves still used file parts into
new pack files such that the old ones can be garbage collected later on, now
processes multiple pack files in parallel. This is especially beneficial for
high latency backends or when using a fast network connection.
https://github.com/restic/restic/pull/2941

View file

@ -2,18 +2,26 @@ package repository
import ( import (
"context" "context"
"os"
"sync"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
) )
const numRepackWorkers = 8
// Repack takes a list of packs together with a list of blobs contained in // Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
// into a new pack. Returned is the list of obsolete packs which can then // into a new pack. Returned is the list of obsolete packs which can then
// be removed. // be removed.
//
// The map keepBlobs is modified by Repack, it is used to keep track of which
// blobs have been processed.
func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) { func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) {
if p != nil { if p != nil {
p.Start() p.Start()
@ -22,91 +30,161 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
for packID := range packs { wg, ctx := errgroup.WithContext(ctx)
// load the complete pack into a temp file
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h) downloadQueue := make(chan restic.ID)
if err != nil { wg.Go(func() error {
return nil, errors.Wrap(err, "Repack") defer close(downloadQueue)
} for packID := range packs {
select {
debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) case downloadQueue <- packID:
case <-ctx.Done():
if !packID.Equal(hash) { return ctx.Err()
return nil, errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
}
_, err = tempfile.Seek(0, 0)
if err != nil {
return nil, errors.Wrap(err, "Seek")
}
blobs, err := pack.List(repo.Key(), tempfile, packLength)
if err != nil {
return nil, err
}
debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
var buf []byte
for _, entry := range blobs {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
if !keepBlobs.Has(h) {
continue
} }
}
return nil
})
debug.Log(" process blob %v", h) type repackJob struct {
tempfile *os.File
hash restic.ID
packLength int64
}
processQueue := make(chan repackJob)
// used to close processQueue once all downloaders have finished
var downloadWG sync.WaitGroup
if uint(cap(buf)) < entry.Length { downloader := func() error {
buf = make([]byte, entry.Length) defer downloadWG.Done()
} for packID := range downloadQueue {
buf = buf[:entry.Length] // load the complete pack into a temp file
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
n, err := tempfile.ReadAt(buf, int64(entry.Offset)) tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "ReadAt") return errors.Wrap(err, "Repack")
} }
if n != len(buf) { debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
return nil, errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, tempfile.Name(), len(buf), n) if !packID.Equal(hash) {
return errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
} }
nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():] select {
plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil) case processQueue <- repackJob{tempfile, hash, packLength}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
downloadWG.Add(numRepackWorkers)
for i := 0; i < numRepackWorkers; i++ {
wg.Go(downloader)
}
wg.Go(func() error {
downloadWG.Wait()
close(processQueue)
return nil
})
var keepMutex sync.Mutex
worker := func() error {
for job := range processQueue {
tempfile, packID, packLength := job.tempfile, job.hash, job.packLength
blobs, err := pack.List(repo.Key(), tempfile, packLength)
if err != nil { if err != nil {
return nil, err return err
} }
id := restic.Hash(plaintext) debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
if !id.Equal(entry.ID) { var buf []byte
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", for _, entry := range blobs {
h.Type, h.ID, tempfile.Name(), id) h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
return nil, errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, tempfile.Name(), id) keepMutex.Lock()
shouldKeep := keepBlobs.Has(h)
keepMutex.Unlock()
if !shouldKeep {
continue
}
debug.Log(" process blob %v", h)
if uint(cap(buf)) < entry.Length {
buf = make([]byte, entry.Length)
}
buf = buf[:entry.Length]
n, err := tempfile.ReadAt(buf, int64(entry.Offset))
if err != nil {
return errors.Wrap(err, "ReadAt")
}
if n != len(buf) {
return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, tempfile.Name(), len(buf), n)
}
nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():]
plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
return err
}
id := restic.Hash(plaintext)
if !id.Equal(entry.ID) {
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
h.Type, h.ID, tempfile.Name(), id)
return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, tempfile.Name(), id)
}
keepMutex.Lock()
// recheck whether some other worker was faster
shouldKeep = keepBlobs.Has(h)
if shouldKeep {
keepBlobs.Delete(h)
}
keepMutex.Unlock()
if !shouldKeep {
continue
}
// We do want to save already saved blobs!
_, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true)
if err != nil {
return err
}
debug.Log(" saved blob %v", entry.ID)
} }
// We do want to save already saved blobs! if err = tempfile.Close(); err != nil {
_, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true) return errors.Wrap(err, "Close")
if err != nil {
return nil, err
} }
debug.Log(" saved blob %v", entry.ID) if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
return errors.Wrap(err, "Remove")
}
if p != nil {
p.Report(restic.Stat{Blobs: 1})
}
}
return nil
}
keepBlobs.Delete(h) for i := 0; i < numRepackWorkers; i++ {
} wg.Go(worker)
}
if err = tempfile.Close(); err != nil { if err := wg.Wait(); err != nil {
return nil, errors.Wrap(err, "Close") return nil, err
}
if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
return nil, errors.Wrap(err, "Remove")
}
if p != nil {
p.Report(restic.Stat{Blobs: 1})
}
} }
if err := repo.Flush(ctx); err != nil { if err := repo.Flush(ctx); err != nil {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/restic/restic/internal/index" "github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
@ -195,7 +196,7 @@ func TestRepack(t *testing.T) {
repo, cleanup := repository.TestRepository(t) repo, cleanup := repository.TestRepository(t)
defer cleanup() defer cleanup()
seed := rand.Int63() seed := time.Now().UnixNano()
rand.Seed(seed) rand.Seed(seed)
t.Logf("rand seed is %v", seed) t.Logf("rand seed is %v", seed)
@ -262,7 +263,7 @@ func TestRepackWrongBlob(t *testing.T) {
repo, cleanup := repository.TestRepository(t) repo, cleanup := repository.TestRepository(t)
defer cleanup() defer cleanup()
seed := rand.Int63() seed := time.Now().UnixNano()
rand.Seed(seed) rand.Seed(seed)
t.Logf("rand seed is %v", seed) t.Logf("rand seed is %v", seed)
@ -277,5 +278,5 @@ func TestRepackWrongBlob(t *testing.T) {
if err == nil { if err == nil {
t.Fatal("expected repack to fail but got no error") t.Fatal("expected repack to fail but got no error")
} }
t.Log(err) t.Logf("found expected error: %v", err)
} }

View file

@ -758,7 +758,7 @@ type Loader interface {
// DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location // DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location
// and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file; // and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file;
// it is reponsibility of the caller to close and delete the file. // it is the reponsibility of the caller to close and delete the file.
func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) { func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) {
tmpfile, err = fs.TempFile("", "restic-temp-") tmpfile, err = fs.TempFile("", "restic-temp-")
if err != nil { if err != nil {