backend: factor out connection limiting and parameter validation

The SemaphoreBackend now uniformly enforces the limit of concurrent
backend operations. In addition, it unifies the parameter validation.

The List() methods no longer uses a semaphore. Restic already never runs
multiple list operations in parallel.

By managing the semaphore in a wrapper backend, the sections that hold a
semaphore token grow slightly. However, the main bottleneck is IO, so
this shouldn't make much of a difference.

The key insight that enables the SemaphoreBackend is that all of the
complex semaphore handling in `openReader()` still happens within the
original call to `Load()`. Thus, getting and releasing the semaphore
tokens can be refactored to happen directly in `Load()`. This eliminates
the need for wrapping the reader in `openReader()` to release the token.
This commit is contained in:
Michael Eischer 2023-04-07 23:02:35 +02:00
parent 8b5ab5b59f
commit 8e1e3844aa
13 changed files with 126 additions and 384 deletions

View file

@ -25,6 +25,7 @@ import (
"github.com/restic/restic/internal/backend/rest"
"github.com/restic/restic/internal/backend/retry"
"github.com/restic/restic/internal/backend/s3"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/backend/sftp"
"github.com/restic/restic/internal/backend/swift"
"github.com/restic/restic/internal/cache"
@ -744,8 +745,8 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err)
}
// wrap with debug logging
be = logger.New(be)
// wrap with debug logging and connection limiting
be = logger.New(sema.New(be))
// wrap backend if a test specified an inner hook
if gopts.backendInnerTestHook != nil {
@ -820,5 +821,5 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend
return nil, err
}
return logger.New(be), nil
return logger.New(sema.New(be)), nil
}

View file

