feat(backends/s3): add warmup support before repacks and restores
This commit introduces basic support for transitioning pack files stored in cold storage to hot storage on S3 and S3-compatible providers. To prevent unexpected behavior for existing users, the feature is gated behind new flags: - `s3.enable-restores`: opt-in flag (defaults to false) - `s3.restore-days`: number of days for the restored objects to remain in hot storage (defaults to `7`) - `s3.restore-timeout`: maximum time to wait for a single restoration (default to `1 day`) - `s3.restore-tier`: retrieval tier at which the restore will be processed. (default to `Standard`) As restoration times can be lengthy, this implementation preemptively restores selected packs to prevent incessant restore-delays during downloads. This is slightly sub-optimal as we could process packs out-of-order (as soon as they're transitioned), but this would really add too much complexity for a marginal gain in speed. To maintain simplicity and prevent resources exhautions with lots of packs, no new concurrency mechanisms or goroutines were added. This just hooks gracefully into the existing routines. **Limitations:** - Tests against the backend were not written due to the lack of cold storage class support in MinIO. Testing was done manually on Scaleway's S3-compatible object storage. If necessary, we could explore testing with LocalStack or mocks, though this requires further discussion. - Currently, this feature only warms up before restores and repacks (prune/copy), as those are the two main use-cases I came across. Support for other commands may be added in future iterations, as long as affected packs can be calculated in advance. - There is no explicit user notification for ongoing pack restorations. While I think it is not necessary because of the opt-in flag, showing some notice may improve usability (but would probably require major refactoring in the progress bar which I didn't want to start). Another possibility would be to add a flag to send restores requests and fail early. See https://github.com/restic/restic/issues/3202
This commit is contained in:
parent
d7d9af4c9f
commit
aeeecf2c97
25 changed files with 484 additions and 8 deletions
8
changelog/unreleased/issue-3202
Normal file
8
changelog/unreleased/issue-3202
Normal file
|
@ -0,0 +1,8 @@
|
|||
Enhancement: Add warmup support on S3 backend before repacks and restores
|
||||
|
||||
Introduce S3 backend options for transitioning pack files from cold to hot
|
||||
storage on S3 and S3-compatible providers. Note: only works before repacks
|
||||
(prune/copy) and restore for now.
|
||||
|
||||
https://github.com/restic/restic/issues/3202
|
||||
https://github.com/restic/restic/issues/2504
|
28
doc/faq.rst
28
doc/faq.rst
|
@ -242,3 +242,31 @@ collect a list of all files, causing the following error:
|
|||
List(data) returned error, retrying after 1s: [...]: request timeout
|
||||
|
||||
In this case you can increase the timeout using the ``--stuck-request-timeout`` option.
|
||||
|
||||
Are "cold storages" supported?
|
||||
------------------------------
|
||||
|
||||
Generally, restic does not natively support "cold storage" solutions. However,
|
||||
experimental support for restoring from **S3 Glacier** and **S3 Glacier Deep
|
||||
Archive** storage classes is available:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ restic backup -o s3.storage-class=GLACIER somedir/
|
||||
$ restic restore -o s3.enable-restores=1 -o s3.restore-days=7 -o s3.restore-timeout=1d latest
|
||||
|
||||
**Notes:**
|
||||
|
||||
- Expect restores to hang from 1 up to 42 hours depending on your storage
|
||||
class, provider and luck. Restores from cold storages are known to be
|
||||
time-consuming. You may need to adjust the `s3.restore-timeout` if a restore
|
||||
operation takes more than 24 hours.
|
||||
- Restic will prevent sending metadata files (such as config files, lock files
|
||||
or tree blobs) to Glacier or Deep Archive. Standard class is used instead to
|
||||
ensure normal and fast operations for most tasks.
|
||||
- Currently, only the following commands are known to work or have worked:
|
||||
|
||||
- `backup`
|
||||
- `copy`
|
||||
- `prune`
|
||||
- `restore`
|
||||
|
|
|
@ -475,3 +475,7 @@ func (be *Backend) Delete(ctx context.Context) error {
|
|||
|
||||
// Close does nothing
|
||||
func (be *Backend) Close() error { return nil }
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -335,3 +335,7 @@ func (be *b2Backend) Delete(ctx context.Context) error {
|
|||
|
||||
// Close does nothing
|
||||
func (be *b2Backend) Close() error { return nil }
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *b2Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *b2Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -75,6 +75,21 @@ type Backend interface {
|
|||
|
||||
// Delete removes all data in the backend.
|
||||
Delete(ctx context.Context) error
|
||||
|
||||
// Warmup ensures that the specified handles are ready for upcoming reads.
|
||||
// This is particularly useful for transitioning files from cold to hot
|
||||
// storage.
|
||||
//
|
||||
// The method is non-blocking and only schedules the warmup operation. The
|
||||
// WarmupWait method may be used to wait warmup completion.
|
||||
//
|
||||
// Returns:
|
||||
// - true if the handle is already warm and no action is required.
|
||||
// - An error if warmup fails.
|
||||
Warmup(ctx context.Context, h Handle) (bool, error)
|
||||
|
||||
// WarmupWait waits until a Warmup operation completes on a given handle.
|
||||
WarmupWait(ctx context.Context, h Handle) error
|
||||
}
|
||||
|
||||
type Unwrapper interface {
|
||||
|
|
10
internal/backend/cache/backend.go
vendored
10
internal/backend/cache/backend.go
vendored
|
@ -258,3 +258,13 @@ func (b *Backend) List(ctx context.Context, t backend.FileType, fn func(f backen
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Warmup delegates to wrapped backend.
|
||||
func (b *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
return b.Backend.Warmup(ctx, h)
|
||||
}
|
||||
|
||||
// WarmupWait delegates to wrapped backend.
|
||||
func (b *Backend) WarmupWait(ctx context.Context, h backend.Handle) error {
|
||||
return b.Backend.WarmupWait(ctx, h)
|
||||
}
|
||||
|
|
|
@ -82,3 +82,7 @@ func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offse
|
|||
func (be *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) {
|
||||
return be.b.Stat(ctx, h)
|
||||
}
|
||||
|
||||
// Warmup should not occur during dry-runs.
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -363,3 +363,7 @@ func (be *Backend) Delete(ctx context.Context) error {
|
|||
|
||||
// Close does nothing.
|
||||
func (be *Backend) Close() error { return nil }
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -371,3 +371,7 @@ func (b *Local) Close() error {
|
|||
// same function.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *Local) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Local) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -249,3 +249,9 @@ func (be *MemoryBackend) Delete(ctx context.Context) error {
|
|||
func (be *MemoryBackend) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *MemoryBackend) Warmup(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
func (be *MemoryBackend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -20,6 +20,8 @@ type Backend struct {
|
|||
ListFn func(ctx context.Context, t backend.FileType, fn func(backend.FileInfo) error) error
|
||||
RemoveFn func(ctx context.Context, h backend.Handle) error
|
||||
DeleteFn func(ctx context.Context) error
|
||||
WarmupFn func(ctx context.Context, h backend.Handle) (bool, error)
|
||||
WarmupWaitFn func(ctx context.Context, h backend.Handle) error
|
||||
ConnectionsFn func() uint
|
||||
HasherFn func() hash.Hash
|
||||
HasAtomicReplaceFn func() bool
|
||||
|
@ -150,5 +152,21 @@ func (m *Backend) Delete(ctx context.Context) error {
|
|||
return m.DeleteFn(ctx)
|
||||
}
|
||||
|
||||
func (m *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
if m.WarmupFn == nil {
|
||||
return false, errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.WarmupFn(ctx, h)
|
||||
}
|
||||
|
||||
func (m *Backend) WarmupWait(ctx context.Context, h backend.Handle) error {
|
||||
if m.WarmupWaitFn == nil {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
return m.WarmupWaitFn(ctx, h)
|
||||
}
|
||||
|
||||
// Make sure that Backend implements the backend interface.
|
||||
var _ backend.Backend = &Backend{}
|
||||
|
|
|
@ -340,3 +340,7 @@ func (be *Backend) Close() error {
|
|||
debug.Log("wait for rclone returned: %v", be.waitResult)
|
||||
return be.waitResult
|
||||
}
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -439,3 +439,7 @@ func (b *Backend) Close() error {
|
|||
func (b *Backend) Delete(ctx context.Context) error {
|
||||
return util.DefaultDelete(ctx, b)
|
||||
}
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -289,3 +289,11 @@ func (be *Backend) List(ctx context.Context, t backend.FileType, fn func(backend
|
|||
func (be *Backend) Unwrap() backend.Backend {
|
||||
return be.Backend
|
||||
}
|
||||
|
||||
// Warmup delegates to wrapped backend
|
||||
func (b *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
return b.Backend.Warmup(ctx, h)
|
||||
}
|
||||
func (b *Backend) WarmupWait(ctx context.Context, h backend.Handle) error {
|
||||
return b.Backend.WarmupWait(ctx, h)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
|
@ -23,6 +24,11 @@ type Config struct {
|
|||
Layout string `option:"layout" help:"use this backend layout (default: auto-detect) (deprecated)"`
|
||||
StorageClass string `option:"storage-class" help:"set S3 storage class (STANDARD, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING or REDUCED_REDUNDANCY)"`
|
||||
|
||||
EnableRestore bool `option:"enable-restore" help:"restore objects from GLACIER or DEEP_ARCHIVE storage classes (default: false)"`
|
||||
RestoreDays int `option:"restore-days" help:"lifetime in days of restored object (default: 7)"`
|
||||
RestoreTimeout time.Duration `option:"restore-timeout" help:"maximum time to wait for objects transition (default: 1d)"`
|
||||
RestoreTier string `option:"restore-tier" help:"Retrieval tier at which the restore will be processed. (Standard, Bulk or Expedited) (default: Standard)"`
|
||||
|
||||
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
|
||||
MaxRetries uint `option:"retries" help:"set the number of retries attempted"`
|
||||
Region string `option:"region" help:"set region"`
|
||||
|
@ -34,8 +40,12 @@ type Config struct {
|
|||
// NewConfig returns a new Config with the default values filled in.
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
Connections: 5,
|
||||
ListObjectsV1: false,
|
||||
Connections: 5,
|
||||
ListObjectsV1: false,
|
||||
EnableRestore: false,
|
||||
RestoreDays: 7,
|
||||
RestoreTimeout: 24 * time.Hour,
|
||||
RestoreTier: "Standard",
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,11 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/location"
|
||||
|
@ -32,6 +35,8 @@ type Backend struct {
|
|||
// make sure that *Backend implements backend.Backend
|
||||
var _ backend.Backend = &Backend{}
|
||||
|
||||
var archiveClasses = []string{"GLACIER", "DEEP_ARCHIVE"}
|
||||
|
||||
func NewFactory() location.Factory {
|
||||
return location.NewHTTPBackendFactory("s3", ParseConfig, location.NoPassword, Create, Open)
|
||||
}
|
||||
|
@ -271,9 +276,9 @@ func (be *Backend) Path() string {
|
|||
// For archive storage classes, only data files are stored using that class; metadata
|
||||
// must remain instantly accessible.
|
||||
func (be *Backend) useStorageClass(h backend.Handle) bool {
|
||||
notArchiveClass := be.cfg.StorageClass != "GLACIER" && be.cfg.StorageClass != "DEEP_ARCHIVE"
|
||||
isDataFile := h.Type == backend.PackFile && !h.IsMetadata
|
||||
return isDataFile || notArchiveClass
|
||||
isArchiveClass := slices.Contains(archiveClasses, be.cfg.StorageClass)
|
||||
return !isArchiveClass || isDataFile
|
||||
}
|
||||
|
||||
// Save stores data in the backend at the handle.
|
||||
|
@ -445,3 +450,99 @@ func (be *Backend) Delete(ctx context.Context) error {
|
|||
|
||||
// Close does nothing
|
||||
func (be *Backend) Close() error { return nil }
|
||||
|
||||
// Warmup optionally transitions files from cold to hot storage.
|
||||
func (be *Backend) Warmup(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
if be.cfg.EnableRestore {
|
||||
filename := be.Filename(h)
|
||||
alreadyRestored, err := be.requestRestore(ctx, filename)
|
||||
if err != nil || alreadyRestored {
|
||||
return true, err
|
||||
}
|
||||
|
||||
debug.Log("s3 file needs restore: %s", filename)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Warmup optionally ensures the handle is not on cold storage.
|
||||
func (be *Backend) WarmupWait(ctx context.Context, h backend.Handle) error {
|
||||
if be.cfg.EnableRestore {
|
||||
filename := be.Filename(h)
|
||||
err := be.waitForRestore(ctx, filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
debug.Log("s3 file is restored: %s", filename)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// requestRestore sends a glacier restore request on a given file.
|
||||
func (be *Backend) requestRestore(ctx context.Context, filename string) (alreadyRestored bool, err error) {
|
||||
alreadyRestored = false
|
||||
|
||||
opts := minio.RestoreRequest{}
|
||||
if be.cfg.RestoreDays != 0 {
|
||||
opts.SetDays(be.cfg.RestoreDays)
|
||||
}
|
||||
opts.SetGlacierJobParameters(minio.GlacierJobParameters{Tier: minio.TierType(be.cfg.RestoreTier)})
|
||||
|
||||
err = be.client.RestoreObject(ctx, be.cfg.Bucket, filename, "", opts)
|
||||
if err != nil {
|
||||
var e minio.ErrorResponse
|
||||
if errors.As(err, &e) {
|
||||
switch e.Code {
|
||||
case "InvalidObjectState":
|
||||
alreadyRestored = true
|
||||
err = nil
|
||||
case "RestoreAlreadyInProgress":
|
||||
alreadyRestored = false
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// waitForRestore waits for a given file to be restored.
|
||||
func (be *Backend) waitForRestore(ctx context.Context, filename string) error {
|
||||
timeout := time.After(be.cfg.RestoreTimeout)
|
||||
|
||||
for {
|
||||
var objectInfo minio.ObjectInfo
|
||||
|
||||
// Restore request can last many hours, therefore network may fail
|
||||
// temporarily, and we don't need to die.
|
||||
backoff_ := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 10)
|
||||
backoff_ = backoff.WithContext(backoff_, ctx)
|
||||
err := backoff.Retry(
|
||||
func() (err error) {
|
||||
objectInfo, err = be.client.StatObject(ctx, be.cfg.Bucket, filename, minio.StatObjectOptions{})
|
||||
return
|
||||
},
|
||||
backoff_,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storageClass := objectInfo.Metadata.Get("X-Amz-Storage-Class")
|
||||
if !slices.Contains(archiveClasses, storageClass) {
|
||||
debug.Log("s3 file is restored: %s\n", filename)
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
case <-timeout:
|
||||
return errors.New("S3RestoreTimeout")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -588,3 +588,7 @@ func (r *SFTP) deleteRecursive(ctx context.Context, name string) error {
|
|||
func (r *SFTP) Delete(ctx context.Context) error {
|
||||
return r.deleteRecursive(ctx, r.p)
|
||||
}
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *SFTP) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *SFTP) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -269,3 +269,7 @@ func (be *beSwift) Delete(ctx context.Context) error {
|
|||
|
||||
// Close does nothing
|
||||
func (be *beSwift) Close() error { return nil }
|
||||
|
||||
// Warmup not implemented
|
||||
func (be *beSwift) Warmup(ctx context.Context, h backend.Handle) (bool, error) { return true, nil }
|
||||
func (be *beSwift) WarmupWait(ctx context.Context, h backend.Handle) error { return nil }
|
||||
|
|
|
@ -50,6 +50,11 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
|
|||
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
||||
wg, wgCtx := errgroup.WithContext(ctx)
|
||||
|
||||
packsWarmer := NewPacksWarmer(repo)
|
||||
if err := packsWarmer.StartWarmup(ctx, packs.List()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var keepMutex sync.Mutex
|
||||
downloadQueue := make(chan restic.PackBlobs)
|
||||
wg.Go(func() error {
|
||||
|
@ -77,6 +82,9 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
|
|||
|
||||
worker := func() error {
|
||||
for t := range downloadQueue {
|
||||
if err := packsWarmer.Wait(wgCtx, t.PackID); err != nil {
|
||||
return err
|
||||
}
|
||||
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
// a required blob couldn't be retrieved
|
||||
|
|
109
internal/repository/warmup.go
Normal file
109
internal/repository/warmup.go
Normal file
|
@ -0,0 +1,109 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
type PacksWarmer struct {
|
||||
repo restic.Repository
|
||||
packs restic.IDSet
|
||||
packsResult map[restic.ID]error
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// WamupPack requests the backend to warmup the specified pack file.
|
||||
func (repo *Repository) WarmupPack(ctx context.Context, pack restic.ID) (bool, error) {
|
||||
return repo.be.Warmup(ctx, backend.Handle{Type: restic.PackFile, Name: pack.String()})
|
||||
}
|
||||
|
||||
// WamupPackWait requests the backend to wait for the specified pack file to be warm.
|
||||
func (repo *Repository) WarmupPackWait(ctx context.Context, pack restic.ID) error {
|
||||
return repo.be.WarmupWait(ctx, backend.Handle{Type: restic.PackFile, Name: pack.String()})
|
||||
}
|
||||
|
||||
// NewPacksWarmer creates a new PacksWarmer instance.
|
||||
func NewPacksWarmer(repo restic.Repository) *PacksWarmer {
|
||||
return &PacksWarmer{
|
||||
repo: repo,
|
||||
packs: restic.NewIDSet(),
|
||||
packsResult: make(map[restic.ID]error),
|
||||
}
|
||||
}
|
||||
|
||||
// StartWarmup warms up the specified packs
|
||||
func (packsWarmer *PacksWarmer) StartWarmup(ctx context.Context, packs restic.IDs) error {
|
||||
for _, packID := range packs {
|
||||
if !packsWarmer.registerPack(packID) {
|
||||
continue
|
||||
}
|
||||
|
||||
isWarm, err := packsWarmer.repo.WarmupPack(ctx, packID)
|
||||
if err != nil {
|
||||
packsWarmer.setResult(packID, err)
|
||||
return err
|
||||
}
|
||||
if isWarm {
|
||||
packsWarmer.setResult(packID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartWarmup waits for the specified packs to be warm
|
||||
func (packsWarmer *PacksWarmer) Wait(ctx context.Context, packID restic.ID) error {
|
||||
packErr, ok := packsWarmer.getResult(packID)
|
||||
if ok {
|
||||
return packErr
|
||||
}
|
||||
|
||||
if !packsWarmer.packs.Has(packID) {
|
||||
return errors.New("PackNotWarmingUp")
|
||||
}
|
||||
|
||||
err := packsWarmer.repo.WarmupPackWait(ctx, packID)
|
||||
packsWarmer.setResult(packID, err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// registerPack saves a new pack as "being warming up". It returns true if it
|
||||
// was already seen before.
|
||||
func (packsWarmer *PacksWarmer) registerPack(packID restic.ID) bool {
|
||||
packsWarmer.mu.Lock()
|
||||
defer packsWarmer.mu.Unlock()
|
||||
|
||||
if packsWarmer.packs.Has(packID) {
|
||||
return false
|
||||
}
|
||||
packsWarmer.packs.Insert(packID)
|
||||
return true
|
||||
}
|
||||
|
||||
// getResult gets the result of a warmup.
|
||||
// Returns:
|
||||
// - the error returned by the warmup operation
|
||||
// - true if the warmup is in a terminal state
|
||||
func (packsWarmer *PacksWarmer) getResult(packID restic.ID) (error, bool) {
|
||||
packsWarmer.mu.Lock()
|
||||
defer packsWarmer.mu.Unlock()
|
||||
|
||||
packResult, ok := packsWarmer.packsResult[packID]
|
||||
if ok {
|
||||
return packResult, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// setResult sets the result of a warmup.
|
||||
func (packsWarmer *PacksWarmer) setResult(packID restic.ID, err error) {
|
||||
packsWarmer.mu.Lock()
|
||||
defer packsWarmer.mu.Unlock()
|
||||
|
||||
packsWarmer.packsResult[packID] = err
|
||||
}
|
65
internal/repository/warmup_test.go
Normal file
65
internal/repository/warmup_test.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
func TestWarmupRepository(t *testing.T) {
|
||||
warmupCalls := []backend.Handle{}
|
||||
warmupWaitCalls := []backend.Handle{}
|
||||
isWarm := true
|
||||
|
||||
be := mock.NewBackend()
|
||||
be.WarmupFn = func(ctx context.Context, h backend.Handle) (bool, error) {
|
||||
warmupCalls = append(warmupCalls, h)
|
||||
return isWarm, nil
|
||||
}
|
||||
be.WarmupWaitFn = func(ctx context.Context, h backend.Handle) error {
|
||||
warmupWaitCalls = append(warmupWaitCalls, h)
|
||||
return nil
|
||||
}
|
||||
|
||||
repo, _ := New(be, Options{})
|
||||
packsWarmer := NewPacksWarmer(repo)
|
||||
|
||||
id1, _ := restic.ParseID("1111111111111111111111111111111111111111111111111111111111111111")
|
||||
id2, _ := restic.ParseID("2222222222222222222222222222222222222222222222222222222222222222")
|
||||
id3, _ := restic.ParseID("3333333333333333333333333333333333333333333333333333333333333333")
|
||||
err := packsWarmer.StartWarmup(context.TODO(), restic.IDs{id1, id2})
|
||||
if err != nil {
|
||||
t.Fatalf("error when starting warmup: %v", err)
|
||||
}
|
||||
if len(warmupCalls) != 2 {
|
||||
t.Fatalf("expected 2 calls to warmup, got %d", len(warmupCalls))
|
||||
}
|
||||
|
||||
err = packsWarmer.Wait(context.TODO(), id1)
|
||||
if err != nil {
|
||||
t.Fatalf("error when waiting for warmup: %v", err)
|
||||
}
|
||||
if len(warmupWaitCalls) != 0 {
|
||||
t.Fatal("WarmupWait was called on a warm file")
|
||||
}
|
||||
|
||||
isWarm = false
|
||||
err = packsWarmer.StartWarmup(context.TODO(), restic.IDs{id3})
|
||||
if err != nil {
|
||||
t.Fatalf("error when adding element to warmup: %v", err)
|
||||
}
|
||||
if len(warmupCalls) != 3 {
|
||||
t.Fatalf("expected 3 calls to warmup, got %d", len(warmupCalls))
|
||||
}
|
||||
err = packsWarmer.Wait(context.TODO(), id3)
|
||||
if err != nil {
|
||||
t.Fatalf("error when waiting for warmup: %v", err)
|
||||
}
|
||||
if len(warmupWaitCalls) != 1 {
|
||||
t.Fatalf("expected one call to WarmupWait, got %d", len(warmupWaitCalls))
|
||||
}
|
||||
|
||||
}
|
|
@ -60,6 +60,11 @@ type Repository interface {
|
|||
SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error)
|
||||
// RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots.
|
||||
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
|
||||
|
||||
// WamupPack requests the backend to warmup the specified pack file.
|
||||
WarmupPack(ctx context.Context, pack ID) (bool, error)
|
||||
// WamupPackWait requests the backend to wait for the specified pack file to be warm.
|
||||
WarmupPackWait(ctx context.Context, pack ID) error
|
||||
}
|
||||
|
||||
type FileType = backend.FileType
|
||||
|
|
|
@ -41,12 +41,17 @@ type packInfo struct {
|
|||
}
|
||||
|
||||
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
|
||||
type warmPackFn func(context.Context, restic.IDs) error
|
||||
type warmPackWaitFn func(context.Context, restic.ID) error
|
||||
|
||||
// fileRestorer restores set of files
|
||||
type fileRestorer struct {
|
||||
idx func(restic.BlobType, restic.ID) []restic.PackedBlob
|
||||
blobsLoader blobsLoaderFn
|
||||
|
||||
warmPacks warmPackFn
|
||||
warmPackWait warmPackWaitFn
|
||||
|
||||
workerCount int
|
||||
filesWriter *filesWriter
|
||||
zeroChunk restic.ID
|
||||
|
@ -66,6 +71,8 @@ func newFileRestorer(dst string,
|
|||
connections uint,
|
||||
sparse bool,
|
||||
allowRecursiveDelete bool,
|
||||
warmPacks warmPackFn,
|
||||
warmPackWait warmPackWaitFn,
|
||||
progress *restore.Progress) *fileRestorer {
|
||||
|
||||
// as packs are streamed the concurrency is limited by IO
|
||||
|
@ -74,6 +81,8 @@ func newFileRestorer(dst string,
|
|||
return &fileRestorer{
|
||||
idx: idx,
|
||||
blobsLoader: blobsLoader,
|
||||
warmPacks: warmPacks,
|
||||
warmPackWait: warmPackWait,
|
||||
filesWriter: newFilesWriter(workerCount, allowRecursiveDelete),
|
||||
zeroChunk: repository.ZeroChunk(),
|
||||
sparse: sparse,
|
||||
|
@ -192,11 +201,18 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
|
|||
// drop no longer necessary file list
|
||||
r.files = nil
|
||||
|
||||
if err := r.warmPacks(ctx, packOrder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
downloadCh := make(chan *packInfo)
|
||||
|
||||
worker := func() error {
|
||||
for pack := range downloadCh {
|
||||
if err := r.warmPackWait(ctx, pack.id); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.downloadPack(ctx, pack); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
|
@ -31,6 +32,9 @@ type TestRepo struct {
|
|||
files []*fileInfo
|
||||
filesPathToContent map[string]string
|
||||
|
||||
warmupPacks restic.IDSet
|
||||
warmupPacksWait restic.IDSet
|
||||
|
||||
//
|
||||
loader blobsLoaderFn
|
||||
}
|
||||
|
@ -44,6 +48,24 @@ func (i *TestRepo) fileContent(file *fileInfo) string {
|
|||
return i.filesPathToContent[file.location]
|
||||
}
|
||||
|
||||
var warmupMu sync.Mutex
|
||||
|
||||
func (i *TestRepo) WarmupPacks(ctx context.Context, packs restic.IDs) error {
|
||||
warmupMu.Lock()
|
||||
defer warmupMu.Unlock()
|
||||
|
||||
i.warmupPacks.Merge(restic.NewIDSet(packs...))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *TestRepo) WarmupPacksWait(ctx context.Context, pack restic.ID) error {
|
||||
warmupMu.Lock()
|
||||
defer warmupMu.Unlock()
|
||||
|
||||
i.warmupPacksWait.Insert(pack)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTestRepo(content []TestFile) *TestRepo {
|
||||
type Pack struct {
|
||||
name string
|
||||
|
@ -111,6 +133,8 @@ func newTestRepo(content []TestFile) *TestRepo {
|
|||
blobs: blobs,
|
||||
files: files,
|
||||
filesPathToContent: filesPathToContent,
|
||||
warmupPacks: restic.NewIDSet(),
|
||||
warmupPacksWait: restic.NewIDSet(),
|
||||
}
|
||||
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
blobs = append([]restic.Blob{}, blobs...)
|
||||
|
@ -144,7 +168,7 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files ma
|
|||
t.Helper()
|
||||
repo := newTestRepo(content)
|
||||
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, nil)
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, false, repo.WarmupPacks, repo.WarmupPacksWait, nil)
|
||||
|
||||
if files == nil {
|
||||
r.files = repo.files
|
||||
|
@ -177,6 +201,13 @@ func verifyRestore(t *testing.T, r *fileRestorer, repo *TestRepo) {
|
|||
t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data)
|
||||
}
|
||||
}
|
||||
|
||||
if len(repo.warmupPacks) == 0 {
|
||||
t.Errorf("warmup did not occur")
|
||||
}
|
||||
if len(repo.warmupPacksWait) == 0 {
|
||||
t.Errorf("warmup wait did not occur")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileRestorerBasic(t *testing.T) {
|
||||
|
@ -285,7 +316,7 @@ func TestErrorRestoreFiles(t *testing.T) {
|
|||
return loadError
|
||||
}
|
||||
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, nil)
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.WarmupPacks, repo.WarmupPacksWait, nil)
|
||||
r.files = repo.files
|
||||
|
||||
err := r.restoreFiles(context.TODO())
|
||||
|
@ -326,7 +357,7 @@ func TestFatalDownloadError(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, nil)
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, false, repo.WarmupPacks, repo.WarmupPacksWait, nil)
|
||||
r.files = repo.files
|
||||
|
||||
var errors []string
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
restoreui "github.com/restic/restic/internal/ui/restore"
|
||||
|
@ -353,8 +354,9 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) (uint64, error)
|
|||
}
|
||||
|
||||
idx := NewHardlinkIndex[string]()
|
||||
packsWarmer := repository.NewPacksWarmer(res.repo)
|
||||
filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob,
|
||||
res.repo.Connections(), res.opts.Sparse, res.opts.Delete, res.opts.Progress)
|
||||
res.repo.Connections(), res.opts.Sparse, res.opts.Delete, packsWarmer.StartWarmup, packsWarmer.Wait, res.opts.Progress)
|
||||
filerestorer.Error = res.Error
|
||||
|
||||
debug.Log("first pass for %q", dst)
|
||||
|
|
Loading…
Reference in a new issue