diff --git a/changelog/unreleased/issue-3202 b/changelog/unreleased/issue-3202 new file mode 100644 index 000000000..522af9933 --- /dev/null +++ b/changelog/unreleased/issue-3202 @@ -0,0 +1,8 @@ +Enhancement: Add warmup support on S3 backend before repacks and restores + +Introduce S3 backend options for transitioning pack files from cold to hot +storage on S3 and S3-compatible providers. Note: only works before repacks +(prune/copy) and restore for now. + +https://github.com/restic/restic/issues/3202 +https://github.com/restic/restic/issues/2504 diff --git a/doc/faq.rst b/doc/faq.rst index 74dd77d71..781af4043 100644 --- a/doc/faq.rst +++ b/doc/faq.rst @@ -242,3 +242,31 @@ collect a list of all files, causing the following error: List(data) returned error, retrying after 1s: [...]: request timeout In this case you can increase the timeout using the ``--stuck-request-timeout`` option. + +Are "cold storages" supported? +------------------------------ + +Generally, restic does not natively support "cold storage" solutions. However, +experimental support for restoring from **S3 Glacier** and **S3 Glacier Deep +Archive** storage classes is available: + +.. code-block:: console + + $ restic backup -o s3.storage-class=GLACIER somedir/ + $ restic restore -o s3.enable-restore=1 -o s3.restore-days=7 -o s3.restore-timeout=1d latest + +**Notes:** + +- Expect restores to hang from 1 up to 42 hours depending on your storage + class, provider and luck. Restores from cold storages are known to be + time-consuming. You may need to adjust the ``s3.restore-timeout`` if a restore + operation takes more than 24 hours. +- Restic will prevent sending metadata files (such as config files, lock files + or tree blobs) to Glacier or Deep Archive. Standard class is used instead to + ensure normal and fast operations for most tasks. 
+- Currently, only the following commands are known to work: + + - ``backup`` + - ``copy`` + - ``prune`` + - ``restore`` diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 27390ee13..569ec7464 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -475,3 +475,7 @@ func (be *Backend) Delete(ctx context.Context) error { // Close does nothing func (be *Backend) Close() error { return nil } + +// Warmup not implemented +func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 3ef2bcbe3..19a503ebe 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -335,3 +335,7 @@ func (be *b2Backend) Delete(ctx context.Context) error { // Close does nothing func (be *b2Backend) Close() error { return nil } + +// Warmup not implemented +func (be *b2Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *b2Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/backend.go b/internal/backend/backend.go index f606e1123..b281e9e7b 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -75,6 +75,21 @@ type Backend interface { // Delete removes all data in the backend. Delete(ctx context.Context) error + + // Warmup ensures that the specified handle is ready for upcoming reads. + // This is particularly useful for transitioning files from cold to hot + // storage. + // + // The method is non-blocking and only schedules the warmup operation. The + // WarmupWait method may be used to wait for warmup completion. + // + // Returns: + // - true if the handle is already warm and no action is required. + // - An error if warmup fails. 
+ Warmup(ctx context.Context, h Handle) (bool, error) + + // WarmupWait waits until a Warmup operation completes on a given handle. + WarmupWait(ctx context.Context, h Handle) error } type Unwrapper interface { diff --git a/internal/backend/cache/backend.go b/internal/backend/cache/backend.go index 3754266ba..950688d62 100644 --- a/internal/backend/cache/backend.go +++ b/internal/backend/cache/backend.go @@ -258,3 +258,13 @@ func (b *Backend) List(ctx context.Context, t backend.FileType, fn func(f backen return nil } + +// Warmup delegates to the wrapped backend. +func (b *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { + return b.Backend.Warmup(ctx, h) +} + +// WarmupWait delegates to the wrapped backend. +func (b *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { + return b.Backend.WarmupWait(ctx, h) +} diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index 8af0ce9ad..b693a6fd2 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -82,3 +82,7 @@ func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offse func (be *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) { return be.b.Stat(ctx, h) } + +// Warmup is skipped during dry-runs: every file is reported as already warm. +func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index ad50f194b..12b5ab064 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -363,3 +363,7 @@ func (be *Backend) Delete(ctx context.Context) error { // Close does nothing. 
func (be *Backend) Close() error { return nil } + +// Warmup not implemented +func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index ee87ae5d6..06de46f4b 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -371,3 +371,7 @@ func (b *Local) Close() error { // same function. return nil } + +// Warmup not implemented +func (b *Local) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (b *Local) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 981c0a182..f448ea615 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -249,3 +249,9 @@ func (be *MemoryBackend) Delete(ctx context.Context) error { func (be *MemoryBackend) Close() error { return nil } + +// Warmup not implemented +func (be *MemoryBackend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { + return true, nil +} +func (be *MemoryBackend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/mock/backend.go b/internal/backend/mock/backend.go index a03198443..45fff4983 100644 --- a/internal/backend/mock/backend.go +++ b/internal/backend/mock/backend.go @@ -20,6 +20,8 @@ type Backend struct { ListFn func(ctx context.Context, t backend.FileType, fn func(backend.FileInfo) error) error RemoveFn func(ctx context.Context, h backend.Handle) error DeleteFn func(ctx context.Context) error + WarmupFn func(ctx context.Context, h backend.Handle) (bool, error) + WarmupWaitFn func(ctx context.Context, h backend.Handle) error ConnectionsFn func() uint HasherFn func() hash.Hash HasAtomicReplaceFn func() bool @@ -150,5 +152,21 @@ 
func (m *Backend) Delete(ctx context.Context) error { return m.DeleteFn(ctx) } +func (m *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { + if m.WarmupFn == nil { + return false, errors.New("not implemented") + } + + return m.WarmupFn(ctx, h) +} + +func (m *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { + if m.WarmupWaitFn == nil { + return errors.New("not implemented") + } + + return m.WarmupWaitFn(ctx, h) +} + // Make sure that Backend implements the backend interface. var _ backend.Backend = &Backend{} diff --git a/internal/backend/rclone/backend.go b/internal/backend/rclone/backend.go index 8294aa8c4..af9ddffa4 100644 --- a/internal/backend/rclone/backend.go +++ b/internal/backend/rclone/backend.go @@ -340,3 +340,7 @@ func (be *Backend) Close() error { debug.Log("wait for rclone returned: %v", be.waitResult) return be.waitResult } + +// Warmup not implemented +func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index 7bdedff39..22f786e63 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -439,3 +439,7 @@ func (b *Backend) Close() error { func (b *Backend) Delete(ctx context.Context) error { return util.DefaultDelete(ctx, b) } + +// Warmup not implemented +func (b *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (b *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/retry/backend_retry.go b/internal/backend/retry/backend_retry.go index de8a520ec..b22f88f73 100644 --- a/internal/backend/retry/backend_retry.go +++ b/internal/backend/retry/backend_retry.go @@ -289,3 +289,11 @@ func (be *Backend) List(ctx context.Context, t backend.FileType, fn func(backend func (be *Backend) 
Unwrap() backend.Backend { return be.Backend } + +// Warmup delegates to wrapped backend +func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { + return be.Backend.Warmup(ctx, h) +} +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { + return be.Backend.WarmupWait(ctx, h) +} diff --git a/internal/backend/s3/config.go b/internal/backend/s3/config.go index be2a78ce5..3134800d1 100644 --- a/internal/backend/s3/config.go +++ b/internal/backend/s3/config.go @@ -5,6 +5,7 @@ import ( "os" "path" "strings" + "time" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/errors" @@ -23,6 +24,11 @@ type Config struct { Layout string `option:"layout" help:"use this backend layout (default: auto-detect) (deprecated)"` StorageClass string `option:"storage-class" help:"set S3 storage class (STANDARD, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING or REDUCED_REDUNDANCY)"` + EnableRestore bool `option:"enable-restore" help:"restore objects from GLACIER or DEEP_ARCHIVE storage classes (default: false)"` + RestoreDays int `option:"restore-days" help:"lifetime in days of restored object (default: 7)"` + RestoreTimeout time.Duration `option:"restore-timeout" help:"maximum time to wait for objects transition (default: 1d)"` + RestoreTier string `option:"restore-tier" help:"Retrieval tier at which the restore will be processed. (Standard, Bulk or Expedited) (default: Standard)"` + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` MaxRetries uint `option:"retries" help:"set the number of retries attempted"` Region string `option:"region" help:"set region"` @@ -34,8 +40,12 @@ type Config struct { // NewConfig returns a new Config with the default values filled in. 
func NewConfig() Config { return Config{ - Connections: 5, - ListObjectsV1: false, + Connections: 5, + ListObjectsV1: false, + EnableRestore: false, + RestoreDays: 7, + RestoreTimeout: 24 * time.Hour, + RestoreTier: "Standard", } } diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 2176d289d..f0b82ee61 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -8,8 +8,11 @@ import ( "net/http" "os" "path" + "slices" "strings" + "time" + "github.com/cenkalti/backoff/v4" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" "github.com/restic/restic/internal/backend/location" @@ -32,6 +35,8 @@ type Backend struct { // make sure that *Backend implements backend.Backend var _ backend.Backend = &Backend{} +var archiveClasses = []string{"GLACIER", "DEEP_ARCHIVE"} + func NewFactory() location.Factory { return location.NewHTTPBackendFactory("s3", ParseConfig, location.NoPassword, Create, Open) } @@ -271,9 +276,9 @@ func (be *Backend) Path() string { // For archive storage classes, only data files are stored using that class; metadata // must remain instantly accessible. func (be *Backend) useStorageClass(h backend.Handle) bool { - notArchiveClass := be.cfg.StorageClass != "GLACIER" && be.cfg.StorageClass != "DEEP_ARCHIVE" isDataFile := h.Type == backend.PackFile && !h.IsMetadata - return isDataFile || notArchiveClass + isArchiveClass := slices.Contains(archiveClasses, be.cfg.StorageClass) + return !isArchiveClass || isDataFile } // Save stores data in the backend at the handle. @@ -445,3 +450,99 @@ func (be *Backend) Delete(ctx context.Context) error { // Close does nothing func (be *Backend) Close() error { return nil } + +// Warmup optionally transitions files from cold to hot storage. 
+func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { + if be.cfg.EnableRestore { + filename := be.Filename(h) + alreadyRestored, err := be.requestRestore(ctx, filename) + if err != nil || alreadyRestored { + return true, err + } + + debug.Log("s3 file needs restore: %s", filename) + return false, nil + } + + return true, nil +} + +// WarmupWait optionally waits until the handle is no longer on cold storage. +func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { + if be.cfg.EnableRestore { + filename := be.Filename(h) + err := be.waitForRestore(ctx, filename) + if err != nil { + return err + } + debug.Log("s3 file is restored: %s", filename) + } + + return nil +} + +// requestRestore sends a glacier restore request on a given file. +func (be *Backend) requestRestore(ctx context.Context, filename string) (alreadyRestored bool, err error) { + alreadyRestored = false + + opts := minio.RestoreRequest{} + if be.cfg.RestoreDays != 0 { + opts.SetDays(be.cfg.RestoreDays) + } + opts.SetGlacierJobParameters(minio.GlacierJobParameters{Tier: minio.TierType(be.cfg.RestoreTier)}) + + err = be.client.RestoreObject(ctx, be.cfg.Bucket, filename, "", opts) + if err != nil { + var e minio.ErrorResponse + if errors.As(err, &e) { + switch e.Code { + case "InvalidObjectState": + alreadyRestored = true + err = nil + case "RestoreAlreadyInProgress": + alreadyRestored = false + err = nil + } + } + } + + return +} + +// waitForRestore waits for a given file to be restored. +func (be *Backend) waitForRestore(ctx context.Context, filename string) error { + timeout := time.After(be.cfg.RestoreTimeout) + + for { + var objectInfo minio.ObjectInfo + + // Restore request can last many hours, therefore network may fail + // temporarily, and we don't need to die. 
+ retryPolicy := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 10) + retryPolicy = backoff.WithContext(retryPolicy, ctx) + err := backoff.Retry( + func() (err error) { + objectInfo, err = be.client.StatObject(ctx, be.cfg.Bucket, filename, minio.StatObjectOptions{}) + return + }, + retryPolicy, + ) + if err != nil { + return err + } + + // On AWS a restored object keeps its archive storage class, so a finished restore must be detected through the restore status as well. + storageClass := objectInfo.Metadata.Get("X-Amz-Storage-Class") + if !slices.Contains(archiveClasses, storageClass) || (objectInfo.Restore != nil && !objectInfo.Restore.OngoingRestore) { + return nil + } + + select { + case <-time.After(1 * time.Minute): + case <-timeout: + return errors.New("S3RestoreTimeout") + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 14819a2df..5ba967a26 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -588,3 +588,7 @@ func (r *SFTP) deleteRecursive(ctx context.Context, name string) error { func (r *SFTP) Delete(ctx context.Context) error { return r.deleteRecursive(ctx, r.p) } + +// Warmup not implemented +func (r *SFTP) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (r *SFTP) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index dfa2055cd..460452fa8 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -269,3 +269,7 @@ func (be *beSwift) Delete(ctx context.Context) error { // Close does nothing func (be *beSwift) Close() error { return nil } + +// Warmup not implemented +func (be *beSwift) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil } +func (be *beSwift) WarmupWait(ctx context.Context, h backend.Handle) error { return nil } diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 8c9ca28bb..2e63a078b 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ 
-50,6 +50,11 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { wg, wgCtx := errgroup.WithContext(ctx) + packsWarmer := NewPacksWarmer(repo) + if err := packsWarmer.StartWarmup(ctx, packs.List()); err != nil { + return nil, err + } + var keepMutex sync.Mutex downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { @@ -77,6 +82,9 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito worker := func() error { for t := range downloadQueue { + if err := packsWarmer.Wait(wgCtx, t.PackID); err != nil { + return err + } err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { // a required blob couldn't be retrieved diff --git a/internal/repository/warmup.go b/internal/repository/warmup.go new file mode 100644 index 000000000..fbe698e2b --- /dev/null +++ b/internal/repository/warmup.go @@ -0,0 +1,113 @@ +package repository + +import ( + "context" + "errors" + "sync" + + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/restic" +) + +// PacksWarmer records which packs were sent for warmup and the result of each warmup; mu guards packs and packsResult. +type PacksWarmer struct { + repo restic.Repository + packs restic.IDSet + packsResult map[restic.ID]error + mu sync.Mutex +} + +// WarmupPack requests the backend to warmup the specified pack file. +func (repo *Repository) WarmupPack(ctx context.Context, pack restic.ID) (bool, error) { + return repo.be.Warmup(ctx, backend.Handle{Type: restic.PackFile, Name: pack.String()}) +} + +// WarmupPackWait requests the backend to wait for the specified pack file to be warm. 
+func (repo *Repository) WarmupPackWait(ctx context.Context, pack restic.ID) error { + return repo.be.WarmupWait(ctx, backend.Handle{Type: restic.PackFile, Name: pack.String()}) +} + +// NewPacksWarmer creates a new PacksWarmer instance. +func NewPacksWarmer(repo restic.Repository) *PacksWarmer { + return &PacksWarmer{ + repo: repo, + packs: restic.NewIDSet(), + packsResult: make(map[restic.ID]error), + } +} + +// StartWarmup requests a warmup for each of the specified packs; packs that were already registered are skipped. +func (packsWarmer *PacksWarmer) StartWarmup(ctx context.Context, packs restic.IDs) error { + for _, packID := range packs { + if !packsWarmer.registerPack(packID) { + continue + } + + isWarm, err := packsWarmer.repo.WarmupPack(ctx, packID) + if err != nil { + packsWarmer.setResult(packID, err) + return err + } + if isWarm { + packsWarmer.setResult(packID, err) + } + } + return nil +} + +// Wait waits for the specified pack to be warm. It returns an error if the pack was never passed to StartWarmup. +func (packsWarmer *PacksWarmer) Wait(ctx context.Context, packID restic.ID) error { + packErr, ok := packsWarmer.getResult(packID) + if ok { + return packErr + } + + packsWarmer.mu.Lock() + registered := packsWarmer.packs.Has(packID) + packsWarmer.mu.Unlock() + if !registered { + return errors.New("PackNotWarmingUp") + } + + err := packsWarmer.repo.WarmupPackWait(ctx, packID) + packsWarmer.setResult(packID, err) + + return err +} + +// registerPack marks a pack as being warmed up. It returns false if the pack +// was already registered, true otherwise. +func (packsWarmer *PacksWarmer) registerPack(packID restic.ID) bool { + packsWarmer.mu.Lock() + defer packsWarmer.mu.Unlock() + + if packsWarmer.packs.Has(packID) { + return false + } + packsWarmer.packs.Insert(packID) + return true +} + +// getResult gets the result of a warmup. 
+// Returns: +// - the error returned by the warmup operation +// - true if the warmup is in a terminal state +func (packsWarmer *PacksWarmer) getResult(packID restic.ID) (error, bool) { + packsWarmer.mu.Lock() + defer packsWarmer.mu.Unlock() + + packResult, ok := packsWarmer.packsResult[packID] + if ok { + return packResult, true + } + + return nil, false +} + +// setResult sets the result of a warmup. +func (packsWarmer *PacksWarmer) setResult(packID restic.ID, err error) { + packsWarmer.mu.Lock() + defer packsWarmer.mu.Unlock() + + packsWarmer.packsResult[packID] = err +} diff --git a/internal/repository/warmup_test.go b/internal/repository/warmup_test.go new file mode 100644 index 000000000..f0b8c82dd --- /dev/null +++ b/internal/repository/warmup_test.go @@ -0,0 +1,65 @@ +package repository + +import ( + "context" + "testing" + + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/restic" +) + +func TestWarmupRepository(t *testing.T) { + warmupCalls := []backend.Handle{} + warmupWaitCalls := []backend.Handle{} + isWarm := true + + be := mock.NewBackend() + be.WarmupFn = func(ctx context.Context, h backend.Handle) (bool, error) { + warmupCalls = append(warmupCalls, h) + return isWarm, nil + } + be.WarmupWaitFn = func(ctx context.Context, h backend.Handle) error { + warmupWaitCalls = append(warmupWaitCalls, h) + return nil + } + + repo, _ := New(be, Options{}) + packsWarmer := NewPacksWarmer(repo) + + id1, _ := restic.ParseID("1111111111111111111111111111111111111111111111111111111111111111") + id2, _ := restic.ParseID("2222222222222222222222222222222222222222222222222222222222222222") + id3, _ := restic.ParseID("3333333333333333333333333333333333333333333333333333333333333333") + err := packsWarmer.StartWarmup(context.TODO(), restic.IDs{id1, id2}) + if err != nil { + t.Fatalf("error when starting warmup: %v", err) + } + if len(warmupCalls) != 2 { + t.Fatalf("expected 2 calls to 
warmup, got %d", len(warmupCalls)) + } + + err = packsWarmer.Wait(context.TODO(), id1) + if err != nil { + t.Fatalf("error when waiting for warmup: %v", err) + } + if len(warmupWaitCalls) != 0 { + t.Fatal("WarmupWait was called on a warm file") + } + + isWarm = false + err = packsWarmer.StartWarmup(context.TODO(), restic.IDs{id3}) + if err != nil { + t.Fatalf("error when adding element to warmup: %v", err) + } + if len(warmupCalls) != 3 { + t.Fatalf("expected 3 calls to warmup, got %d", len(warmupCalls)) + } + err = packsWarmer.Wait(context.TODO(), id3) + if err != nil { + t.Fatalf("error when waiting for warmup: %v", err) + } + if len(warmupWaitCalls) != 1 { + t.Fatalf("expected one call to WarmupWait, got %d", len(warmupWaitCalls)) + } + +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index b18b036a7..efcfc91c9 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -60,6 +60,11 @@ type Repository interface { SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. RemoveUnpacked(ctx context.Context, t FileType, id ID) error + + // WarmupPack requests the backend to warmup the specified pack file. + WarmupPack(ctx context.Context, pack ID) (bool, error) + // WarmupPackWait requests the backend to wait for the specified pack file to be warm. 
+ WarmupPackWait(ctx context.Context, pack ID) error } type FileType = backend.FileType diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 31234b960..2bcdb7f7c 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -41,12 +41,17 @@ type packInfo struct { } type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error +type warmPackFn func(context.Context, restic.IDs) error +type warmPackWaitFn func(context.Context, restic.ID) error // fileRestorer restores set of files type fileRestorer struct { idx func(restic.BlobType, restic.ID) []restic.PackedBlob blobsLoader blobsLoaderFn + warmPacks warmPackFn + warmPackWait warmPackWaitFn + workerCount int filesWriter *filesWriter zeroChunk restic.ID @@ -66,6 +71,8 @@ func newFileRestorer(dst string, connections uint, sparse bool, allowRecursiveDelete bool, + warmPacks warmPackFn, + warmPackWait warmPackWaitFn, progress *restore.Progress) *fileRestorer { // as packs are streamed the concurrency is limited by IO @@ -74,6 +81,8 @@ func newFileRestorer(dst string, return &fileRestorer{ idx: idx, blobsLoader: blobsLoader, + warmPacks: warmPacks, + warmPackWait: warmPackWait, filesWriter: newFilesWriter(workerCount, allowRecursiveDelete), zeroChunk: repository.ZeroChunk(), sparse: sparse, @@ -192,11 +201,18 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { // drop no longer necessary file list r.files = nil + if err := r.warmPacks(ctx, packOrder); err != nil { + return err + } + wg, ctx := errgroup.WithContext(ctx) downloadCh := make(chan *packInfo) worker := func() error { for pack := range downloadCh { + if err := r.warmPackWait(ctx, pack.id); err != nil { + return err + } if err := r.downloadPack(ctx, pack); err != nil { return err } diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 
f594760e4..5d80f6673 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "sort" + "sync" "testing" "github.com/restic/restic/internal/errors" @@ -31,6 +32,9 @@ type TestRepo struct { files []*fileInfo filesPathToContent map[string]string + warmupPacks restic.IDSet + warmupPacksWait restic.IDSet + // loader blobsLoaderFn } @@ -44,6 +48,24 @@ func (i *TestRepo) fileContent(file *fileInfo) string { return i.filesPathToContent[file.location] } +var warmupMu sync.Mutex + +func (i *TestRepo) WarmupPacks(ctx context.Context, packs restic.IDs) error { + warmupMu.Lock() + defer warmupMu.Unlock() + + i.warmupPacks.Merge(restic.NewIDSet(packs...)) + return nil +} + +func (i *TestRepo) WarmupPacksWait(ctx context.Context, pack restic.ID) error { + warmupMu.Lock() + defer warmupMu.Unlock() + + i.warmupPacksWait.Insert(pack) + return nil +} + func newTestRepo(content []TestFile) *TestRepo { type Pack struct { name string @@ -111,6 +133,8 @@ func newTestRepo(content []TestFile) *TestRepo { blobs: blobs, files: files, filesPathToContent: filesPathToContent, + warmupPacks: restic.NewIDSet(), + warmupPacksWait: restic.NewIDSet(), } repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { blobs = append([]restic.Blob{}, blobs...) 
@@ -144,7 +168,7 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files ma t.Helper() repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, repo.WarmupPacks, repo.WarmupPacksWait, nil) if files == nil { r.files = repo.files @@ -177,6 +201,13 @@ func verifyRestore(t *testing.T, r *fileRestorer, repo *TestRepo) { t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data) } } + + if len(repo.warmupPacks) == 0 { + t.Errorf("warmup did not occur") + } + if len(repo.warmupPacksWait) == 0 { + t.Errorf("warmup wait did not occur") + } } func TestFileRestorerBasic(t *testing.T) { @@ -285,7 +316,7 @@ func TestErrorRestoreFiles(t *testing.T) { return loadError } - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.WarmupPacks, repo.WarmupPacksWait, nil) r.files = repo.files err := r.restoreFiles(context.TODO()) @@ -326,7 +357,7 @@ func TestFatalDownloadError(t *testing.T) { }) } - r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.WarmupPacks, repo.WarmupPacksWait, nil) r.files = repo.files var errors []string diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 14a8edeac..37f177f10 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -11,6 +11,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" restoreui "github.com/restic/restic/internal/ui/restore" @@ -353,8 +354,9 @@ func (res *Restorer) RestoreTo(ctx 
context.Context, dst string) (uint64, error) } idx := NewHardlinkIndex[string]() + packsWarmer := repository.NewPacksWarmer(res.repo) filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob, - res.repo.Connections(), res.opts.Sparse, res.opts.Delete, res.opts.Progress) + res.repo.Connections(), res.opts.Sparse, res.opts.Delete, packsWarmer.StartWarmup, packsWarmer.Wait, res.opts.Progress) filerestorer.Error = res.Error debug.Log("first pass for %q", dst)