Merge pull request #4286 from MichaelEischer/backend-cleanup-logging
Normalize backend logging and connection limiting
This commit is contained in:
commit
51d823348d
25 changed files with 502 additions and 736 deletions
|
@ -20,10 +20,12 @@ import (
|
|||
"github.com/restic/restic/internal/backend/limiter"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/location"
|
||||
"github.com/restic/restic/internal/backend/logger"
|
||||
"github.com/restic/restic/internal/backend/rclone"
|
||||
"github.com/restic/restic/internal/backend/rest"
|
||||
"github.com/restic/restic/internal/backend/retry"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/backend/sftp"
|
||||
"github.com/restic/restic/internal/backend/swift"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
|
@ -744,6 +746,9 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
|
|||
return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err)
|
||||
}
|
||||
|
||||
// wrap with debug logging and connection limiting
|
||||
be = logger.New(sema.NewBackend(be))
|
||||
|
||||
// wrap backend if a test specified an inner hook
|
||||
if gopts.backendInnerTestHook != nil {
|
||||
be, err = gopts.backendInnerTestHook(be)
|
||||
|
@ -788,27 +793,34 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var be restic.Backend
|
||||
switch loc.Scheme {
|
||||
case "local":
|
||||
return local.Create(ctx, cfg.(local.Config))
|
||||
be, err = local.Create(ctx, cfg.(local.Config))
|
||||
case "sftp":
|
||||
return sftp.Create(ctx, cfg.(sftp.Config))
|
||||
be, err = sftp.Create(ctx, cfg.(sftp.Config))
|
||||
case "s3":
|
||||
return s3.Create(ctx, cfg.(s3.Config), rt)
|
||||
be, err = s3.Create(ctx, cfg.(s3.Config), rt)
|
||||
case "gs":
|
||||
return gs.Create(cfg.(gs.Config), rt)
|
||||
be, err = gs.Create(ctx, cfg.(gs.Config), rt)
|
||||
case "azure":
|
||||
return azure.Create(ctx, cfg.(azure.Config), rt)
|
||||
be, err = azure.Create(ctx, cfg.(azure.Config), rt)
|
||||
case "swift":
|
||||
return swift.Open(ctx, cfg.(swift.Config), rt)
|
||||
be, err = swift.Open(ctx, cfg.(swift.Config), rt)
|
||||
case "b2":
|
||||
return b2.Create(ctx, cfg.(b2.Config), rt)
|
||||
be, err = b2.Create(ctx, cfg.(b2.Config), rt)
|
||||
case "rest":
|
||||
return rest.Create(ctx, cfg.(rest.Config), rt)
|
||||
be, err = rest.Create(ctx, cfg.(rest.Config), rt)
|
||||
case "rclone":
|
||||
return rclone.Create(ctx, cfg.(rclone.Config))
|
||||
be, err = rclone.Create(ctx, cfg.(rclone.Config))
|
||||
default:
|
||||
debug.Log("invalid repository scheme: %v", s)
|
||||
return nil, errors.Fatalf("invalid scheme %q", loc.Scheme)
|
||||
}
|
||||
|
||||
debug.Log("invalid repository scheme: %v", s)
|
||||
return nil, errors.Fatalf("invalid scheme %q", loc.Scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return logger.New(sema.NewBackend(be)), nil
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -26,7 +25,6 @@ import (
|
|||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
||||
azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
)
|
||||
|
||||
// Backend stores data on an azure endpoint.
|
||||
|
@ -34,7 +32,6 @@ type Backend struct {
|
|||
cfg Config
|
||||
container *azContainer.Client
|
||||
connections uint
|
||||
sem sema.Semaphore
|
||||
prefix string
|
||||
listMaxItems int
|
||||
layout.Layout
|
||||
|
@ -96,16 +93,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|||
return nil, errors.New("no azure authentication information found")
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &Backend{
|
||||
container: client,
|
||||
cfg: cfg,
|
||||
connections: cfg.Connections,
|
||||
sem: sem,
|
||||
Layout: &layout.DefaultLayout{
|
||||
Path: cfg.Prefix,
|
||||
Join: path.Join,
|
||||
|
@ -152,7 +143,6 @@ func (be *Backend) SetListMaxItems(i int) {
|
|||
|
||||
// IsNotExist returns true if the error is caused by a not existing file.
|
||||
func (be *Backend) IsNotExist(err error) bool {
|
||||
debug.Log("IsNotExist(%T, %#v)", err, err)
|
||||
return bloberror.HasCode(err, bloberror.BlobNotFound)
|
||||
}
|
||||
|
||||
|
@ -187,16 +177,8 @@ func (be *Backend) Path() string {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
debug.Log("Save %v at %v", h, objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
|
||||
debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName)
|
||||
|
||||
var err error
|
||||
|
@ -208,9 +190,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
err = be.saveLarge(ctx, objName, rd)
|
||||
}
|
||||
|
||||
be.sem.ReleaseToken()
|
||||
debug.Log("%v, err %#v", objName, err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -299,23 +278,9 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
}
|
||||
|
||||
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
blockBlobClient := be.container.NewBlobClient(objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{
|
||||
Range: azblob.HTTPRange{
|
||||
Offset: offset,
|
||||
|
@ -324,26 +289,20 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
be.sem.ReleaseToken()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return be.sem.ReleaseTokenOnClose(resp.Body, nil), err
|
||||
return resp.Body, err
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
debug.Log("%v", h)
|
||||
|
||||
objName := be.Filename(h)
|
||||
blobClient := be.container.NewBlobClient(objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
props, err := blobClient.GetProperties(ctx, nil)
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
debug.Log("blob.GetProperties err %v", err)
|
||||
return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties")
|
||||
}
|
||||
|
||||
|
@ -359,11 +318,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
|||
objName := be.Filename(h)
|
||||
blob := be.container.NewBlobClient(objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
_, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{})
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
|
||||
|
||||
if be.IsNotExist(err) {
|
||||
return nil
|
||||
|
@ -375,8 +330,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
|||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("listing %v", t)
|
||||
|
||||
prefix, _ := be.Basedir(t)
|
||||
|
||||
// make sure prefix ends with a slash
|
||||
|
@ -393,9 +346,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
lister := be.container.NewListBlobsFlatPager(opts)
|
||||
|
||||
for lister.More() {
|
||||
be.sem.GetToken()
|
||||
resp, err := lister.NextPage(ctx)
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -433,30 +384,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
return be.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
||||
func (be *Backend) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
return backend.DefaultDelete(ctx, be)
|
||||
}
|
||||
|
||||
// Close does nothing
|
||||
|
|
|
@ -11,12 +11,10 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/kurin/blazer/b2"
|
||||
"github.com/kurin/blazer/base"
|
||||
)
|
||||
|
@ -28,7 +26,6 @@ type b2Backend struct {
|
|||
cfg Config
|
||||
listMaxItems int
|
||||
layout.Layout
|
||||
sem sema.Semaphore
|
||||
|
||||
canDelete bool
|
||||
}
|
||||
|
@ -92,11 +89,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
|
|||
return nil, errors.Wrap(err, "Bucket")
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &b2Backend{
|
||||
client: client,
|
||||
bucket: bucket,
|
||||
|
@ -106,7 +98,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
|
|||
Path: cfg.Prefix,
|
||||
},
|
||||
listMaxItems: defaultListMaxItems,
|
||||
sem: sem,
|
||||
canDelete: true,
|
||||
}
|
||||
|
||||
|
@ -134,11 +125,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe
|
|||
return nil, errors.Wrap(err, "NewBucket")
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &b2Backend{
|
||||
client: client,
|
||||
bucket: bucket,
|
||||
|
@ -148,7 +134,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe
|
|||
Path: cfg.Prefix,
|
||||
},
|
||||
listMaxItems: defaultListMaxItems,
|
||||
sem: sem,
|
||||
}
|
||||
|
||||
_, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
|
@ -202,33 +187,18 @@ func (be *b2Backend) IsNotExist(err error) bool {
|
|||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
|
||||
}
|
||||
|
||||
func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
be.sem.GetToken()
|
||||
|
||||
name := be.Layout.Filename(h)
|
||||
obj := be.bucket.Object(name)
|
||||
|
||||
if offset == 0 && length == 0 {
|
||||
rd := obj.NewReader(ctx)
|
||||
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
|
||||
return obj.NewReader(ctx), nil
|
||||
}
|
||||
|
||||
// pass a negative length to NewRangeReader so that the remainder of the
|
||||
|
@ -237,8 +207,7 @@ func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int
|
|||
length = -1
|
||||
}
|
||||
|
||||
rd := obj.NewRangeReader(ctx, offset, int64(length))
|
||||
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
|
||||
return obj.NewRangeReader(ctx, offset, int64(length)), nil
|
||||
}
|
||||
|
||||
// Save stores data in the backend at the handle.
|
||||
|
@ -246,21 +215,12 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind
|
|||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
name := be.Filename(h)
|
||||
debug.Log("Save %v, name %v", h, name)
|
||||
obj := be.bucket.Object(name)
|
||||
|
||||
// b2 always requires sha1 checksums for uploaded file parts
|
||||
w := obj.NewWriter(ctx)
|
||||
n, err := io.Copy(w, rd)
|
||||
debug.Log(" saved %d bytes, err %v", n, err)
|
||||
|
||||
if err != nil {
|
||||
_ = w.Close()
|
||||
|
@ -276,16 +236,10 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind
|
|||
|
||||
// Stat returns information about a blob.
|
||||
func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
|
||||
debug.Log("Stat %v", h)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
name := be.Filename(h)
|
||||
obj := be.bucket.Object(name)
|
||||
info, err := obj.Attrs(ctx)
|
||||
if err != nil {
|
||||
debug.Log("Attrs() err %v", err)
|
||||
return restic.FileInfo{}, errors.Wrap(err, "Stat")
|
||||
}
|
||||
return restic.FileInfo{Size: info.Size, Name: h.Name}, nil
|
||||
|
@ -293,11 +247,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI
|
|||
|
||||
// Remove removes the blob with the given name and type.
|
||||
func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
debug.Log("Remove %v", h)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
// the retry backend will also repeat the remove method up to 10 times
|
||||
for i := 0; i < 3; i++ {
|
||||
obj := be.bucket.Object(be.Filename(h))
|
||||
|
@ -332,22 +281,13 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
|
|||
return errors.New("failed to delete all file versions")
|
||||
}
|
||||
|
||||
type semLocker struct {
|
||||
sema.Semaphore
|
||||
}
|
||||
|
||||
func (sm *semLocker) Lock() { sm.GetToken() }
|
||||
func (sm *semLocker) Unlock() { sm.ReleaseToken() }
|
||||
|
||||
// List returns a channel that yields all names of blobs of type t.
|
||||
func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("List %v", t)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
prefix, _ := be.Basedir(t)
|
||||
iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem}))
|
||||
iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems))
|
||||
|
||||
for iter.Next() {
|
||||
obj := iter.Object()
|
||||
|
@ -367,41 +307,14 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic
|
|||
}
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
debug.Log("List: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (be *b2Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
debug.Log("removeKeys %v", t)
|
||||
return be.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
||||
func (be *b2Backend) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
if err != nil && be.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
return backend.DefaultDelete(ctx, be)
|
||||
}
|
||||
|
||||
// Close does nothing
|
||||
|
|
|
@ -18,10 +18,9 @@ type Backend struct {
|
|||
b restic.Backend
|
||||
}
|
||||
|
||||
// statically ensure that RetryBackend implements restic.Backend.
|
||||
// statically ensure that Backend implements restic.Backend.
|
||||
var _ restic.Backend = &Backend{}
|
||||
|
||||
// New returns a new backend that saves all data in a map in memory.
|
||||
func New(be restic.Backend) *Backend {
|
||||
b := &Backend{b: be}
|
||||
debug.Log("created new dry backend")
|
||||
|
@ -34,8 +33,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
return err
|
||||
}
|
||||
|
||||
debug.Log("faked saving %v bytes at %v", rd.Length(), h)
|
||||
|
||||
// don't save anything, just return ok
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -40,11 +40,9 @@ func TestDry(t *testing.T) {
|
|||
{d, "delete", "", "", ""},
|
||||
{d, "stat", "a", "", "not found"},
|
||||
{d, "list", "", "", ""},
|
||||
{d, "save", "", "", "invalid"},
|
||||
{m, "save", "a", "baz", ""}, // save a directly to the mem backend
|
||||
{d, "save", "b", "foob", ""}, // b is not saved
|
||||
{d, "save", "b", "xxx", ""}, // no error as b is not saved
|
||||
{d, "stat", "", "", "invalid"},
|
||||
{d, "stat", "a", "a 3", ""},
|
||||
{d, "load", "a", "baz", ""},
|
||||
{d, "load", "b", "", "not found"},
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
|
@ -37,7 +36,6 @@ type Backend struct {
|
|||
gcsClient *storage.Client
|
||||
projectID string
|
||||
connections uint
|
||||
sem sema.Semaphore
|
||||
bucketName string
|
||||
bucket *storage.BucketHandle
|
||||
prefix string
|
||||
|
@ -99,16 +97,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|||
return nil, errors.Wrap(err, "getStorageClient")
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &Backend{
|
||||
gcsClient: gcsClient,
|
||||
projectID: cfg.ProjectID,
|
||||
connections: cfg.Connections,
|
||||
sem: sem,
|
||||
bucketName: cfg.Bucket,
|
||||
bucket: gcsClient.Bucket(cfg.Bucket),
|
||||
prefix: cfg.Prefix,
|
||||
|
@ -132,14 +124,13 @@ func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
|||
//
|
||||
// The service account must have the "storage.buckets.create" permission to
|
||||
// create a bucket the does not yet exist.
|
||||
func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
||||
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
||||
be, err := open(cfg, rt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "open")
|
||||
}
|
||||
|
||||
// Try to determine if the bucket exists. If it does not, try to create it.
|
||||
ctx := context.Background()
|
||||
exists, err := be.bucketExists(ctx, be.bucket)
|
||||
if err != nil {
|
||||
if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusForbidden {
|
||||
|
@ -169,7 +160,6 @@ func (be *Backend) SetListMaxItems(i int) {
|
|||
|
||||
// IsNotExist returns true if the error is caused by a not existing file.
|
||||
func (be *Backend) IsNotExist(err error) bool {
|
||||
debug.Log("IsNotExist(%T, %#v)", err, err)
|
||||
return errors.Is(err, storage.ErrObjectNotExist)
|
||||
}
|
||||
|
||||
|
@ -204,18 +194,8 @@ func (be *Backend) Path() string {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
debug.Log("Save %v at %v", h, objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
|
||||
debug.Log("InsertObject(%v, %v)", be.bucketName, objName)
|
||||
|
||||
// Set chunk size to zero to disable resumable uploads.
|
||||
//
|
||||
// With a non-zero chunk size (the default is
|
||||
|
@ -250,14 +230,10 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
err = cerr
|
||||
}
|
||||
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
debug.Log("%v: err %#v: %v", objName, err, err)
|
||||
return errors.Wrap(err, "service.Objects.Insert")
|
||||
}
|
||||
|
||||
debug.Log("%v -> %v bytes", objName, wbytes)
|
||||
// sanity check
|
||||
if wbytes != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length())
|
||||
|
@ -268,22 +244,13 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
|
||||
}
|
||||
|
||||
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
if length == 0 {
|
||||
// negative length indicates read till end to GCS lib
|
||||
length = -1
|
||||
|
@ -291,32 +258,21 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
|
||||
if err != nil {
|
||||
cancel()
|
||||
be.sem.ReleaseToken()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return be.sem.ReleaseTokenOnClose(r, cancel), err
|
||||
return r, err
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
|
||||
debug.Log("%v", h)
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
attr, err := be.bucket.Object(objName).Attrs(ctx)
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
debug.Log("GetObjectAttributes() err %v", err)
|
||||
return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get")
|
||||
}
|
||||
|
||||
|
@ -327,23 +283,18 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
err := be.bucket.Object(objName).Delete(ctx)
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err == storage.ErrObjectNotExist {
|
||||
if be.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
|
||||
return errors.Wrap(err, "client.RemoveObject")
|
||||
}
|
||||
|
||||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("listing %v", t)
|
||||
|
||||
prefix, _ := be.Basedir(t)
|
||||
|
||||
// make sure prefix ends with a slash
|
||||
|
@ -357,9 +308,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix})
|
||||
|
||||
for {
|
||||
be.sem.GetToken()
|
||||
attrs, err := itr.Next()
|
||||
be.sem.ReleaseToken()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
}
|
||||
|
@ -389,30 +338,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
return be.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
||||
func (be *Backend) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
return backend.DefaultDelete(ctx, be)
|
||||
}
|
||||
|
||||
// Close does nothing.
|
||||
|
|
|
@ -42,7 +42,7 @@ func newGSTestSuite(t testing.TB) *test.Suite {
|
|||
Create: func(config interface{}) (restic.Backend, error) {
|
||||
cfg := config.(gs.Config)
|
||||
|
||||
be, err := gs.Create(cfg, tr)
|
||||
be, err := gs.Create(context.Background(), cfg, tr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -46,6 +46,8 @@ func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length in
|
|||
})
|
||||
}
|
||||
|
||||
func (r rateLimitedBackend) Unwrap() restic.Backend { return r.Backend }
|
||||
|
||||
type limitedReader struct {
|
||||
io.Reader
|
||||
writerTo io.WriterTo
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
|
@ -22,7 +21,6 @@ import (
|
|||
// Local is a backend in a local directory.
|
||||
type Local struct {
|
||||
Config
|
||||
sem sema.Semaphore
|
||||
layout.Layout
|
||||
backend.Modes
|
||||
}
|
||||
|
@ -38,11 +36,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fi, err := fs.Stat(l.Filename(restic.Handle{Type: restic.ConfigFile}))
|
||||
m := backend.DeriveModesFromFileInfo(fi, err)
|
||||
debug.Log("using (%03O file, %03O dir) permissions", m.File, m.Dir)
|
||||
|
@ -50,7 +43,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) {
|
|||
return &Local{
|
||||
Config: cfg,
|
||||
Layout: l,
|
||||
sem: sem,
|
||||
Modes: m,
|
||||
}, nil
|
||||
}
|
||||
|
@ -114,11 +106,6 @@ func (b *Local) IsNotExist(err error) bool {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) {
|
||||
debug.Log("Save %v", h)
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
finalname := b.Filename(h)
|
||||
dir := filepath.Dir(finalname)
|
||||
|
||||
|
@ -129,9 +116,6 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade
|
|||
}
|
||||
}()
|
||||
|
||||
b.sem.GetToken()
|
||||
defer b.sem.ReleaseToken()
|
||||
|
||||
// Create new file with a temporary name.
|
||||
tmpname := filepath.Base(finalname) + "-tmp-"
|
||||
f, err := tempFile(dir, tmpname)
|
||||
|
@ -217,50 +201,28 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in
|
|||
}
|
||||
|
||||
func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
b.sem.GetToken()
|
||||
f, err := fs.Open(b.Filename(h))
|
||||
if err != nil {
|
||||
b.sem.ReleaseToken()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
_, err = f.Seek(offset, 0)
|
||||
if err != nil {
|
||||
b.sem.ReleaseToken()
|
||||
_ = f.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
r := b.sem.ReleaseTokenOnClose(f, nil)
|
||||
|
||||
if length > 0 {
|
||||
return backend.LimitReadCloser(r, int64(length)), nil
|
||||
return backend.LimitReadCloser(f, int64(length)), nil
|
||||
}
|
||||
|
||||
return r, nil
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
debug.Log("Stat %v", h)
|
||||
if err := h.Valid(); err != nil {
|
||||
return restic.FileInfo{}, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
b.sem.GetToken()
|
||||
defer b.sem.ReleaseToken()
|
||||
|
||||
fi, err := fs.Stat(b.Filename(h))
|
||||
if err != nil {
|
||||
return restic.FileInfo{}, errors.WithStack(err)
|
||||
|
@ -271,12 +233,8 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
|
|||
|
||||
// Remove removes the blob with the given name and type.
|
||||
func (b *Local) Remove(ctx context.Context, h restic.Handle) error {
|
||||
debug.Log("Remove %v", h)
|
||||
fn := b.Filename(h)
|
||||
|
||||
b.sem.GetToken()
|
||||
defer b.sem.ReleaseToken()
|
||||
|
||||
// reset read-only flag
|
||||
err := fs.Chmod(fn, 0666)
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
|
@ -289,8 +247,6 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error {
|
|||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (b *Local) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) (err error) {
|
||||
debug.Log("List %v", t)
|
||||
|
||||
basedir, subdirs := b.Basedir(t)
|
||||
if subdirs {
|
||||
err = visitDirs(ctx, basedir, fn)
|
||||
|
@ -384,13 +340,11 @@ func visitFiles(ctx context.Context, dir string, fn func(restic.FileInfo) error,
|
|||
|
||||
// Delete removes the repository and all files.
|
||||
func (b *Local) Delete(ctx context.Context) error {
|
||||
debug.Log("Delete()")
|
||||
return fs.RemoveAll(b.Path)
|
||||
}
|
||||
|
||||
// Close closes all open files.
|
||||
func (b *Local) Close() error {
|
||||
debug.Log("Close()")
|
||||
// this does not need to do anything, all open files are closed within the
|
||||
// same function.
|
||||
return nil
|
||||
|
|
79
internal/backend/logger/log.go
Normal file
79
internal/backend/logger/log.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
type Backend struct {
|
||||
restic.Backend
|
||||
}
|
||||
|
||||
// statically ensure that Backend implements restic.Backend.
|
||||
var _ restic.Backend = &Backend{}
|
||||
|
||||
func New(be restic.Backend) *Backend {
|
||||
return &Backend{Backend: be}
|
||||
}
|
||||
|
||||
func (be *Backend) IsNotExist(err error) bool {
|
||||
isNotExist := be.Backend.IsNotExist(err)
|
||||
debug.Log("IsNotExist(%T, %#v, %v)", err, err, isNotExist)
|
||||
return isNotExist
|
||||
}
|
||||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
debug.Log("Save(%v, %v)", h, rd.Length())
|
||||
err := be.Backend.Save(ctx, h, rd)
|
||||
debug.Log(" save err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove deletes a file from the backend.
|
||||
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
debug.Log("Remove(%v)", h)
|
||||
err := be.Backend.Remove(ctx, h)
|
||||
debug.Log(" remove err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(io.Reader) error) error {
|
||||
debug.Log("Load(%v, length %v, offset %v)", h, length, offset)
|
||||
err := be.Backend.Load(ctx, h, length, offset, fn)
|
||||
debug.Log(" load err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
debug.Log("Stat(%v)", h)
|
||||
fi, err := be.Backend.Stat(ctx, h)
|
||||
debug.Log(" stat err %v", err)
|
||||
return fi, err
|
||||
}
|
||||
|
||||
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("List(%v)", t)
|
||||
err := be.Backend.List(ctx, t, fn)
|
||||
debug.Log(" list err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Delete(ctx context.Context) error {
|
||||
debug.Log("Delete()")
|
||||
err := be.Backend.Delete(ctx)
|
||||
debug.Log(" delete err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Close() error {
|
||||
debug.Log("Close()")
|
||||
err := be.Backend.Close()
|
||||
debug.Log(" close err %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Unwrap() restic.Backend { return be.Backend }
|
|
@ -10,12 +10,9 @@ import (
|
|||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
)
|
||||
|
||||
type memMap map[restic.Handle][]byte
|
||||
|
@ -32,19 +29,12 @@ const connectionCount = 2
|
|||
type MemoryBackend struct {
|
||||
data memMap
|
||||
m sync.Mutex
|
||||
sem sema.Semaphore
|
||||
}
|
||||
|
||||
// New returns a new backend that saves all data in a map in memory.
|
||||
func New() *MemoryBackend {
|
||||
sem, err := sema.New(connectionCount)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
be := &MemoryBackend{
|
||||
data: make(memMap),
|
||||
sem: sem,
|
||||
}
|
||||
|
||||
debug.Log("created new memory backend")
|
||||
|
@ -59,13 +49,6 @@ func (be *MemoryBackend) IsNotExist(err error) bool {
|
|||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
be.m.Lock()
|
||||
defer be.m.Unlock()
|
||||
|
||||
|
@ -102,7 +85,6 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re
|
|||
}
|
||||
|
||||
be.data[h] = buf
|
||||
debug.Log("saved %v bytes at %v", len(buf), h)
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
|
@ -114,11 +96,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int,
|
|||
}
|
||||
|
||||
func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
be.m.Lock()
|
||||
defer be.m.Unlock()
|
||||
|
||||
|
@ -127,21 +104,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
|
|||
h.Name = ""
|
||||
}
|
||||
|
||||
debug.Log("Load %v offset %v len %v", h, offset, length)
|
||||
|
||||
if offset < 0 {
|
||||
be.sem.ReleaseToken()
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if _, ok := be.data[h]; !ok {
|
||||
be.sem.ReleaseToken()
|
||||
return nil, errNotFound
|
||||
}
|
||||
|
||||
buf := be.data[h]
|
||||
if offset > int64(len(buf)) {
|
||||
be.sem.ReleaseToken()
|
||||
return nil, errors.New("offset beyond end of file")
|
||||
}
|
||||
|
||||
|
@ -150,18 +118,11 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
|
|||
buf = buf[:length]
|
||||
}
|
||||
|
||||
return be.sem.ReleaseTokenOnClose(io.NopCloser(bytes.NewReader(buf)), nil), ctx.Err()
|
||||
return io.NopCloser(bytes.NewReader(buf)), ctx.Err()
|
||||
}
|
||||
|
||||
// Stat returns information about a file in the backend.
|
||||
func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
if err := h.Valid(); err != nil {
|
||||
return restic.FileInfo{}, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
be.m.Lock()
|
||||
defer be.m.Unlock()
|
||||
|
||||
|
@ -170,8 +131,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File
|
|||
h.Name = ""
|
||||
}
|
||||
|
||||
debug.Log("stat %v", h)
|
||||
|
||||
e, ok := be.data[h]
|
||||
if !ok {
|
||||
return restic.FileInfo{}, errNotFound
|
||||
|
@ -182,14 +141,9 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File
|
|||
|
||||
// Remove deletes a file from the backend.
|
||||
func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
be.m.Lock()
|
||||
defer be.m.Unlock()
|
||||
|
||||
debug.Log("Remove %v", h)
|
||||
|
||||
h.ContainedBlobType = restic.InvalidBlob
|
||||
if _, ok := be.data[h]; !ok {
|
||||
return errNotFound
|
||||
|
|
|
@ -11,13 +11,11 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
)
|
||||
|
||||
// make sure the rest backend implements restic.Backend
|
||||
|
@ -27,7 +25,6 @@ var _ restic.Backend = &Backend{}
|
|||
type Backend struct {
|
||||
url *url.URL
|
||||
connections uint
|
||||
sem sema.Semaphore
|
||||
client http.Client
|
||||
layout.Layout
|
||||
}
|
||||
|
@ -40,11 +37,6 @@ const (
|
|||
|
||||
// Open opens the REST backend with the given config.
|
||||
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// use url without trailing slash for layout
|
||||
url := cfg.URL.String()
|
||||
if url[len(url)-1] == '/' {
|
||||
|
@ -56,7 +48,6 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
|
|||
client: http.Client{Transport: rt},
|
||||
Layout: &layout.RESTLayout{URL: url, Join: path.Join},
|
||||
connections: cfg.Connections,
|
||||
sem: sem,
|
||||
}
|
||||
|
||||
return be, nil
|
||||
|
@ -123,10 +114,6 @@ func (b *Backend) HasAtomicReplace() bool {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
|
@ -143,9 +130,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea
|
|||
// let's the server know what's coming.
|
||||
req.ContentLength = rd.Length()
|
||||
|
||||
b.sem.GetToken()
|
||||
resp, err := b.client.Do(req)
|
||||
b.sem.ReleaseToken()
|
||||
|
||||
var cerr error
|
||||
if resp != nil {
|
||||
|
@ -212,19 +197,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
}
|
||||
|
||||
func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", b.Filename(h), nil)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
@ -236,11 +208,8 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o
|
|||
}
|
||||
req.Header.Set("Range", byteRange)
|
||||
req.Header.Set("Accept", ContentTypeV2)
|
||||
debug.Log("Load(%v) send range %v", h, byteRange)
|
||||
|
||||
b.sem.GetToken()
|
||||
resp, err := b.client.Do(req)
|
||||
b.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
|
@ -265,19 +234,13 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o
|
|||
|
||||
// Stat returns information about a blob.
|
||||
func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
if err := h.Valid(); err != nil {
|
||||
return restic.FileInfo{}, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodHead, b.Filename(h), nil)
|
||||
if err != nil {
|
||||
return restic.FileInfo{}, errors.WithStack(err)
|
||||
}
|
||||
req.Header.Set("Accept", ContentTypeV2)
|
||||
|
||||
b.sem.GetToken()
|
||||
resp, err := b.client.Do(req)
|
||||
b.sem.ReleaseToken()
|
||||
if err != nil {
|
||||
return restic.FileInfo{}, errors.WithStack(err)
|
||||
}
|
||||
|
@ -310,19 +273,13 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e
|
|||
|
||||
// Remove removes the blob with the given name and type.
|
||||
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "DELETE", b.Filename(h), nil)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
req.Header.Set("Accept", ContentTypeV2)
|
||||
|
||||
b.sem.GetToken()
|
||||
resp, err := b.client.Do(req)
|
||||
b.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "client.Do")
|
||||
|
@ -359,9 +316,7 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi
|
|||
}
|
||||
req.Header.Set("Accept", ContentTypeV2)
|
||||
|
||||
b.sem.GetToken()
|
||||
resp, err := b.client.Do(req)
|
||||
b.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "List")
|
||||
|
@ -457,32 +412,7 @@ func (b *Backend) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
return b.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes all data in the backend.
|
||||
func (b *Backend) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := b.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
if err != nil && b.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
return backend.DefaultDelete(ctx, b)
|
||||
}
|
||||
|
|
|
@ -191,3 +191,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (be *Backend) Unwrap() restic.Backend {
|
||||
return be.Backend
|
||||
}
|
||||
|
|
|
@ -13,12 +13,10 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
@ -26,7 +24,6 @@ import (
|
|||
// Backend stores data on an S3 endpoint.
|
||||
type Backend struct {
|
||||
client *minio.Client
|
||||
sem sema.Semaphore
|
||||
cfg Config
|
||||
layout.Layout
|
||||
}
|
||||
|
@ -102,14 +99,8 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro
|
|||
return nil, errors.Wrap(err, "minio.New")
|
||||
}
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &Backend{
|
||||
client: client,
|
||||
sem: sem,
|
||||
cfg: cfg,
|
||||
}
|
||||
|
||||
|
@ -169,8 +160,6 @@ func isAccessDenied(err error) bool {
|
|||
|
||||
// IsNotExist returns true if the error is caused by a not existing file.
|
||||
func (be *Backend) IsNotExist(err error) bool {
|
||||
debug.Log("IsNotExist(%T, %#v)", err, err)
|
||||
|
||||
var e minio.ErrorResponse
|
||||
return errors.As(err, &e) && e.Code == "NoSuchKey"
|
||||
}
|
||||
|
@ -273,17 +262,8 @@ func (be *Backend) Path() string {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
debug.Log("Save %v", h)
|
||||
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
opts := minio.PutObjectOptions{StorageClass: be.cfg.StorageClass}
|
||||
opts.ContentType = "application/octet-stream"
|
||||
// the only option with the high-level api is to let the library handle the checksum computation
|
||||
|
@ -291,11 +271,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
// only use multipart uploads for very large files
|
||||
opts.PartSize = 200 * 1024 * 1024
|
||||
|
||||
debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length())
|
||||
info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, io.NopCloser(rd), int64(rd.Length()), opts)
|
||||
|
||||
debug.Log("%v -> %v bytes, err %#v: %v", objName, info.Size, err, err)
|
||||
|
||||
// sanity check
|
||||
if err == nil && info.Size != rd.Length() {
|
||||
return errors.Errorf("wrote %d bytes instead of the expected %d bytes", info.Size, rd.Length())
|
||||
|
@ -307,32 +284,20 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
|||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
|
||||
}
|
||||
|
||||
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
opts := minio.GetObjectOptions{}
|
||||
|
||||
var err error
|
||||
if length > 0 {
|
||||
debug.Log("range: %v-%v", offset, offset+int64(length)-1)
|
||||
err = opts.SetRange(offset, offset+int64(length)-1)
|
||||
} else if offset > 0 {
|
||||
debug.Log("range: %v-", offset)
|
||||
err = opts.SetRange(offset, 0)
|
||||
}
|
||||
|
||||
|
@ -340,41 +305,30 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
|||
return nil, errors.Wrap(err, "SetRange")
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
coreClient := minio.Core{Client: be.client}
|
||||
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||
if err != nil {
|
||||
cancel()
|
||||
be.sem.ReleaseToken()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return be.sem.ReleaseTokenOnClose(rd, cancel), err
|
||||
return rd, err
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
|
||||
debug.Log("%v", h)
|
||||
|
||||
objName := be.Filename(h)
|
||||
var obj *minio.Object
|
||||
|
||||
opts := minio.GetObjectOptions{}
|
||||
|
||||
be.sem.GetToken()
|
||||
obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||
if err != nil {
|
||||
debug.Log("GetObject() err %v", err)
|
||||
be.sem.ReleaseToken()
|
||||
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
|
||||
}
|
||||
|
||||
// make sure that the object is closed properly.
|
||||
defer func() {
|
||||
e := obj.Close()
|
||||
be.sem.ReleaseToken()
|
||||
if err == nil {
|
||||
err = errors.Wrap(e, "Close")
|
||||
}
|
||||
|
@ -382,7 +336,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||
|
||||
fi, err := obj.Stat()
|
||||
if err != nil {
|
||||
debug.Log("Stat() err %v", err)
|
||||
return restic.FileInfo{}, errors.Wrap(err, "Stat")
|
||||
}
|
||||
|
||||
|
@ -393,11 +346,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{})
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
|
||||
|
||||
if be.IsNotExist(err) {
|
||||
err = nil
|
||||
|
@ -409,8 +358,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
|||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("listing %v", t)
|
||||
|
||||
prefix, recursive := be.Basedir(t)
|
||||
|
||||
// make sure prefix ends with a slash
|
||||
|
@ -464,30 +411,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
return be.List(ctx, restic.PackFile, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
||||
func (be *Backend) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
return backend.DefaultDelete(ctx, be)
|
||||
}
|
||||
|
||||
// Close does nothing
|
||||
|
|
91
internal/backend/sema/backend.go
Normal file
91
internal/backend/sema/backend.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package sema
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// make sure that connectionLimitedBackend implements restic.Backend
|
||||
var _ restic.Backend = &connectionLimitedBackend{}
|
||||
|
||||
// connectionLimitedBackend limits the number of concurrent operations.
|
||||
type connectionLimitedBackend struct {
|
||||
restic.Backend
|
||||
sem semaphore
|
||||
}
|
||||
|
||||
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
||||
func NewBackend(be restic.Backend) restic.Backend {
|
||||
sem, err := newSemaphore(be.Connections())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &connectionLimitedBackend{
|
||||
Backend: be,
|
||||
sem: sem,
|
||||
}
|
||||
}
|
||||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
return be.Backend.Save(ctx, h, rd)
|
||||
}
|
||||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
if offset < 0 {
|
||||
return backoff.Permanent(errors.New("offset is negative"))
|
||||
}
|
||||
if length < 0 {
|
||||
return backoff.Permanent(errors.Errorf("invalid length %d", length))
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
// Stat returns information about a file in the backend.
|
||||
func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
if err := h.Valid(); err != nil {
|
||||
return restic.FileInfo{}, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
return be.Backend.Stat(ctx, h)
|
||||
}
|
||||
|
||||
// Remove deletes a file from the backend.
|
||||
func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
return be.Backend.Remove(ctx, h)
|
||||
}
|
||||
|
||||
func (be *connectionLimitedBackend) Unwrap() restic.Backend {
|
||||
return be.Backend
|
||||
}
|
180
internal/backend/sema/backend_test.go
Normal file
180
internal/backend/sema/backend_test.go
Normal file
|
@ -0,0 +1,180 @@
|
|||
package sema_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestParameterValidationSave(t *testing.T) {
|
||||
m := mock.NewBackend()
|
||||
m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
return nil
|
||||
}
|
||||
be := sema.NewBackend(m)
|
||||
|
||||
err := be.Save(context.TODO(), restic.Handle{}, nil)
|
||||
test.Assert(t, err != nil, "Save() with invalid handle did not return an error")
|
||||
}
|
||||
|
||||
func TestParameterValidationLoad(t *testing.T) {
|
||||
m := mock.NewBackend()
|
||||
m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
return io.NopCloser(nil), nil
|
||||
}
|
||||
|
||||
be := sema.NewBackend(m)
|
||||
nilCb := func(rd io.Reader) error { return nil }
|
||||
|
||||
err := be.Load(context.TODO(), restic.Handle{}, 10, 0, nilCb)
|
||||
test.Assert(t, err != nil, "Load() with invalid handle did not return an error")
|
||||
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
err = be.Load(context.TODO(), h, 10, -1, nilCb)
|
||||
test.Assert(t, err != nil, "Save() with negative offset did not return an error")
|
||||
err = be.Load(context.TODO(), h, -1, 0, nilCb)
|
||||
test.Assert(t, err != nil, "Save() with negative length did not return an error")
|
||||
}
|
||||
|
||||
func TestParameterValidationStat(t *testing.T) {
|
||||
m := mock.NewBackend()
|
||||
m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
return restic.FileInfo{}, nil
|
||||
}
|
||||
be := sema.NewBackend(m)
|
||||
|
||||
_, err := be.Stat(context.TODO(), restic.Handle{})
|
||||
test.Assert(t, err != nil, "Stat() with invalid handle did not return an error")
|
||||
}
|
||||
|
||||
func TestParameterValidationRemove(t *testing.T) {
|
||||
m := mock.NewBackend()
|
||||
m.RemoveFn = func(ctx context.Context, h restic.Handle) error {
|
||||
return nil
|
||||
}
|
||||
be := sema.NewBackend(m)
|
||||
|
||||
err := be.Remove(context.TODO(), restic.Handle{})
|
||||
test.Assert(t, err != nil, "Remove() with invalid handle did not return an error")
|
||||
}
|
||||
|
||||
func TestUnwrap(t *testing.T) {
|
||||
m := mock.NewBackend()
|
||||
be := sema.NewBackend(m)
|
||||
|
||||
unwrapper := be.(restic.BackendUnwrapper)
|
||||
test.Assert(t, unwrapper.Unwrap() == m, "Unwrap() returned wrong backend")
|
||||
}
|
||||
|
||||
func countingBlocker() (func(), func(int) int) {
|
||||
ctr := int64(0)
|
||||
blocker := make(chan struct{})
|
||||
|
||||
wait := func() {
|
||||
// count how many goroutines were allowed by the semaphore
|
||||
atomic.AddInt64(&ctr, 1)
|
||||
// block until the test can retrieve the counter
|
||||
<-blocker
|
||||
}
|
||||
|
||||
unblock := func(expected int) int {
|
||||
// give goroutines enough time to block
|
||||
var blocked int64
|
||||
for i := 0; i < 100 && blocked != int64(expected); i++ {
|
||||
time.Sleep(100 * time.Microsecond)
|
||||
blocked = atomic.LoadInt64(&ctr)
|
||||
}
|
||||
close(blocker)
|
||||
return int(blocked)
|
||||
}
|
||||
|
||||
return wait, unblock
|
||||
}
|
||||
|
||||
func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) {
|
||||
expectBlocked := int(2)
|
||||
|
||||
m := mock.NewBackend()
|
||||
setup(m)
|
||||
m.ConnectionsFn = func() uint { return uint(expectBlocked) }
|
||||
be := sema.NewBackend(m)
|
||||
|
||||
var wg errgroup.Group
|
||||
for i := 0; i < int(expectBlocked+1); i++ {
|
||||
wg.Go(handler(be))
|
||||
}
|
||||
|
||||
blocked := unblock(expectBlocked)
|
||||
test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked)
|
||||
test.OK(t, wg.Wait())
|
||||
}
|
||||
|
||||
func TestConcurrencyLimitSave(t *testing.T) {
|
||||
wait, unblock := countingBlocker()
|
||||
concurrencyTester(t, func(m *mock.Backend) {
|
||||
m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
wait()
|
||||
return nil
|
||||
}
|
||||
}, func(be restic.Backend) func() error {
|
||||
return func() error {
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
return be.Save(context.TODO(), h, nil)
|
||||
}
|
||||
}, unblock)
|
||||
}
|
||||
|
||||
func TestConcurrencyLimitLoad(t *testing.T) {
|
||||
wait, unblock := countingBlocker()
|
||||
concurrencyTester(t, func(m *mock.Backend) {
|
||||
m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
wait()
|
||||
return io.NopCloser(nil), nil
|
||||
}
|
||||
}, func(be restic.Backend) func() error {
|
||||
return func() error {
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
nilCb := func(rd io.Reader) error { return nil }
|
||||
return be.Load(context.TODO(), h, 10, 0, nilCb)
|
||||
}
|
||||
}, unblock)
|
||||
}
|
||||
|
||||
func TestConcurrencyLimitStat(t *testing.T) {
|
||||
wait, unblock := countingBlocker()
|
||||
concurrencyTester(t, func(m *mock.Backend) {
|
||||
m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
wait()
|
||||
return restic.FileInfo{}, nil
|
||||
}
|
||||
}, func(be restic.Backend) func() error {
|
||||
return func() error {
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
_, err := be.Stat(context.TODO(), h)
|
||||
return err
|
||||
}
|
||||
}, unblock)
|
||||
}
|
||||
|
||||
func TestConcurrencyLimitDelete(t *testing.T) {
|
||||
wait, unblock := countingBlocker()
|
||||
concurrencyTester(t, func(m *mock.Backend) {
|
||||
m.RemoveFn = func(ctx context.Context, h restic.Handle) error {
|
||||
wait()
|
||||
return nil
|
||||
}
|
||||
}, func(be restic.Backend) func() error {
|
||||
return func() error {
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
return be.Remove(context.TODO(), h)
|
||||
}
|
||||
}, unblock)
|
||||
}
|
|
@ -2,64 +2,30 @@
|
|||
package sema
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
)
|
||||
|
||||
// A Semaphore limits access to a restricted resource.
|
||||
type Semaphore struct {
|
||||
// A semaphore limits access to a restricted resource.
|
||||
type semaphore struct {
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
// New returns a new semaphore with capacity n.
|
||||
func New(n uint) (Semaphore, error) {
|
||||
// newSemaphore returns a new semaphore with capacity n.
|
||||
func newSemaphore(n uint) (semaphore, error) {
|
||||
if n == 0 {
|
||||
return Semaphore{}, errors.New("capacity must be a positive number")
|
||||
return semaphore{}, errors.New("capacity must be a positive number")
|
||||
}
|
||||
return Semaphore{
|
||||
return semaphore{
|
||||
ch: make(chan struct{}, n),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetToken blocks until a Token is available.
|
||||
func (s Semaphore) GetToken() { s.ch <- struct{}{} }
|
||||
func (s semaphore) GetToken() {
|
||||
s.ch <- struct{}{}
|
||||
debug.Log("acquired token")
|
||||
}
|
||||
|
||||
// ReleaseToken returns a token.
|
||||
func (s Semaphore) ReleaseToken() { <-s.ch }
|
||||
|
||||
// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close.
|
||||
// Before returning the token, cancel, if not nil, will be run
|
||||
// to free up context resources.
|
||||
func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
|
||||
return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel}
|
||||
}
|
||||
|
||||
type wrapReader struct {
|
||||
io.ReadCloser
|
||||
eofSeen bool
|
||||
sem Semaphore
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (wr *wrapReader) Read(p []byte) (int, error) {
|
||||
if wr.eofSeen { // XXX Why do we do this?
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
n, err := wr.ReadCloser.Read(p)
|
||||
if err == io.EOF {
|
||||
wr.eofSeen = true
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (wr *wrapReader) Close() error {
|
||||
err := wr.ReadCloser.Close()
|
||||
if wr.cancel != nil {
|
||||
wr.cancel()
|
||||
}
|
||||
wr.sem.ReleaseToken()
|
||||
return err
|
||||
}
|
||||
func (s semaphore) ReleaseToken() { <-s.ch }
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -35,7 +34,6 @@ type SFTP struct {
|
|||
|
||||
posixRename bool
|
||||
|
||||
sem sema.Semaphore
|
||||
layout.Layout
|
||||
Config
|
||||
backend.Modes
|
||||
|
@ -140,11 +138,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) {
|
|||
}
|
||||
|
||||
func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) {
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var err error
|
||||
sftp.Layout, err = layout.ParseLayout(ctx, sftp, cfg.Layout, defaultLayout, cfg.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -158,7 +152,6 @@ func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) {
|
|||
|
||||
sftp.Config = cfg
|
||||
sftp.p = cfg.Path
|
||||
sftp.sem = sem
|
||||
sftp.Modes = m
|
||||
return sftp, nil
|
||||
}
|
||||
|
@ -304,22 +297,14 @@ func tempSuffix() string {
|
|||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
debug.Log("Save %v", h)
|
||||
if err := r.clientError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
filename := r.Filename(h)
|
||||
tmpFilename := filename + "-restic-temp-" + tempSuffix()
|
||||
dirname := r.Dirname(h)
|
||||
|
||||
r.sem.GetToken()
|
||||
defer r.sem.ReleaseToken()
|
||||
|
||||
// create new file
|
||||
f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
|
||||
|
||||
|
@ -415,77 +400,35 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int
|
|||
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
|
||||
}
|
||||
|
||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
||||
type wrapReader struct {
|
||||
io.ReadCloser
|
||||
io.WriterTo
|
||||
f func()
|
||||
}
|
||||
|
||||
func (wr *wrapReader) Close() error {
|
||||
err := wr.ReadCloser.Close()
|
||||
wr.f()
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
r.sem.GetToken()
|
||||
f, err := r.c.Open(r.Filename(h))
|
||||
if err != nil {
|
||||
r.sem.ReleaseToken()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
_, err = f.Seek(offset, 0)
|
||||
if err != nil {
|
||||
r.sem.ReleaseToken()
|
||||
_ = f.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// use custom close wrapper to also provide WriteTo() on the wrapper
|
||||
rd := &wrapReader{
|
||||
ReadCloser: f,
|
||||
WriterTo: f,
|
||||
f: func() {
|
||||
r.sem.ReleaseToken()
|
||||
},
|
||||
}
|
||||
|
||||
if length > 0 {
|
||||
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
|
||||
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
|
||||
return backend.LimitReadCloser(rd, int64(length)), nil
|
||||
return backend.LimitReadCloser(f, int64(length)), nil
|
||||
}
|
||||
|
||||
return rd, nil
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
||||
debug.Log("Stat(%v)", h)
|
||||
if err := r.clientError(); err != nil {
|
||||
return restic.FileInfo{}, err
|
||||
}
|
||||
|
||||
if err := h.Valid(); err != nil {
|
||||
return restic.FileInfo{}, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
r.sem.GetToken()
|
||||
defer r.sem.ReleaseToken()
|
||||
|
||||
fi, err := r.c.Lstat(r.Filename(h))
|
||||
if err != nil {
|
||||
return restic.FileInfo{}, errors.Wrap(err, "Lstat")
|
||||
|
@ -496,28 +439,20 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro
|
|||
|
||||
// Remove removes the content stored at name.
|
||||
func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error {
|
||||
debug.Log("Remove(%v)", h)
|
||||
if err := r.clientError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.sem.GetToken()
|
||||
defer r.sem.ReleaseToken()
|
||||
|
||||
return r.c.Remove(r.Filename(h))
|
||||
}
|
||||
|
||||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("List %v", t)
|
||||
|
||||
basedir, subdirs := r.Basedir(t)
|
||||
walker := r.c.Walk(basedir)
|
||||
for {
|
||||
r.sem.GetToken()
|
||||
ok := walker.Step()
|
||||
r.sem.ReleaseToken()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
@ -572,7 +507,6 @@ var closeTimeout = 2 * time.Second
|
|||
|
||||
// Close closes the sftp connection and terminates the underlying command.
|
||||
func (r *SFTP) Close() error {
|
||||
debug.Log("Close")
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -15,12 +15,10 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/sema"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/ncw/swift/v2"
|
||||
)
|
||||
|
||||
|
@ -28,7 +26,6 @@ import (
|
|||
type beSwift struct {
|
||||
conn *swift.Connection
|
||||
connections uint
|
||||
sem sema.Semaphore
|
||||
container string // Container name
|
||||
prefix string // Prefix of object names in the container
|
||||
layout.Layout
|
||||
|
@ -42,11 +39,6 @@ var _ restic.Backend = &beSwift{}
|
|||
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
|
||||
debug.Log("config %#v", cfg)
|
||||
|
||||
sem, err := sema.New(cfg.Connections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
be := &beSwift{
|
||||
conn: &swift.Connection{
|
||||
UserName: cfg.UserName,
|
||||
|
@ -72,7 +64,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
|
|||
Transport: rt,
|
||||
},
|
||||
connections: cfg.Connections,
|
||||
sem: sem,
|
||||
container: cfg.Container,
|
||||
prefix: cfg.Prefix,
|
||||
Layout: &layout.DefaultLayout{
|
||||
|
@ -143,18 +134,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
}
|
||||
|
||||
func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
debug.Log("Load %v, length %v, offset %v", h, length, offset)
|
||||
if err := h.Valid(); err != nil {
|
||||
return nil, backoff.Permanent(err)
|
||||
}
|
||||
|
||||
if offset < 0 {
|
||||
return nil, errors.New("offset is negative")
|
||||
}
|
||||
|
||||
if length < 0 {
|
||||
return nil, errors.Errorf("invalid length %d", length)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
|
@ -167,59 +146,34 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int,
|
|||
headers["Range"] = fmt.Sprintf("bytes=%d-%d", offset, offset+int64(length)-1)
|
||||
}
|
||||
|
||||
if _, ok := headers["Range"]; ok {
|
||||
debug.Log("Load(%v) send range %v", h, headers["Range"])
|
||||
}
|
||||
|
||||
be.sem.GetToken()
|
||||
obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers)
|
||||
if err != nil {
|
||||
debug.Log(" err %v", err)
|
||||
be.sem.ReleaseToken()
|
||||
return nil, errors.Wrap(err, "conn.ObjectOpen")
|
||||
}
|
||||
|
||||
return be.sem.ReleaseTokenOnClose(obj, nil), nil
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
debug.Log("Save %v at %v", h, objName)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
encoding := "binary/octet-stream"
|
||||
|
||||
debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)
|
||||
hdr := swift.Headers{"Content-Length": strconv.FormatInt(rd.Length(), 10)}
|
||||
_, err := be.conn.ObjectPut(ctx,
|
||||
be.container, objName, rd, true, hex.EncodeToString(rd.Hash()),
|
||||
encoding, hdr)
|
||||
// swift does not return the upload length
|
||||
debug.Log("%v, err %#v", objName, err)
|
||||
|
||||
return errors.Wrap(err, "client.PutObject")
|
||||
}
|
||||
|
||||
// Stat returns information about a blob.
|
||||
func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
|
||||
debug.Log("%v", h)
|
||||
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
obj, _, err := be.conn.Object(ctx, be.container, objName)
|
||||
if err != nil {
|
||||
debug.Log("Object() err %v", err)
|
||||
return restic.FileInfo{}, errors.Wrap(err, "conn.Object")
|
||||
}
|
||||
|
||||
|
@ -230,27 +184,19 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||
func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error {
|
||||
objName := be.Filename(h)
|
||||
|
||||
be.sem.GetToken()
|
||||
defer be.sem.ReleaseToken()
|
||||
|
||||
err := be.conn.ObjectDelete(ctx, be.container, objName)
|
||||
debug.Log("Remove(%v) -> err %v", h, err)
|
||||
return errors.Wrap(err, "conn.ObjectDelete")
|
||||
}
|
||||
|
||||
// List runs fn for each file in the backend which has the type t. When an
|
||||
// error occurs (or fn returns an error), List stops and returns it.
|
||||
func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
||||
debug.Log("listing %v", t)
|
||||
|
||||
prefix, _ := be.Basedir(t)
|
||||
prefix += "/"
|
||||
|
||||
err := be.conn.ObjectsWalk(ctx, be.container, &swift.ObjectsOpts{Prefix: prefix},
|
||||
func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) {
|
||||
be.sem.GetToken()
|
||||
newObjects, err := be.conn.Objects(ctx, be.container, opts)
|
||||
be.sem.ReleaseToken()
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "conn.ObjectNames")
|
||||
|
@ -285,13 +231,6 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F
|
|||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Remove keys for a specified backend type.
|
||||
func (be *beSwift) removeKeys(ctx context.Context, t restic.FileType) error {
|
||||
return be.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
}
|
||||
|
||||
// IsNotExist returns true if the error is caused by a not existing file.
|
||||
func (be *beSwift) IsNotExist(err error) bool {
|
||||
var e *swift.Error
|
||||
|
@ -301,26 +240,7 @@ func (be *beSwift) IsNotExist(err error) bool {
|
|||
// Delete removes all restic objects in the container.
|
||||
// It will not remove the container itself.
|
||||
func (be *beSwift) Delete(ctx context.Context) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.removeKeys(ctx, t)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
if err != nil && !be.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return backend.DefaultDelete(ctx, be)
|
||||
}
|
||||
|
||||
// Close does nothing
|
||||
|
|
|
@ -124,17 +124,7 @@ func (s *Suite) TestLoad(t *testing.T) {
|
|||
b := s.open(t)
|
||||
defer s.close(t, b)
|
||||
|
||||
noop := func(rd io.Reader) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop)
|
||||
if err == nil {
|
||||
t.Fatalf("Load() did not return an error for invalid handle")
|
||||
}
|
||||
test.Assert(t, !b.IsNotExist(err), "IsNotExist() should not accept an invalid handle error: %v", err)
|
||||
|
||||
err = testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0)
|
||||
err := testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0)
|
||||
if err == nil {
|
||||
t.Fatalf("Load() did not return an error for non-existing blob")
|
||||
}
|
||||
|
@ -153,11 +143,6 @@ func (s *Suite) TestLoad(t *testing.T) {
|
|||
|
||||
t.Logf("saved %d bytes as %v", length, handle)
|
||||
|
||||
err = b.Load(context.TODO(), handle, 100, -1, noop)
|
||||
if err == nil {
|
||||
t.Fatalf("Load() returned no error for negative offset!")
|
||||
}
|
||||
|
||||
err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error {
|
||||
_, err := io.Copy(io.Discard, rd)
|
||||
if err != nil {
|
||||
|
|
|
@ -62,6 +62,7 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
|
|||
func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
|
||||
openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error),
|
||||
fn func(rd io.Reader) error) error {
|
||||
|
||||
rd, err := openReader(ctx, h, length, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -74,6 +75,31 @@ func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
|
|||
return rd.Close()
|
||||
}
|
||||
|
||||
// DefaultDelete removes all restic keys in the bucket. It will not remove the bucket itself.
|
||||
func DefaultDelete(ctx context.Context, be restic.Backend) error {
|
||||
alltypes := []restic.FileType{
|
||||
restic.PackFile,
|
||||
restic.KeyFile,
|
||||
restic.LockFile,
|
||||
restic.SnapshotFile,
|
||||
restic.IndexFile}
|
||||
|
||||
for _, t := range alltypes {
|
||||
err := be.List(ctx, t, func(fi restic.FileInfo) error {
|
||||
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
|
||||
})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
|
||||
if err != nil && be.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type memorizedLister struct {
|
||||
fileInfos []restic.FileInfo
|
||||
tpe restic.FileType
|
||||
|
|
4
internal/cache/backend.go
vendored
4
internal/cache/backend.go
vendored
|
@ -211,3 +211,7 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e
|
|||
func (b *Backend) IsNotExist(err error) bool {
|
||||
return b.Backend.IsNotExist(err)
|
||||
}
|
||||
|
||||
func (b *Backend) Unwrap() restic.Backend {
|
||||
return b.Backend
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/backend/layout"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -22,24 +21,26 @@ func init() {
|
|||
// "default" layout.
|
||||
type S3Layout struct{}
|
||||
|
||||
func toS3Backend(repo restic.Repository) *s3.Backend {
|
||||
b := repo.Backend()
|
||||
// unwrap cache
|
||||
if be, ok := b.(*cache.Backend); ok {
|
||||
b = be.Backend
|
||||
}
|
||||
func toS3Backend(b restic.Backend) *s3.Backend {
|
||||
for b != nil {
|
||||
if be, ok := b.(*s3.Backend); ok {
|
||||
return be
|
||||
}
|
||||
|
||||
be, ok := b.(*s3.Backend)
|
||||
if !ok {
|
||||
debug.Log("backend is not s3")
|
||||
return nil
|
||||
if be, ok := b.(restic.BackendUnwrapper); ok {
|
||||
b = be.Unwrap()
|
||||
} else {
|
||||
// not the backend we're looking for
|
||||
break
|
||||
}
|
||||
}
|
||||
return be
|
||||
debug.Log("backend is not s3")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check tests whether the migration can be applied.
|
||||
func (m *S3Layout) Check(ctx context.Context, repo restic.Repository) (bool, string, error) {
|
||||
be := toS3Backend(repo)
|
||||
be := toS3Backend(repo.Backend())
|
||||
if be == nil {
|
||||
debug.Log("backend is not s3")
|
||||
return false, "backend is not s3", nil
|
||||
|
@ -91,7 +92,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou
|
|||
|
||||
// Apply runs the migration.
|
||||
func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error {
|
||||
be := toS3Backend(repo)
|
||||
be := toS3Backend(repo.Backend())
|
||||
if be == nil {
|
||||
debug.Log("backend is not s3")
|
||||
return errors.New("backend is not s3")
|
||||
|
|
27
internal/migrations/s3_layout_test.go
Normal file
27
internal/migrations/s3_layout_test.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package migrations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func TestS3UnwrapBackend(t *testing.T) {
|
||||
// toS3Backend(b restic.Backend) *s3.Backend
|
||||
|
||||
m := mock.NewBackend()
|
||||
test.Assert(t, toS3Backend(m) == nil, "mock backend is not an s3 backend")
|
||||
|
||||
// uninitialized fake backend for testing
|
||||
s3 := &s3.Backend{}
|
||||
test.Assert(t, toS3Backend(s3) == s3, "s3 was not returned")
|
||||
|
||||
c := &cache.Backend{Backend: s3}
|
||||
test.Assert(t, toS3Backend(c) == s3, "failed to unwrap s3 backend")
|
||||
|
||||
c.Backend = m
|
||||
test.Assert(t, toS3Backend(c) == nil, "a wrapped mock backend is not an s3 backend")
|
||||
}
|
|
@ -70,6 +70,11 @@ type Backend interface {
|
|||
Delete(ctx context.Context) error
|
||||
}
|
||||
|
||||
type BackendUnwrapper interface {
|
||||
// Unwrap returns the underlying backend or nil if there is none.
|
||||
Unwrap() Backend
|
||||
}
|
||||
|
||||
// FileInfo is contains information about a file in the backend.
|
||||
type FileInfo struct {
|
||||
Size int64
|
||||
|
|
Loading…
Reference in a new issue