@ -14,7 +14,6 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
@ -26,7 +25,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/cenkalti/backoff/v4"
)
// Backend stores data on an azure endpoint.
@ -34,7 +32,6 @@ type Backend struct {
cfg Config
container *azContainer.Client
connections uint
sem sema.Semaphore
prefix string
listMaxItems int
layout.Layout
@ -96,16 +93,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
return nil, errors.New("no azure authentication information found")
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &Backend{
container: client,
cfg: cfg,
connections: cfg.Connections,
sem: sem,
Layout: &layout.DefaultLayout{
Path: cfg.Prefix,
Join: path.Join,
@ -186,14 +177,8 @@ func (be *Backend) Path() string {
// Save stores data in the backend at the handle.
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
objName := be.Filename(h)
be.sem.GetToken()
debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName)
var err error
@ -205,7 +190,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
err = be.saveLarge(ctx, objName, rd)
}
be.sem.ReleaseToken()
return err
}
@ -297,7 +281,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
objName := be.Filename(h)
blockBlobClient := be.container.NewBlobClient(objName)
be.sem.GetToken()
resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{
Range: azblob.HTTPRange{
Offset: offset,
@ -306,11 +289,10 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
})
if err != nil {
be.sem.ReleaseToken()
return nil, err
}
return be.sem.ReleaseTokenOnClose(resp.Body, nil), err
return resp.Body, err
}
// Stat returns information about a blob.
@ -318,9 +300,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo,
objName := be.Filename(h)
blobClient := be.container.NewBlobClient(objName)
be.sem.GetToken()
props, err := blobClient.GetProperties(ctx, nil)
be.sem.ReleaseToken()
if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties")
@ -338,9 +318,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h)
blob := be.container.NewBlobClient(objName)
be.sem.GetToken()
_, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{})
be.sem.ReleaseToken()
if be.IsNotExist(err) {
return nil
@ -368,9 +346,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
lister := be.container.NewListBlobsFlatPager(opts)
for lister.More() {
be.sem.GetToken()
resp, err := lister.NextPage(ctx)
be.sem.ReleaseToken()
if err != nil {
return err

View file

@ -11,12 +11,10 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4"
"github.com/kurin/blazer/b2"
"github.com/kurin/blazer/base"
)
@ -28,7 +26,6 @@ type b2Backend struct {
cfg Config
listMaxItems int
layout.Layout
sem sema.Semaphore
canDelete bool
}
@ -92,11 +89,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
return nil, errors.Wrap(err, "Bucket")
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{
client: client,
bucket: bucket,
@ -106,7 +98,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
Path: cfg.Prefix,
},
listMaxItems: defaultListMaxItems,
sem: sem,
canDelete: true,
}
@ -134,11 +125,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe
return nil, errors.Wrap(err, "NewBucket")
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{
client: client,
bucket: bucket,
@ -148,7 +134,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe
Path: cfg.Prefix,
},
listMaxItems: defaultListMaxItems,
sem: sem,
}
_, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile})
@ -202,20 +187,18 @@ func (be *b2Backend) IsNotExist(err error) bool {
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(ctx)
be.sem.GetToken()
name := be.Layout.Filename(h)
obj := be.bucket.Object(name)
if offset == 0 && length == 0 {
rd := obj.NewReader(ctx)
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
return obj.NewReader(ctx), nil
}
// pass a negative length to NewRangeReader so that the remainder of the
@ -224,8 +207,7 @@ func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int
length = -1
}
rd := obj.NewRangeReader(ctx, offset, int64(length))
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
return obj.NewRangeReader(ctx, offset, int64(length)), nil
}
// Save stores data in the backend at the handle.
@ -233,15 +215,7 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
debug.Log("Save %v, name %v", h, name)
obj := be.bucket.Object(name)
// b2 always requires sha1 checksums for uploaded file parts
@ -262,9 +236,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind
// Stat returns information about a blob.
func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
obj := be.bucket.Object(name)
info, err := obj.Attrs(ctx)
@ -276,9 +247,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI
// Remove removes the blob with the given name and type.
func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
be.sem.GetToken()
defer be.sem.ReleaseToken()
// the retry backend will also repeat the remove method up to 10 times
for i := 0; i < 3; i++ {
obj := be.bucket.Object(be.Filename(h))
@ -313,20 +281,13 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
return errors.New("failed to delete all file versions")
}
type semLocker struct {
sema.Semaphore
}
func (sm *semLocker) Lock() { sm.GetToken() }
func (sm *semLocker) Unlock() { sm.ReleaseToken() }
// List returns a channel that yields all names of blobs of type t.
func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
prefix, _ := be.Basedir(t)
iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem}))
iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems))
for iter.Next() {
obj := iter.Object()

View file

@ -15,7 +15,6 @@ import (
"github.com/pkg/errors"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
@ -37,7 +36,6 @@ type Backend struct {
gcsClient *storage.Client
projectID string
connections uint
sem sema.Semaphore
bucketName string
bucket *storage.BucketHandle
prefix string
@ -99,16 +97,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
return nil, errors.Wrap(err, "getStorageClient")
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &Backend{
gcsClient: gcsClient,
projectID: cfg.ProjectID,
connections: cfg.Connections,
sem: sem,
bucketName: cfg.Bucket,
bucket: gcsClient.Bucket(cfg.Bucket),
prefix: cfg.Prefix,
@ -203,16 +195,8 @@ func (be *Backend) Path() string {
// Save stores data in the backend at the handle.
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return err
}
objName := be.Filename(h)
be.sem.GetToken()
debug.Log("InsertObject(%v, %v)", be.bucketName, objName)
// Set chunk size to zero to disable resumable uploads.
//
// With a non-zero chunk size (the default is
@ -247,8 +231,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
err = cerr
}
be.sem.ReleaseToken()
if err != nil {
return errors.Wrap(err, "service.Objects.Insert")
}
@ -263,6 +245,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
@ -274,27 +259,19 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
objName := be.Filename(h)
be.sem.GetToken()
ctx, cancel := context.WithCancel(ctx)
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
if err != nil {
cancel()
be.sem.ReleaseToken()
return nil, err
}
return be.sem.ReleaseTokenOnClose(r, cancel), err
return r, err
}
// Stat returns information about a blob.
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
objName := be.Filename(h)
be.sem.GetToken()
attr, err := be.bucket.Object(objName).Attrs(ctx)
be.sem.ReleaseToken()
if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get")
@ -307,9 +284,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h)
be.sem.GetToken()
err := be.bucket.Object(objName).Delete(ctx)
be.sem.ReleaseToken()
if err == storage.ErrObjectNotExist {
err = nil
@ -334,9 +309,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F
itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix})
for {
be.sem.GetToken()
attrs, err := itr.Next()
be.sem.ReleaseToken()
if err == iterator.Done {
break
}

View file

@ -10,7 +10,6 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
@ -22,7 +21,6 @@ import (
// Local is a backend in a local directory.
type Local struct {
Config
sem sema.Semaphore
layout.Layout
backend.Modes
}
@ -38,11 +36,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) {
return nil, err
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
fi, err := fs.Stat(l.Filename(restic.Handle{Type: restic.ConfigFile}))
m := backend.DeriveModesFromFileInfo(fi, err)
debug.Log("using (%03O file, %03O dir) permissions", m.File, m.Dir)
@ -50,7 +43,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) {
return &Local{
Config: cfg,
Layout: l,
sem: sem,
Modes: m,
}, nil
}
@ -114,10 +106,6 @@ func (b *Local) IsNotExist(err error) bool {
// Save stores data in the backend at the handle.
func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
finalname := b.Filename(h)
dir := filepath.Dir(finalname)
@ -128,9 +116,6 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade
}
}()
b.sem.GetToken()
defer b.sem.ReleaseToken()
// Create new file with a temporary name.
tmpname := filepath.Base(finalname) + "-tmp-"
f, err := tempFile(dir, tmpname)
@ -216,40 +201,28 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in
}
func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
b.sem.GetToken()
f, err := fs.Open(b.Filename(h))
if err != nil {
b.sem.ReleaseToken()
return nil, err
}
if offset > 0 {
_, err = f.Seek(offset, 0)
if err != nil {
b.sem.ReleaseToken()
_ = f.Close()
return nil, err
}
}
r := b.sem.ReleaseTokenOnClose(f, nil)
if length > 0 {
return backend.LimitReadCloser(r, int64(length)), nil
return backend.LimitReadCloser(f, int64(length)), nil
}
return r, nil
return f, nil
}
// Stat returns information about a blob.
func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err)
}
b.sem.GetToken()
defer b.sem.ReleaseToken()
fi, err := fs.Stat(b.Filename(h))
if err != nil {
return restic.FileInfo{}, errors.WithStack(err)
@ -262,9 +235,6 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
func (b *Local) Remove(ctx context.Context, h restic.Handle) error {
fn := b.Filename(h)
b.sem.GetToken()
defer b.sem.ReleaseToken()
// reset read-only flag
err := fs.Chmod(fn, 0666)
if err != nil && !os.IsPermission(err) {

View file

@ -10,12 +10,9 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4"
)
type memMap map[restic.Handle][]byte
@ -32,19 +29,12 @@ const connectionCount = 2
type MemoryBackend struct {
data memMap
m sync.Mutex
sem sema.Semaphore
}
// New returns a new backend that saves all data in a map in memory.
func New() *MemoryBackend {
sem, err := sema.New(connectionCount)
if err != nil {
panic(err)
}
be := &MemoryBackend{
data: make(memMap),
sem: sem,
}
debug.Log("created new memory backend")
@ -59,13 +49,6 @@ func (be *MemoryBackend) IsNotExist(err error) bool {
// Save adds new Data to the backend.
func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock()
defer be.m.Unlock()
@ -113,8 +96,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int,
}
func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
be.sem.GetToken()
be.m.Lock()
defer be.m.Unlock()
@ -123,21 +104,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
h.Name = ""
}
debug.Log("Load %v offset %v len %v", h, offset, length)
if offset < 0 {
be.sem.ReleaseToken()
return nil, errors.New("offset is negative")
}
if _, ok := be.data[h]; !ok {
be.sem.ReleaseToken()
return nil, errNotFound
}
buf := be.data[h]
if offset > int64(len(buf)) {
be.sem.ReleaseToken()
return nil, errors.New("offset beyond end of file")
}
@ -146,18 +118,11 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length
buf = buf[:length]
}
return be.sem.ReleaseTokenOnClose(io.NopCloser(bytes.NewReader(buf)), nil), ctx.Err()
return io.NopCloser(bytes.NewReader(buf)), ctx.Err()
}
// Stat returns information about a file in the backend.
func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock()
defer be.m.Unlock()
@ -176,9 +141,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File
// Remove deletes a file from the backend.
func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error {
be.sem.GetToken()
defer be.sem.ReleaseToken()
be.m.Lock()
defer be.m.Unlock()

View file

@ -12,12 +12,9 @@ import (
"strings"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4"
)
// make sure the rest backend implements restic.Backend
@ -27,7 +24,6 @@ var _ restic.Backend = &Backend{}
type Backend struct {
url *url.URL
connections uint
sem sema.Semaphore
client http.Client
layout.Layout
}
@ -40,11 +36,6 @@ const (
// Open opens the REST backend with the given config.
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
// use url without trailing slash for layout
url := cfg.URL.String()
if url[len(url)-1] == '/' {
@ -56,7 +47,6 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
client: http.Client{Transport: rt},
Layout: &layout.RESTLayout{URL: url, Join: path.Join},
connections: cfg.Connections,
sem: sem,
}
return be, nil
@ -123,10 +113,6 @@ func (b *Backend) HasAtomicReplace() bool {
// Save stores data in the backend at the handle.
func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -143,9 +129,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea
// let's the server know what's coming.
req.ContentLength = rd.Length()
b.sem.GetToken()
resp, err := b.client.Do(req)
b.sem.ReleaseToken()
var cerr error
if resp != nil {
@ -212,18 +196,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
}
func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if err := h.Valid(); err != nil {
return nil, backoff.Permanent(err)
}
if offset < 0 {
return nil, errors.New("offset is negative")
}
if length < 0 {
return nil, errors.Errorf("invalid length %d", length)
}
req, err := http.NewRequestWithContext(ctx, "GET", b.Filename(h), nil)
if err != nil {
return nil, errors.WithStack(err)
@ -237,9 +209,7 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o
req.Header.Set("Accept", ContentTypeV2)
debug.Log("Load(%v) send range %v", h, byteRange)
b.sem.GetToken()
resp, err := b.client.Do(req)
b.sem.ReleaseToken()
if err != nil {
if resp != nil {
@ -264,19 +234,13 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o
// Stat returns information about a blob.
func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, b.Filename(h), nil)
if err != nil {
return restic.FileInfo{}, errors.WithStack(err)
}
req.Header.Set("Accept", ContentTypeV2)
b.sem.GetToken()
resp, err := b.client.Do(req)
b.sem.ReleaseToken()
if err != nil {
return restic.FileInfo{}, errors.WithStack(err)
}
@ -309,19 +273,13 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e
// Remove removes the blob with the given name and type.
func (b *Backend) Remove(ctx context.Context, h restic.Handle) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
req, err := http.NewRequestWithContext(ctx, "DELETE", b.Filename(h), nil)
if err != nil {
return errors.WithStack(err)
}
req.Header.Set("Accept", ContentTypeV2)
b.sem.GetToken()
resp, err := b.client.Do(req)
b.sem.ReleaseToken()
if err != nil {
return errors.Wrap(err, "client.Do")
@ -358,9 +316,7 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi
}
req.Header.Set("Accept", ContentTypeV2)
b.sem.GetToken()
resp, err := b.client.Do(req)
b.sem.ReleaseToken()
if err != nil {
return errors.Wrap(err, "List")

View file

@ -13,12 +13,10 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
@ -26,7 +24,6 @@ import (
// Backend stores data on an S3 endpoint.
type Backend struct {
client *minio.Client
sem sema.Semaphore
cfg Config
layout.Layout
}
@ -102,14 +99,8 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro
return nil, errors.Wrap(err, "minio.New")
}
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &Backend{
client: client,
sem: sem,
cfg: cfg,
}
@ -271,15 +262,8 @@ func (be *Backend) Path() string {
// Save stores data in the backend at the handle.
func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
opts := minio.PutObjectOptions{StorageClass: be.cfg.StorageClass}
opts.ContentType = "application/octet-stream"
// the only option with the high-level api is to let the library handle the checksum computation
@ -301,6 +285,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
@ -321,18 +308,13 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
return nil, errors.Wrap(err, "SetRange")
}
be.sem.GetToken()
ctx, cancel := context.WithCancel(ctx)
coreClient := minio.Core{Client: be.client}
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
if err != nil {
cancel()
be.sem.ReleaseToken()
return nil, err
}
return be.sem.ReleaseTokenOnClose(rd, cancel), err
return rd, err
}
// Stat returns information about a blob.
@ -342,17 +324,14 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
opts := minio.GetObjectOptions{}
be.sem.GetToken()
obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts)
if err != nil {
be.sem.ReleaseToken()
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
}
// make sure that the object is closed properly.
defer func() {
e := obj.Close()
be.sem.ReleaseToken()
if err == nil {
err = errors.Wrap(e, "Close")
}
@ -370,9 +349,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h)
be.sem.GetToken()
err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{})
be.sem.ReleaseToken()
if be.IsNotExist(err) {
err = nil

View file

@ -0,0 +1,87 @@
package sema
import (
"context"
"io"
"github.com/cenkalti/backoff/v4"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// make sure that SemaphoreBackend implements restic.Backend
var _ restic.Backend = &SemaphoreBackend{}
// SemaphoreBackend limits the number of concurrent operations.
type SemaphoreBackend struct {
restic.Backend
sem semaphore
}
// New creates a backend that limits the concurrent operations on the underlying backend
func New(be restic.Backend) *SemaphoreBackend {
sem, err := newSemaphore(be.Connections())
if err != nil {
panic(err)
}
return &SemaphoreBackend{
Backend: be,
sem: sem,
}
}
// Save adds new Data to the backend.
func (be *SemaphoreBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
return be.Backend.Save(ctx, h, rd)
}
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *SemaphoreBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
if offset < 0 {
return backoff.Permanent(errors.New("offset is negative"))
}
if length < 0 {
return backoff.Permanent(errors.Errorf("invalid length %d", length))
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
return be.Backend.Load(ctx, h, length, offset, fn)
}
// Stat returns information about a file in the backend.
func (be *SemaphoreBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
return be.Backend.Stat(ctx, h)
}
// Remove deletes a file from the backend.
func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
return be.Backend.Remove(ctx, h)
}

View file

@ -2,64 +2,30 @@
package sema
import (
"context"
"io"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
)
// A Semaphore limits access to a restricted resource.
type Semaphore struct {
// A semaphore limits access to a restricted resource.
type semaphore struct {
ch chan struct{}
}
// New returns a new semaphore with capacity n.
func New(n uint) (Semaphore, error) {
// newSemaphore returns a new semaphore with capacity n.
func newSemaphore(n uint) (semaphore, error) {
if n == 0 {
return Semaphore{}, errors.New("capacity must be a positive number")
return semaphore{}, errors.New("capacity must be a positive number")
}
return Semaphore{
return semaphore{
ch: make(chan struct{}, n),
}, nil
}
// GetToken blocks until a Token is available.
func (s Semaphore) GetToken() { s.ch <- struct{}{} }
func (s semaphore) GetToken() {
s.ch <- struct{}{}
debug.Log("acquired token")
}
// ReleaseToken returns a token.
func (s Semaphore) ReleaseToken() { <-s.ch }
// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close.
// Before returning the token, cancel, if not nil, will be run
// to free up context resources.
func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel}
}
type wrapReader struct {
io.ReadCloser
eofSeen bool
sem Semaphore
cancel context.CancelFunc
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen { // XXX Why do we do this?
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
if wr.cancel != nil {
wr.cancel()
}
wr.sem.ReleaseToken()
return err
}
func (s semaphore) ReleaseToken() { <-s.ch }

View file

@ -15,7 +15,6 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
@ -35,7 +34,6 @@ type SFTP struct {
posixRename bool
sem sema.Semaphore
layout.Layout
Config
backend.Modes
@ -140,11 +138,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) {
}
func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) {
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
var err error
sftp.Layout, err = layout.ParseLayout(ctx, sftp, cfg.Layout, defaultLayout, cfg.Path)
if err != nil {
return nil, err
@ -158,7 +152,6 @@ func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) {
sftp.Config = cfg
sftp.p = cfg.Path
sftp.sem = sem
sftp.Modes = m
return sftp, nil
}
@ -308,17 +301,10 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader
return err
}
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
filename := r.Filename(h)
tmpFilename := filename + "-restic-temp-" + tempSuffix()
dirname := r.Dirname(h)
r.sem.GetToken()
defer r.sem.ReleaseToken()
// create new file
f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY)
@ -414,52 +400,27 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int
return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
io.WriterTo
f func()
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
r.sem.GetToken()
f, err := r.c.Open(r.Filename(h))
if err != nil {
r.sem.ReleaseToken()
return nil, err
}
if offset > 0 {
_, err = f.Seek(offset, 0)
if err != nil {
r.sem.ReleaseToken()
_ = f.Close()
return nil, err
}
}
// use custom close wrapper to also provide WriteTo() on the wrapper
rd := &wrapReader{
ReadCloser: f,
WriterTo: f,
f: func() {
r.sem.ReleaseToken()
},
}
if length > 0 {
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
return backend.LimitReadCloser(rd, int64(length)), nil
return backend.LimitReadCloser(f, int64(length)), nil
}
return rd, nil
return f, nil
}
// Stat returns information about a blob.
@ -468,13 +429,6 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro
return restic.FileInfo{}, err
}
if err := h.Valid(); err != nil {
return restic.FileInfo{}, backoff.Permanent(err)
}
r.sem.GetToken()
defer r.sem.ReleaseToken()
fi, err := r.c.Lstat(r.Filename(h))
if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "Lstat")
@ -489,9 +443,6 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error {
return err
}
r.sem.GetToken()
defer r.sem.ReleaseToken()
return r.c.Remove(r.Filename(h))
}
@ -501,9 +452,7 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI
basedir, subdirs := r.Basedir(t)
walker := r.c.Walk(basedir)
for {
r.sem.GetToken()
ok := walker.Step()
r.sem.ReleaseToken()
if !ok {
break
}

View file

@ -15,12 +15,10 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/layout"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4"
"github.com/ncw/swift/v2"
)
@ -28,7 +26,6 @@ import (
type beSwift struct {
conn *swift.Connection
connections uint
sem sema.Semaphore
container string // Container name
prefix string // Prefix of object names in the container
layout.Layout
@ -42,11 +39,6 @@ var _ restic.Backend = &beSwift{}
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("config %#v", cfg)
sem, err := sema.New(cfg.Connections)
if err != nil {
return nil, err
}
be := &beSwift{
conn: &swift.Connection{
UserName: cfg.UserName,
@ -72,7 +64,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
Transport: rt,
},
connections: cfg.Connections,
sem: sem,
container: cfg.Container,
prefix: cfg.Prefix,
Layout: &layout.DefaultLayout{
@ -159,27 +150,17 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int,
debug.Log("Load(%v) send range %v", h, headers["Range"])
}
be.sem.GetToken()
obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers)
if err != nil {
be.sem.ReleaseToken()
return nil, errors.Wrap(err, "conn.ObjectOpen")
}
return be.sem.ReleaseTokenOnClose(obj, nil), nil
return obj, nil
}
// Save stores data in the backend at the handle.
func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
encoding := "binary/octet-stream"
debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)
@ -196,9 +177,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
obj, _, err := be.conn.Object(ctx, be.container, objName)
if err != nil {
return restic.FileInfo{}, errors.Wrap(err, "conn.Object")
@ -211,9 +189,6 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
err := be.conn.ObjectDelete(ctx, be.container, objName)
return errors.Wrap(err, "conn.ObjectDelete")
}
@ -226,9 +201,7 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F
err := be.conn.ObjectsWalk(ctx, be.container, &swift.ObjectsOpts{Prefix: prefix},
func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) {
be.sem.GetToken()
newObjects, err := be.conn.Objects(ctx, be.container, opts)
be.sem.ReleaseToken()
if err != nil {
return nil, errors.Wrap(err, "conn.ObjectNames")

View file

@ -6,7 +6,6 @@ import (
"fmt"
"io"
"github.com/cenkalti/backoff/v4"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
@ -63,15 +62,7 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error),
fn func(rd io.Reader) error) error {
if err := h.Valid(); err != nil {
return backoff.Permanent(err)
}
if offset < 0 {
return errors.New("offset is negative")
}
if length < 0 {
return errors.Errorf("invalid length %d", length)
}
rd, err := openReader(ctx, h, length, offset)
if err != nil {
return err