restic/internal/repository/repack.go
Michael Eischer 2c310a526e repository: Replace StreamPack function with LoadBlobsFromPack method
LoadBlobsFromPack is now part of the repository struct. This ensures
that users of that method don't have to deal will internals of the
repository implementation.

The filerestorer tests now also contain far fewer pack file
implementation details.
2024-01-19 21:40:43 +01:00

140 lines
3.7 KiB
Go

package repository
import (
"context"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
type repackBlobSet interface {
Has(bh restic.BlobHandle) bool
Delete(bh restic.BlobHandle)
Len() int
}
// Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
// into a new pack. Returned is the list of obsolete packs which can then
// be removed.
//
// The map keepBlobs is modified by Repack, it is used to keep track of which
// blobs have been processed.
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), keepBlobs.Len())
if repo == dstRepo && dstRepo.Connections() < 2 {
return nil, errors.New("repack step requires a backend connection limit of at least two")
}
wg, wgCtx := errgroup.WithContext(ctx)
dstRepo.StartPackUploader(wgCtx, wg)
wg.Go(func() error {
var err error
obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
return err
})
if err := wg.Wait(); err != nil {
return nil, err
}
return obsoletePacks, nil
}
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
wg, wgCtx := errgroup.WithContext(ctx)
var keepMutex sync.Mutex
downloadQueue := make(chan restic.PackBlobs)
wg.Go(func() error {
defer close(downloadQueue)
for pbs := range repo.Index().ListPacks(wgCtx, packs) {
var packBlobs []restic.Blob
keepMutex.Lock()
// filter out unnecessary blobs
for _, entry := range pbs.Blobs {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
if keepBlobs.Has(h) {
packBlobs = append(packBlobs, entry)
}
}
keepMutex.Unlock()
select {
case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
worker := func() error {
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
}
keepMutex.Lock()
// recheck whether some other worker was faster
shouldKeep := keepBlobs.Has(blob)
if shouldKeep {
keepBlobs.Delete(blob)
}
keepMutex.Unlock()
if !shouldKeep {
return nil
}
// We do want to save already saved blobs!
_, _, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
if err != nil {
return err
}
debug.Log(" saved blob %v", blob.ID)
return nil
})
if err != nil {
return err
}
p.Add(1)
}
return nil
}
// as packs are streamed the concurrency is limited by IO
// reduce by one to ensure that uploading is always possible
repackWorkerCount := int(repo.Connections() - 1)
if repo != dstRepo {
// no need to share the upload and download connections for different repositories
repackWorkerCount = int(repo.Connections())
}
for i := 0; i < repackWorkerCount; i++ {
wg.Go(worker)
}
if err := wg.Wait(); err != nil {
return nil, err
}
if err := dstRepo.Flush(ctx); err != nil {
return nil, err
}
return packs, nil
}