diff --git a/cmd/restic/global.go b/cmd/restic/global.go index a45ced8e8..3e6c5f3a9 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -20,10 +20,12 @@ import ( "github.com/restic/restic/internal/backend/limiter" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/location" + "github.com/restic/restic/internal/backend/logger" "github.com/restic/restic/internal/backend/rclone" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/retry" "github.com/restic/restic/internal/backend/s3" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/backend/sftp" "github.com/restic/restic/internal/backend/swift" "github.com/restic/restic/internal/cache" @@ -744,6 +746,9 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err) } + // wrap with debug logging and connection limiting + be = logger.New(sema.NewBackend(be)) + // wrap backend if a test specified an inner hook if gopts.backendInnerTestHook != nil { be, err = gopts.backendInnerTestHook(be) @@ -788,27 +793,34 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend return nil, err } + var be restic.Backend switch loc.Scheme { case "local": - return local.Create(ctx, cfg.(local.Config)) + be, err = local.Create(ctx, cfg.(local.Config)) case "sftp": - return sftp.Create(ctx, cfg.(sftp.Config)) + be, err = sftp.Create(ctx, cfg.(sftp.Config)) case "s3": - return s3.Create(ctx, cfg.(s3.Config), rt) + be, err = s3.Create(ctx, cfg.(s3.Config), rt) case "gs": - return gs.Create(cfg.(gs.Config), rt) + be, err = gs.Create(ctx, cfg.(gs.Config), rt) case "azure": - return azure.Create(ctx, cfg.(azure.Config), rt) + be, err = azure.Create(ctx, cfg.(azure.Config), rt) case "swift": - return swift.Open(ctx, cfg.(swift.Config), rt) + be, err = swift.Open(ctx, cfg.(swift.Config), rt) case "b2": - return b2.Create(ctx, cfg.(b2.Config), rt) + be, err = b2.Create(ctx, cfg.(b2.Config), rt) case "rest": - return rest.Create(ctx, cfg.(rest.Config), rt) + be, err = rest.Create(ctx, cfg.(rest.Config), rt) case "rclone": - return rclone.Create(ctx, cfg.(rclone.Config)) + be, err = rclone.Create(ctx, cfg.(rclone.Config)) + default: + debug.Log("invalid repository scheme: %v", s) + return nil, errors.Fatalf("invalid scheme %q", loc.Scheme) } - debug.Log("invalid repository scheme: %v", s) - return nil, errors.Fatalf("invalid scheme %q", loc.Scheme) + if err != nil { + return nil, err + } + + return logger.New(sema.NewBackend(be)), nil } diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index c92fa3f89..9a3695f0f 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -14,7 +14,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -26,7 +25,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" - "github.com/cenkalti/backoff/v4" ) // Backend stores data on an azure endpoint. @@ -34,7 +32,6 @@ type Backend struct { cfg Config container *azContainer.Client connections uint - sem sema.Semaphore prefix string listMaxItems int layout.Layout @@ -96,16 +93,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.New("no azure authentication information found") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ container: client, cfg: cfg, connections: cfg.Connections, - sem: sem, Layout: &layout.DefaultLayout{ Path: cfg.Prefix, Join: path.Join, @@ -152,7 +143,6 @@ func (be *Backend) SetListMaxItems(i int) { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) return bloberror.HasCode(err, bloberror.BlobNotFound) } @@ -187,16 +177,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - debug.Log("Save %v at %v", h, objName) - - be.sem.GetToken() - debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName) var err error @@ -208,9 +190,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = be.saveLarge(ctx, objName, rd) } - be.sem.ReleaseToken() - debug.Log("%v, err %#v", objName, err) - return err } @@ -299,23 +278,9 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - objName := be.Filename(h) blockBlobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{ Range: azblob.HTTPRange{ Offset: offset, @@ -324,26 +289,20 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, }) if err != nil { - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(resp.Body, nil), err + return resp.Body, err } // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("%v", h) - objName := be.Filename(h) blobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() props, err := blobClient.GetProperties(ctx, nil) - be.sem.ReleaseToken() if err != nil { - debug.Log("blob.GetProperties err %v", err) return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") } @@ -359,11 +318,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) blob := be.container.NewBlobClient(objName) - be.sem.GetToken() _, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{}) - be.sem.ReleaseToken() - - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) if be.IsNotExist(err) { return nil @@ -375,8 +330,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) // make sure prefix ends with a slash @@ -393,9 +346,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F lister := be.container.NewListBlobsFlatPager(opts) for lister.More() { - be.sem.GetToken() resp, err := lister.NextPage(ctx) - be.sem.ReleaseToken() if err != nil { return err @@ -433,30 +384,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 40dbbf893..738df198d 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -11,12 +11,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/kurin/blazer/b2" "github.com/kurin/blazer/base" ) @@ -28,7 +26,6 @@ type b2Backend struct { cfg Config listMaxItems int layout.Layout - sem sema.Semaphore canDelete bool } @@ -92,11 +89,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend return nil, errors.Wrap(err, "Bucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -106,7 +98,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, canDelete: true, } @@ -134,11 +125,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe return nil, errors.Wrap(err, "NewBucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -148,7 +134,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, } _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) @@ -202,33 +187,18 @@ func (be *b2Backend) IsNotExist(err error) bool { // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - - ctx, cancel := context.WithCancel(ctx) - - be.sem.GetToken() - name := be.Layout.Filename(h) obj := be.bucket.Object(name) if offset == 0 && length == 0 { - rd := obj.NewReader(ctx) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewReader(ctx), nil } // pass a negative length to NewRangeReader so that the remainder of the @@ -237,8 +207,7 @@ func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int length = -1 } - rd := obj.NewRangeReader(ctx, offset, int64(length)) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewRangeReader(ctx, offset, int64(length)), nil } // Save stores data in the backend at the handle. @@ -246,21 +215,12 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) - debug.Log("Save %v, name %v", h, name) obj := be.bucket.Object(name) // b2 always requires sha1 checksums for uploaded file parts w := obj.NewWriter(ctx) n, err := io.Copy(w, rd) - debug.Log(" saved %d bytes, err %v", n, err) if err != nil { _ = w.Close() @@ -276,16 +236,10 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind // Stat returns information about a blob. func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("Stat %v", h) - - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) obj := be.bucket.Object(name) info, err := obj.Attrs(ctx) if err != nil { - debug.Log("Attrs() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "Stat") } return restic.FileInfo{Size: info.Size, Name: h.Name}, nil @@ -293,11 +247,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI // Remove removes the blob with the given name and type. func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove %v", h) - - be.sem.GetToken() - defer be.sem.ReleaseToken() - // the retry backend will also repeat the remove method up to 10 times for i := 0; i < 3; i++ { obj := be.bucket.Object(be.Filename(h)) @@ -332,22 +281,13 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { return errors.New("failed to delete all file versions") } -type semLocker struct { - sema.Semaphore -} - -func (sm *semLocker) Lock() { sm.GetToken() } -func (sm *semLocker) Unlock() { sm.ReleaseToken() } - // List returns a channel that yields all names of blobs of type t. func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("List %v", t) - ctx, cancel := context.WithCancel(ctx) defer cancel() prefix, _ := be.Basedir(t) - iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem})) + iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems)) for iter.Next() { obj := iter.Object() @@ -367,41 +307,14 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic } } if err := iter.Err(); err != nil { - debug.Log("List: %v", err) return err } return nil } -// Remove keys for a specified backend type. -func (be *b2Backend) removeKeys(ctx context.Context, t restic.FileType) error { - debug.Log("removeKeys %v", t) - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *b2Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && be.IsNotExist(err) { - err = nil - } - - return err + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index 37569c320..487e2bc33 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -18,10 +18,9 @@ type Backend struct { b restic.Backend } -// statically ensure that RetryBackend implements restic.Backend. +// statically ensure that Backend implements restic.Backend. var _ restic.Backend = &Backend{} -// New returns a new backend that saves all data in a map in memory. func New(be restic.Backend) *Backend { b := &Backend{b: be} debug.Log("created new dry backend") @@ -34,8 +33,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe return err } - debug.Log("faked saving %v bytes at %v", rd.Length(), h) - // don't save anything, just return ok return nil } diff --git a/internal/backend/dryrun/dry_backend_test.go b/internal/backend/dryrun/dry_backend_test.go index 6b8f74e0f..69716c340 100644 --- a/internal/backend/dryrun/dry_backend_test.go +++ b/internal/backend/dryrun/dry_backend_test.go @@ -40,11 +40,9 @@ func TestDry(t *testing.T) { {d, "delete", "", "", ""}, {d, "stat", "a", "", "not found"}, {d, "list", "", "", ""}, - {d, "save", "", "", "invalid"}, {m, "save", "a", "baz", ""}, // save a directly to the mem backend {d, "save", "b", "foob", ""}, // b is not saved {d, "save", "b", "xxx", ""}, // no error as b is not saved - {d, "stat", "", "", "invalid"}, {d, "stat", "a", "a 3", ""}, {d, "load", "a", "baz", ""}, {d, "load", "b", "", "not found"}, diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 77cbcda97..62e5c4954 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -15,7 +15,6 @@ import ( "github.com/pkg/errors" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -37,7 +36,6 @@ type Backend struct { gcsClient *storage.Client projectID string connections uint - sem sema.Semaphore bucketName string bucket *storage.BucketHandle prefix string @@ -99,16 +97,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.Wrap(err, "getStorageClient") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ gcsClient: gcsClient, projectID: cfg.ProjectID, connections: cfg.Connections, - sem: sem, bucketName: cfg.Bucket, bucket: gcsClient.Bucket(cfg.Bucket), prefix: cfg.Prefix, @@ -132,14 +124,13 @@ func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) { // // The service account must have the "storage.buckets.create" permission to // create a bucket the does not yet exist. -func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) { +func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { be, err := open(cfg, rt) if err != nil { return nil, errors.Wrap(err, "open") } // Try to determine if the bucket exists. If it does not, try to create it. - ctx := context.Background() exists, err := be.bucketExists(ctx, be.bucket) if err != nil { if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusForbidden { @@ -169,7 +160,6 @@ func (be *Backend) SetListMaxItems(i int) { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) return errors.Is(err, storage.ErrObjectNotExist) } @@ -204,18 +194,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return err - } - objName := be.Filename(h) - debug.Log("Save %v at %v", h, objName) - - be.sem.GetToken() - - debug.Log("InsertObject(%v, %v)", be.bucketName, objName) - // Set chunk size to zero to disable resumable uploads. // // With a non-zero chunk size (the default is @@ -250,14 +230,10 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = cerr } - be.sem.ReleaseToken() - if err != nil { - debug.Log("%v: err %#v: %v", objName, err, err) return errors.Wrap(err, "service.Objects.Insert") } - debug.Log("%v -> %v bytes", objName, wbytes) // sanity check if wbytes != rd.Length() { return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length()) @@ -268,22 +244,13 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, err - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } if length == 0 { // negative length indicates read till end to GCS lib length = -1 @@ -291,32 +258,21 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, objName := be.Filename(h) - be.sem.GetToken() - - ctx, cancel := context.WithCancel(ctx) - r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length)) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(r, cancel), err + return r, err } // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) - be.sem.GetToken() attr, err := be.bucket.Object(objName).Attrs(ctx) - be.sem.ReleaseToken() if err != nil { - debug.Log("GetObjectAttributes() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") } @@ -327,23 +283,18 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.bucket.Object(objName).Delete(ctx) - be.sem.ReleaseToken() - if err == storage.ErrObjectNotExist { + if be.IsNotExist(err) { err = nil } - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) return errors.Wrap(err, "client.RemoveObject") } // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) // make sure prefix ends with a slash @@ -357,9 +308,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix}) for { - be.sem.GetToken() attrs, err := itr.Next() - be.sem.ReleaseToken() if err == iterator.Done { break } @@ -389,30 +338,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing. diff --git a/internal/backend/gs/gs_test.go b/internal/backend/gs/gs_test.go index 77f8986f1..19ae8b829 100644 --- a/internal/backend/gs/gs_test.go +++ b/internal/backend/gs/gs_test.go @@ -42,7 +42,7 @@ func newGSTestSuite(t testing.TB) *test.Suite { Create: func(config interface{}) (restic.Backend, error) { cfg := config.(gs.Config) - be, err := gs.Create(cfg, tr) + be, err := gs.Create(context.Background(), cfg, tr) if err != nil { return nil, err } diff --git a/internal/backend/limiter/limiter_backend.go b/internal/backend/limiter/limiter_backend.go index f1b508327..7fcca59cc 100644 --- a/internal/backend/limiter/limiter_backend.go +++ b/internal/backend/limiter/limiter_backend.go @@ -46,6 +46,8 @@ func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length in }) } +func (r rateLimitedBackend) Unwrap() restic.Backend { return r.Backend } + type limitedReader struct { io.Reader writerTo io.WriterTo diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index 1716e0f07..ca806f754 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -10,7 +10,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" @@ -22,7 +21,6 @@ import ( // Local is a backend in a local directory. type Local struct { Config - sem sema.Semaphore layout.Layout backend.Modes } @@ -38,11 +36,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return nil, err } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - fi, err := fs.Stat(l.Filename(restic.Handle{Type: restic.ConfigFile})) m := backend.DeriveModesFromFileInfo(fi, err) debug.Log("using (%03O file, %03O dir) permissions", m.File, m.Dir) @@ -50,7 +43,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return &Local{ Config: cfg, Layout: l, - sem: sem, Modes: m, }, nil } @@ -114,11 +106,6 @@ func (b *Local) IsNotExist(err error) bool { // Save stores data in the backend at the handle. func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) { - debug.Log("Save %v", h) - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - finalname := b.Filename(h) dir := filepath.Dir(finalname) @@ -129,9 +116,6 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade } }() - b.sem.GetToken() - defer b.sem.ReleaseToken() - // Create new file with a temporary name. tmpname := filepath.Base(finalname) + "-tmp-" f, err := tempFile(dir, tmpname) @@ -217,50 +201,28 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in } func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - b.sem.GetToken() f, err := fs.Open(b.Filename(h)) if err != nil { - b.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - b.sem.ReleaseToken() _ = f.Close() return nil, err } } - r := b.sem.ReleaseTokenOnClose(f, nil) - if length > 0 { - return backend.LimitReadCloser(r, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return r, nil + return f, nil } // Stat returns information about a blob. func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat %v", h) - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - b.sem.GetToken() - defer b.sem.ReleaseToken() - fi, err := fs.Stat(b.Filename(h)) if err != nil { return restic.FileInfo{}, errors.WithStack(err) @@ -271,12 +233,8 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err // Remove removes the blob with the given name and type. func (b *Local) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove %v", h) fn := b.Filename(h) - b.sem.GetToken() - defer b.sem.ReleaseToken() - // reset read-only flag err := fs.Chmod(fn, 0666) if err != nil && !os.IsPermission(err) { @@ -289,8 +247,6 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (b *Local) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) (err error) { - debug.Log("List %v", t) - basedir, subdirs := b.Basedir(t) if subdirs { err = visitDirs(ctx, basedir, fn) @@ -384,13 +340,11 @@ func visitFiles(ctx context.Context, dir string, fn func(restic.FileInfo) error, // Delete removes the repository and all files. func (b *Local) Delete(ctx context.Context) error { - debug.Log("Delete()") return fs.RemoveAll(b.Path) } // Close closes all open files. func (b *Local) Close() error { - debug.Log("Close()") // this does not need to do anything, all open files are closed within the // same function. return nil diff --git a/internal/backend/logger/log.go b/internal/backend/logger/log.go new file mode 100644 index 000000000..6c860cfae --- /dev/null +++ b/internal/backend/logger/log.go @@ -0,0 +1,79 @@ +package logger + +import ( + "context" + "io" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" +) + +type Backend struct { + restic.Backend +} + +// statically ensure that Backend implements restic.Backend. +var _ restic.Backend = &Backend{} + +func New(be restic.Backend) *Backend { + return &Backend{Backend: be} +} + +func (be *Backend) IsNotExist(err error) bool { + isNotExist := be.Backend.IsNotExist(err) + debug.Log("IsNotExist(%T, %#v, %v)", err, err, isNotExist) + return isNotExist +} + +// Save adds new Data to the backend. +func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + debug.Log("Save(%v, %v)", h, rd.Length()) + err := be.Backend.Save(ctx, h, rd) + debug.Log(" save err %v", err) + return err +} + +// Remove deletes a file from the backend. +func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { + debug.Log("Remove(%v)", h) + err := be.Backend.Remove(ctx, h) + debug.Log(" remove err %v", err) + return err +} + +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(io.Reader) error) error { + debug.Log("Load(%v, length %v, offset %v)", h, length, offset) + err := be.Backend.Load(ctx, h, length, offset, fn) + debug.Log(" load err %v", err) + return err +} + +func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + debug.Log("Stat(%v)", h) + fi, err := be.Backend.Stat(ctx, h) + debug.Log(" stat err %v", err) + return fi, err +} + +func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { + debug.Log("List(%v)", t) + err := be.Backend.List(ctx, t, fn) + debug.Log(" list err %v", err) + return err +} + +func (be *Backend) Delete(ctx context.Context) error { + debug.Log("Delete()") + err := be.Backend.Delete(ctx) + debug.Log(" delete err %v", err) + return err +} + +func (be *Backend) Close() error { + debug.Log("Close()") + err := be.Backend.Close() + debug.Log(" close err %v", err) + return err +} + +func (be *Backend) Unwrap() restic.Backend { return be.Backend } diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 0c46dcd6e..618ef5752 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -10,12 +10,9 @@ import ( "github.com/cespare/xxhash/v2" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) type memMap map[restic.Handle][]byte @@ -32,19 +29,12 @@ const connectionCount = 2 type MemoryBackend struct { data memMap m sync.Mutex - sem sema.Semaphore } // New returns a new backend that saves all data in a map in memory. func New() *MemoryBackend { - sem, err := sema.New(connectionCount) - if err != nil { - panic(err) - } - be := &MemoryBackend{ data: make(memMap), - sem: sem, } debug.Log("created new memory backend") @@ -59,13 +49,6 @@ func (be *MemoryBackend) IsNotExist(err error) bool { // Save adds new Data to the backend. func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -102,7 +85,6 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re } be.data[h] = buf - debug.Log("saved %v bytes at %v", len(buf), h) return ctx.Err() } @@ -114,11 +96,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, } func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - be.sem.GetToken() be.m.Lock() defer be.m.Unlock() @@ -127,21 +104,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length h.Name = "" } - debug.Log("Load %v offset %v len %v", h, offset, length) - - if offset < 0 { - be.sem.ReleaseToken() - return nil, errors.New("offset is negative") - } - if _, ok := be.data[h]; !ok { - be.sem.ReleaseToken() return nil, errNotFound } buf := be.data[h] if offset > int64(len(buf)) { - be.sem.ReleaseToken() return nil, errors.New("offset beyond end of file") } @@ -150,18 +118,11 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length buf = buf[:length] } - return be.sem.ReleaseTokenOnClose(io.NopCloser(bytes.NewReader(buf)), nil), ctx.Err() + return io.NopCloser(bytes.NewReader(buf)), ctx.Err() } // Stat returns information about a file in the backend. func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -170,8 +131,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File h.Name = "" } - debug.Log("stat %v", h) - e, ok := be.data[h] if !ok { return restic.FileInfo{}, errNotFound @@ -182,14 +141,9 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File // Remove deletes a file from the backend. func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() - debug.Log("Remove %v", h) - h.ContainedBlobType = restic.InvalidBlob if _, ok := be.data[h]; !ok { return errNotFound diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index f4c2897b9..7be5a07c7 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -11,13 +11,11 @@ import ( "path" "strings" + "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) // make sure the rest backend implements restic.Backend @@ -27,7 +25,6 @@ var _ restic.Backend = &Backend{} type Backend struct { url *url.URL connections uint - sem sema.Semaphore client http.Client layout.Layout } @@ -40,11 +37,6 @@ const ( // Open opens the REST backend with the given config. func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - // use url without trailing slash for layout url := cfg.URL.String() if url[len(url)-1] == '/' { @@ -56,7 +48,6 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { client: http.Client{Transport: rt}, Layout: &layout.RESTLayout{URL: url, Join: path.Join}, connections: cfg.Connections, - sem: sem, } return be, nil @@ -123,10 +114,6 @@ func (b *Backend) HasAtomicReplace() bool { // Save stores data in the backend at the handle. func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -143,9 +130,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea // let's the server know what's coming. req.ContentLength = rd.Length() - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() var cerr error if resp != nil { @@ -212,19 +197,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - req, err := http.NewRequestWithContext(ctx, "GET", b.Filename(h), nil) if err != nil { return nil, errors.WithStack(err) @@ -236,11 +208,8 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o } req.Header.Set("Range", byteRange) req.Header.Set("Accept", ContentTypeV2) - debug.Log("Load(%v) send range %v", h, byteRange) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { if resp != nil { @@ -265,19 +234,13 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o // Stat returns information about a blob. func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, http.MethodHead, b.Filename(h), nil) if err != nil { return restic.FileInfo{}, errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.WithStack(err) } @@ -310,19 +273,13 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e // Remove removes the blob with the given name and type. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, "DELETE", b.Filename(h), nil) if err != nil { return errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "client.Do") @@ -359,9 +316,7 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "List") @@ -457,32 +412,7 @@ func (b *Backend) Close() error { return nil } -// Remove keys for a specified backend type. -func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return b.List(ctx, t, func(fi restic.FileInfo) error { - return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all data in the backend. func (b *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := b.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && b.IsNotExist(err) { - return nil - } - return err + return backend.DefaultDelete(ctx, b) } diff --git a/internal/backend/retry/backend_retry.go b/internal/backend/retry/backend_retry.go index b5f2706f4..9c51efedc 100644 --- a/internal/backend/retry/backend_retry.go +++ b/internal/backend/retry/backend_retry.go @@ -191,3 +191,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return err } + +func (be *Backend) Unwrap() restic.Backend { + return be.Backend +} diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index ad652a206..7b7a761ce 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -13,12 +13,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -26,7 +24,6 @@ import ( // Backend stores data on an S3 endpoint. type Backend struct { client *minio.Client - sem sema.Semaphore cfg Config layout.Layout } @@ -102,14 +99,8 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro return nil, errors.Wrap(err, "minio.New") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ client: client, - sem: sem, cfg: cfg, } @@ -169,8 +160,6 @@ func isAccessDenied(err error) bool { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) - var e minio.ErrorResponse return errors.As(err, &e) && e.Code == "NoSuchKey" } @@ -273,17 +262,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - debug.Log("Save %v", h) - - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - opts := minio.PutObjectOptions{StorageClass: be.cfg.StorageClass} opts.ContentType = "application/octet-stream" // the only option with the high-level api is to let the library handle the checksum computation @@ -291,11 +271,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // only use multipart uploads for very large files opts.PartSize = 200 * 1024 * 1024 - debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length()) info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, io.NopCloser(rd), int64(rd.Length()), opts) - debug.Log("%v -> %v bytes, err %#v: %v", objName, info.Size, err, err) - // sanity check if err == nil && info.Size != rd.Length() { return errors.Errorf("wrote %d bytes instead of the expected %d bytes", info.Size, rd.Length()) @@ -307,32 +284,20 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - objName := be.Filename(h) opts := minio.GetObjectOptions{} var err error if length > 0 { - debug.Log("range: %v-%v", offset, offset+int64(length)-1) err = opts.SetRange(offset, offset+int64(length)-1) } else if offset > 0 { - debug.Log("range: %v-", offset) err = opts.SetRange(offset, 0) } @@ -340,41 +305,30 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, return nil, errors.Wrap(err, "SetRange") } - be.sem.GetToken() - ctx, cancel := context.WithCancel(ctx) - coreClient := minio.Core{Client: be.client} rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(rd, cancel), err + return rd, err } // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) var obj *minio.Object opts := minio.GetObjectOptions{} - be.sem.GetToken() obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - debug.Log("GetObject() err %v", err) - be.sem.ReleaseToken() return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") } // make sure that the object is closed properly. defer func() { e := obj.Close() - be.sem.ReleaseToken() if err == nil { err = errors.Wrap(e, "Close") } @@ -382,7 +336,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf fi, err := obj.Stat() if err != nil { - debug.Log("Stat() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "Stat") } @@ -393,11 +346,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{}) - be.sem.ReleaseToken() - - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) if be.IsNotExist(err) { err = nil @@ -409,8 +358,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, recursive := be.Basedir(t) // make sure prefix ends with a slash @@ -464,30 +411,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, restic.PackFile, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go new file mode 100644 index 000000000..fc4a9dde5 --- /dev/null +++ b/internal/backend/sema/backend.go @@ -0,0 +1,91 @@ +package sema + +import ( + "context" + "io" + + "github.com/cenkalti/backoff/v4" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// make sure that connectionLimitedBackend implements restic.Backend +var _ restic.Backend = &connectionLimitedBackend{} + +// connectionLimitedBackend limits the number of concurrent operations. +type connectionLimitedBackend struct { + restic.Backend + sem semaphore +} + +// NewBackend creates a backend that limits the concurrent operations on the underlying backend +func NewBackend(be restic.Backend) restic.Backend { + sem, err := newSemaphore(be.Connections()) + if err != nil { + panic(err) + } + + return &connectionLimitedBackend{ + Backend: be, + sem: sem, + } +} + +// Save adds new Data to the backend. +func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Save(ctx, h, rd) +} + +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + if offset < 0 { + return backoff.Permanent(errors.New("offset is negative")) + } + if length < 0 { + return backoff.Permanent(errors.Errorf("invalid length %d", length)) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Load(ctx, h, length, offset, fn) +} + +// Stat returns information about a file in the backend. +func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + if err := h.Valid(); err != nil { + return restic.FileInfo{}, backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Stat(ctx, h) +} + +// Remove deletes a file from the backend. +func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Remove(ctx, h) +} + +func (be *connectionLimitedBackend) Unwrap() restic.Backend { + return be.Backend +} diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go new file mode 100644 index 000000000..db9559840 --- /dev/null +++ b/internal/backend/sema/backend_test.go @@ -0,0 +1,180 @@ +package sema_test + +import ( + "context" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/backend/sema" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" +) + +func TestParameterValidationSave(t *testing.T) { + m := mock.NewBackend() + m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + return nil + } + be := sema.NewBackend(m) + + err := be.Save(context.TODO(), restic.Handle{}, nil) + test.Assert(t, err != nil, "Save() with invalid handle did not return an error") +} + +func TestParameterValidationLoad(t *testing.T) { + m := mock.NewBackend() + m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + return io.NopCloser(nil), nil + } + + be := sema.NewBackend(m) + nilCb := func(rd io.Reader) error { return nil } + + err := be.Load(context.TODO(), restic.Handle{}, 10, 0, nilCb) + test.Assert(t, err != nil, "Load() with invalid handle did not return an error") + + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + err = be.Load(context.TODO(), h, 10, -1, nilCb) + test.Assert(t, err != nil, "Save() with negative offset did not return an error") + err = be.Load(context.TODO(), h, -1, 0, nilCb) + test.Assert(t, err != nil, "Save() with negative length did not return an error") +} + +func TestParameterValidationStat(t *testing.T) { + m := mock.NewBackend() + m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + return restic.FileInfo{}, nil + } + be := sema.NewBackend(m) + + _, err := be.Stat(context.TODO(), restic.Handle{}) + test.Assert(t, err != nil, "Stat() with invalid handle did not return an error") +} + +func TestParameterValidationRemove(t *testing.T) { + m := mock.NewBackend() + m.RemoveFn = func(ctx context.Context, h restic.Handle) error { + return nil + } + be := sema.NewBackend(m) + + err := be.Remove(context.TODO(), restic.Handle{}) + test.Assert(t, err != nil, "Remove() with invalid handle did not return an error") +} + +func TestUnwrap(t *testing.T) { + m := mock.NewBackend() + be := sema.NewBackend(m) + + unwrapper := be.(restic.BackendUnwrapper) + test.Assert(t, unwrapper.Unwrap() == m, "Unwrap() returned wrong backend") +} + +func countingBlocker() (func(), func(int) int) { + ctr := int64(0) + blocker := make(chan struct{}) + + wait := func() { + // count how many goroutines were allowed by the semaphore + atomic.AddInt64(&ctr, 1) + // block until the test can retrieve the counter + <-blocker + } + + unblock := func(expected int) int { + // give goroutines enough time to block + var blocked int64 + for i := 0; i < 100 && blocked != int64(expected); i++ { + time.Sleep(100 * time.Microsecond) + blocked = atomic.LoadInt64(&ctr) + } + close(blocker) + return int(blocked) + } + + return wait, unblock +} + +func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) { + expectBlocked := int(2) + + m := mock.NewBackend() + setup(m) + m.ConnectionsFn = func() uint { return uint(expectBlocked) } + be := sema.NewBackend(m) + + var wg errgroup.Group + for i := 0; i < int(expectBlocked+1); i++ { + wg.Go(handler(be)) + } + + blocked := unblock(expectBlocked) + test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked) + test.OK(t, wg.Wait()) +} + +func TestConcurrencyLimitSave(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + wait() + return nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + return be.Save(context.TODO(), h, nil) + } + }, unblock) +} + +func TestConcurrencyLimitLoad(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + wait() + return io.NopCloser(nil), nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + nilCb := func(rd io.Reader) error { return nil } + return be.Load(context.TODO(), h, 10, 0, nilCb) + } + }, unblock) +} + +func TestConcurrencyLimitStat(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + wait() + return restic.FileInfo{}, nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + _, err := be.Stat(context.TODO(), h) + return err + } + }, unblock) +} + +func TestConcurrencyLimitDelete(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.RemoveFn = func(ctx context.Context, h restic.Handle) error { + wait() + return nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + return be.Remove(context.TODO(), h) + } + }, unblock) +} diff --git a/internal/backend/sema/semaphore.go b/internal/backend/sema/semaphore.go index 7ee912979..c664eef7c 100644 --- a/internal/backend/sema/semaphore.go +++ b/internal/backend/sema/semaphore.go @@ -2,64 +2,30 @@ package sema import ( - "context" - "io" - + "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" ) -// A Semaphore limits access to a restricted resource. -type Semaphore struct { +// A semaphore limits access to a restricted resource. +type semaphore struct { ch chan struct{} } -// New returns a new semaphore with capacity n. -func New(n uint) (Semaphore, error) { +// newSemaphore returns a new semaphore with capacity n. +func newSemaphore(n uint) (semaphore, error) { if n == 0 { - return Semaphore{}, errors.New("capacity must be a positive number") + return semaphore{}, errors.New("capacity must be a positive number") } - return Semaphore{ + return semaphore{ ch: make(chan struct{}, n), }, nil } // GetToken blocks until a Token is available. -func (s Semaphore) GetToken() { s.ch <- struct{}{} } +func (s semaphore) GetToken() { + s.ch <- struct{}{} + debug.Log("acquired token") +} // ReleaseToken returns a token. -func (s Semaphore) ReleaseToken() { <-s.ch } - -// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. -// Before returning the token, cancel, if not nil, will be run -// to free up context resources. -func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { - return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel} -} - -type wrapReader struct { - io.ReadCloser - eofSeen bool - sem Semaphore - cancel context.CancelFunc -} - -func (wr *wrapReader) Read(p []byte) (int, error) { - if wr.eofSeen { // XXX Why do we do this? - return 0, io.EOF - } - - n, err := wr.ReadCloser.Read(p) - if err == io.EOF { - wr.eofSeen = true - } - return n, err -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - if wr.cancel != nil { - wr.cancel() - } - wr.sem.ReleaseToken() - return err -} +func (s semaphore) ReleaseToken() { <-s.ch } diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 514dd58da..e97a5f9c8 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -15,7 +15,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -35,7 +34,6 @@ type SFTP struct { posixRename bool - sem sema.Semaphore layout.Layout Config backend.Modes @@ -140,11 +138,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) { } func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - + var err error sftp.Layout, err = layout.ParseLayout(ctx, sftp, cfg.Layout, defaultLayout, cfg.Path) if err != nil { return nil, err @@ -158,7 +152,6 @@ func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { sftp.Config = cfg sftp.p = cfg.Path - sftp.sem = sem sftp.Modes = m return sftp, nil } @@ -304,22 +297,14 @@ func tempSuffix() string { // Save stores data in the backend at the handle. func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - debug.Log("Save %v", h) if err := r.clientError(); err != nil { return err } - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - filename := r.Filename(h) tmpFilename := filename + "-restic-temp-" + tempSuffix() dirname := r.Dirname(h) - r.sem.GetToken() - defer r.sem.ReleaseToken() - // create new file f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) @@ -415,77 +400,35 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - io.WriterTo - f func() -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { - r.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - r.sem.ReleaseToken() _ = f.Close() return nil, err } } - // use custom close wrapper to also provide WriteTo() on the wrapper - rd := &wrapReader{ - ReadCloser: f, - WriterTo: f, - f: func() { - r.sem.ReleaseToken() - }, - } - if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(rd, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return rd, nil + return f, nil } // Stat returns information about a blob. func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat(%v)", h) if err := r.clientError(); err != nil { return restic.FileInfo{}, err } - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - r.sem.GetToken() - defer r.sem.ReleaseToken() - fi, err := r.c.Lstat(r.Filename(h)) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "Lstat") @@ -496,28 +439,20 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro // Remove removes the content stored at name. func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove(%v)", h) if err := r.clientError(); err != nil { return err } - r.sem.GetToken() - defer r.sem.ReleaseToken() - return r.c.Remove(r.Filename(h)) } // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("List %v", t) - basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) for { - r.sem.GetToken() ok := walker.Step() - r.sem.ReleaseToken() if !ok { break } @@ -572,7 +507,6 @@ var closeTimeout = 2 * time.Second // Close closes the sftp connection and terminates the underlying command. func (r *SFTP) Close() error { - debug.Log("Close") if r == nil { return nil } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 764c7bb62..cfa9ed665 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -15,12 +15,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/ncw/swift/v2" ) @@ -28,7 +26,6 @@ import ( type beSwift struct { conn *swift.Connection connections uint - sem sema.Semaphore container string // Container name prefix string // Prefix of object names in the container layout.Layout @@ -42,11 +39,6 @@ var _ restic.Backend = &beSwift{} func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { debug.Log("config %#v", cfg) - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &beSwift{ conn: &swift.Connection{ UserName: cfg.UserName, @@ -72,7 +64,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Transport: rt, }, connections: cfg.Connections, - sem: sem, container: cfg.Container, prefix: cfg.Prefix, Layout: &layout.DefaultLayout{ @@ -143,18 +134,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } objName := be.Filename(h) @@ -167,59 +146,34 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, headers["Range"] = fmt.Sprintf("bytes=%d-%d", offset, offset+int64(length)-1) } - if _, ok := headers["Range"]; ok { - debug.Log("Load(%v) send range %v", h, headers["Range"]) - } - - be.sem.GetToken() obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers) if err != nil { - debug.Log(" err %v", err) - be.sem.ReleaseToken() return nil, errors.Wrap(err, "conn.ObjectOpen") } - return be.sem.ReleaseTokenOnClose(obj, nil), nil + return obj, nil } // Save stores data in the backend at the handle. func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - - debug.Log("Save %v at %v", h, objName) - - be.sem.GetToken() - defer be.sem.ReleaseToken() - encoding := "binary/octet-stream" - debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) hdr := swift.Headers{"Content-Length": strconv.FormatInt(rd.Length(), 10)} _, err := be.conn.ObjectPut(ctx, be.container, objName, rd, true, hex.EncodeToString(rd.Hash()), encoding, hdr) // swift does not return the upload length - debug.Log("%v, err %#v", objName, err) return errors.Wrap(err, "client.PutObject") } // Stat returns information about a blob. func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - obj, _, err := be.conn.Object(ctx, be.container, objName) if err != nil { - debug.Log("Object() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "conn.Object") } @@ -230,27 +184,19 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - err := be.conn.ObjectDelete(ctx, be.container, objName) - debug.Log("Remove(%v) -> err %v", h, err) return errors.Wrap(err, "conn.ObjectDelete") } // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) prefix += "/" err := be.conn.ObjectsWalk(ctx, be.container, &swift.ObjectsOpts{Prefix: prefix}, func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) { - be.sem.GetToken() newObjects, err := be.conn.Objects(ctx, be.container, opts) - be.sem.ReleaseToken() if err != nil { return nil, errors.Wrap(err, "conn.ObjectNames") @@ -285,13 +231,6 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *beSwift) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // IsNotExist returns true if the error is caused by a not existing file. func (be *beSwift) IsNotExist(err error) bool { var e *swift.Error @@ -301,26 +240,7 @@ func (be *beSwift) IsNotExist(err error) bool { // Delete removes all restic objects in the container. // It will not remove the container itself. func (be *beSwift) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && !be.IsNotExist(err) { - return err - } - - return nil + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index b98af59c3..53a10f446 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -124,17 +124,7 @@ func (s *Suite) TestLoad(t *testing.T) { b := s.open(t) defer s.close(t, b) - noop := func(rd io.Reader) error { - return nil - } - - err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop) - if err == nil { - t.Fatalf("Load() did not return an error for invalid handle") - } - test.Assert(t, !b.IsNotExist(err), "IsNotExist() should not accept an invalid handle error: %v", err) - - err = testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0) + err := testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0) if err == nil { t.Fatalf("Load() did not return an error for non-existing blob") } @@ -153,11 +143,6 @@ func (s *Suite) TestLoad(t *testing.T) { t.Logf("saved %d bytes as %v", length, handle) - err = b.Load(context.TODO(), handle, 100, -1, noop) - if err == nil { - t.Fatalf("Load() returned no error for negative offset!") - } - err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error { _, err := io.Copy(io.Discard, rd) if err != nil { diff --git a/internal/backend/utils.go b/internal/backend/utils.go index d2ac44670..cd6614f34 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -62,6 +62,7 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error), fn func(rd io.Reader) error) error { + rd, err := openReader(ctx, h, length, offset) if err != nil { return err @@ -74,6 +75,31 @@ func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, return rd.Close() } +// DefaultDelete removes all restic keys in the bucket. It will not remove the bucket itself. +func DefaultDelete(ctx context.Context, be restic.Backend) error { + alltypes := []restic.FileType{ + restic.PackFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} + + for _, t := range alltypes { + err := be.List(ctx, t, func(fi restic.FileInfo) error { + return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) + }) + if err != nil { + return nil + } + } + err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + if err != nil && be.IsNotExist(err) { + err = nil + } + + return err +} + type memorizedLister struct { fileInfos []restic.FileInfo tpe restic.FileType diff --git a/internal/cache/backend.go b/internal/cache/backend.go index a707f8243..08ec1facd 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -211,3 +211,7 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e func (b *Backend) IsNotExist(err error) bool { return b.Backend.IsNotExist(err) } + +func (b *Backend) Unwrap() restic.Backend { + return b.Backend +} diff --git a/internal/migrations/s3_layout.go b/internal/migrations/s3_layout.go index d42b94bf8..a5293ef16 100644 --- a/internal/migrations/s3_layout.go +++ b/internal/migrations/s3_layout.go @@ -8,7 +8,6 @@ import ( "github.com/restic/restic/internal/backend/layout" "github.com/restic/restic/internal/backend/s3" - "github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -22,24 +21,26 @@ func init() { // "default" layout. type S3Layout struct{} -func toS3Backend(repo restic.Repository) *s3.Backend { - b := repo.Backend() - // unwrap cache - if be, ok := b.(*cache.Backend); ok { - b = be.Backend - } +func toS3Backend(b restic.Backend) *s3.Backend { + for b != nil { + if be, ok := b.(*s3.Backend); ok { + return be + } - be, ok := b.(*s3.Backend) - if !ok { - debug.Log("backend is not s3") - return nil + if be, ok := b.(restic.BackendUnwrapper); ok { + b = be.Unwrap() + } else { + // not the backend we're looking for + break + } } - return be + debug.Log("backend is not s3") + return nil } // Check tests whether the migration can be applied. func (m *S3Layout) Check(ctx context.Context, repo restic.Repository) (bool, string, error) { - be := toS3Backend(repo) + be := toS3Backend(repo.Backend()) if be == nil { debug.Log("backend is not s3") return false, "backend is not s3", nil @@ -91,7 +92,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou // Apply runs the migration. func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error { - be := toS3Backend(repo) + be := toS3Backend(repo.Backend()) if be == nil { debug.Log("backend is not s3") return errors.New("backend is not s3") diff --git a/internal/migrations/s3_layout_test.go b/internal/migrations/s3_layout_test.go new file mode 100644 index 000000000..ad0eedea6 --- /dev/null +++ b/internal/migrations/s3_layout_test.go @@ -0,0 +1,27 @@ +package migrations + +import ( + "testing" + + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/backend/s3" + "github.com/restic/restic/internal/cache" + "github.com/restic/restic/internal/test" +) + +func TestS3UnwrapBackend(t *testing.T) { + // toS3Backend(b restic.Backend) *s3.Backend + + m := mock.NewBackend() + test.Assert(t, toS3Backend(m) == nil, "mock backend is not an s3 backend") + + // uninitialized fake backend for testing + s3 := &s3.Backend{} + test.Assert(t, toS3Backend(s3) == s3, "s3 was not returned") + + c := &cache.Backend{Backend: s3} + test.Assert(t, toS3Backend(c) == s3, "failed to unwrap s3 backend") + + c.Backend = m + test.Assert(t, toS3Backend(c) == nil, "a wrapped mock backend is not an s3 backend") +} diff --git a/internal/restic/backend.go b/internal/restic/backend.go index bc139fc8b..b01071132 100644 --- a/internal/restic/backend.go +++ b/internal/restic/backend.go @@ -70,6 +70,11 @@ type Backend interface { Delete(ctx context.Context) error } +type BackendUnwrapper interface { + // Unwrap returns the underlying backend or nil if there is none. + Unwrap() Backend +} + // FileInfo is contains information about a file in the backend. type FileInfo struct { Size int64