Merge pull request #1410 from armhold/deadlock2

unify behavior for max http connections across backends
This commit is contained in:
Alexander Neumann 2017-11-24 21:32:56 +01:00
commit f79698dcdd
6 changed files with 113 additions and 59 deletions

View file

@ -227,13 +227,17 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
} }
// Stat returns information about a blob. // Stat returns information about a blob.
func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
debug.Log("%v", h) debug.Log("%v", h)
objName := be.Filename(h) objName := be.Filename(h)
blob := be.container.GetBlobReference(objName) blob := be.container.GetBlobReference(objName)
if err := blob.GetProperties(nil); err != nil { be.sem.GetToken()
err := blob.GetProperties(nil)
be.sem.ReleaseToken()
if err != nil {
debug.Log("blob.GetProperties err %v", err) debug.Log("blob.GetProperties err %v", err)
return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties")
} }
@ -244,7 +248,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
// Test returns true if a blob of the given type and name exists in the backend. // Test returns true if a blob of the given type and name exists in the backend.
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
found, err := be.container.GetBlobReference(objName).Exists() found, err := be.container.GetBlobReference(objName).Exists()
be.sem.ReleaseToken()
if err != nil { if err != nil {
return false, err return false, err
} }
@ -254,7 +262,11 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
// Remove removes the blob with the given name and type. // Remove removes the blob with the given name and type.
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
_, err := be.container.GetBlobReference(objName).DeleteIfExists(nil) _, err := be.container.GetBlobReference(objName).DeleteIfExists(nil)
be.sem.ReleaseToken()
debug.Log("Remove(%v) at %v -> err %v", h, objName, err) debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
return errors.Wrap(err, "client.RemoveObject") return errors.Wrap(err, "client.RemoveObject")
} }
@ -282,7 +294,10 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
defer close(ch) defer close(ch)
for { for {
be.sem.GetToken()
obj, err := be.container.ListBlobs(params) obj, err := be.container.ListBlobs(params)
be.sem.ReleaseToken()
if err != nil { if err != nil {
return return
} }

View file

@ -137,31 +137,6 @@ func (be *b2Backend) Location() string {
return be.cfg.Bucket return be.cfg.Bucket
} }
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
eofSeen bool
f func()
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen {
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
// IsNotExist returns true if the error is caused by a non-existing file. // IsNotExist returns true if the error is caused by a non-existing file.
func (be *b2Backend) IsNotExist(err error) bool { func (be *b2Backend) IsNotExist(err error) bool {
return b2.IsNotExist(errors.Cause(err)) return b2.IsNotExist(errors.Cause(err))
@ -192,14 +167,7 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs
if offset == 0 && length == 0 { if offset == 0 && length == 0 {
rd := obj.NewReader(ctx) rd := obj.NewReader(ctx)
wrapper := &wrapReader{ return be.sem.ReleaseTokenOnClose(rd, cancel), nil
ReadCloser: rd,
f: func() {
cancel()
be.sem.ReleaseToken()
},
}
return wrapper, nil
} }
// pass a negative length to NewRangeReader so that the remainder of the // pass a negative length to NewRangeReader so that the remainder of the
@ -209,14 +177,7 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs
} }
rd := obj.NewRangeReader(ctx, offset, int64(length)) rd := obj.NewRangeReader(ctx, offset, int64(length))
wrapper := &wrapReader{ return be.sem.ReleaseTokenOnClose(rd, cancel), nil
ReadCloser: rd,
f: func() {
cancel()
be.sem.ReleaseToken()
},
}
return wrapper, nil
} }
// Save stores data in the backend at the handle. // Save stores data in the backend at the handle.

View file

@ -204,14 +204,15 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
debug.Log("Save %v at %v", h, objName) debug.Log("Save %v at %v", h, objName)
be.sem.GetToken()
// Check key does not already exist // Check key does not already exist
if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil { if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil {
debug.Log("%v already exists", h) debug.Log("%v already exists", h)
be.sem.ReleaseToken()
return errors.New("key already exists") return errors.New("key already exists")
} }
be.sem.GetToken()
debug.Log("InsertObject(%v, %v)", be.bucketName, objName) debug.Log("InsertObject(%v, %v)", be.bucketName, objName)
// Set chunk size to zero to disable resumable uploads. // Set chunk size to zero to disable resumable uploads.
@ -323,7 +324,10 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
obj, err := be.service.Objects.Get(be.bucketName, objName).Do() obj, err := be.service.Objects.Get(be.bucketName, objName).Do()
be.sem.ReleaseToken()
if err != nil { if err != nil {
debug.Log("GetObject() err %v", err) debug.Log("GetObject() err %v", err)
return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get")
@ -336,7 +340,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
found := false found := false
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
_, err := be.service.Objects.Get(be.bucketName, objName).Do() _, err := be.service.Objects.Get(be.bucketName, objName).Do()
be.sem.ReleaseToken()
if err == nil { if err == nil {
found = true found = true
} }
@ -348,7 +356,10 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
err := be.service.Objects.Delete(be.bucketName, objName).Do() err := be.service.Objects.Delete(be.bucketName, objName).Do()
be.sem.ReleaseToken()
if er, ok := err.(*googleapi.Error); ok { if er, ok := err.(*googleapi.Error); ok {
if er.Code == 404 { if er.Code == 404 {
err = nil err = nil
@ -378,7 +389,10 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems)) listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems))
for { for {
be.sem.GetToken()
obj, err := listReq.Do() obj, err := listReq.Do()
be.sem.ReleaseToken()
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error listing %v: %v\n", prefix, err) fmt.Fprintf(os.Stderr, "error listing %v: %v\n", prefix, err)
return return

View file

@ -263,6 +263,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
// Check key does not already exist // Check key does not already exist
_, err = be.client.StatObject(be.cfg.Bucket, objName) _, err = be.client.StatObject(be.cfg.Bucket, objName)
if err == nil { if err == nil {
@ -282,10 +285,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
debug.Log("reader is %#T, no specific workaround enabled", rd) debug.Log("reader is %#T, no specific workaround enabled", rd)
} }
be.sem.GetToken()
debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName) debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName)
n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream") n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream")
be.sem.ReleaseToken()
debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err) debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err)
@ -358,15 +359,18 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
objName := be.Filename(h) objName := be.Filename(h)
var obj *minio.Object var obj *minio.Object
be.sem.GetToken()
obj, err = be.client.GetObject(be.cfg.Bucket, objName) obj, err = be.client.GetObject(be.cfg.Bucket, objName)
if err != nil { if err != nil {
debug.Log("GetObject() err %v", err) debug.Log("GetObject() err %v", err)
be.sem.ReleaseToken()
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
} }
// make sure that the object is closed properly. // make sure that the object is closed properly.
defer func() { defer func() {
e := obj.Close() e := obj.Close()
be.sem.ReleaseToken()
if err == nil { if err == nil {
err = errors.Wrap(e, "Close") err = errors.Wrap(e, "Close")
} }
@ -385,7 +389,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
found := false found := false
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
_, err := be.client.StatObject(be.cfg.Bucket, objName) _, err := be.client.StatObject(be.cfg.Bucket, objName)
be.sem.ReleaseToken()
if err == nil { if err == nil {
found = true found = true
} }
@ -397,7 +405,11 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
// Remove removes the blob with the given name and type. // Remove removes the blob with the given name and type.
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
err := be.client.RemoveObject(be.cfg.Bucket, objName) err := be.client.RemoveObject(be.cfg.Bucket, objName)
be.sem.ReleaseToken()
debug.Log("Remove(%v) at %v -> err %v", h, objName, err) debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
if be.IsNotExist(err) { if be.IsNotExist(err) {
@ -421,6 +433,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
prefix += "/" prefix += "/"
} }
// NB: unfortunately we can't protect this with be.sem.GetToken() here.
// Doing so would enable a deadlock situation (gh-1399), as ListObjects()
// starts its own goroutine and returns results via a channel.
listresp := be.client.ListObjects(be.cfg.Bucket, prefix, true, ctx.Done()) listresp := be.client.ListObjects(be.cfg.Bucket, prefix, true, ctx.Done())
go func() { go func() {

View file

@ -1,6 +1,10 @@
package backend package backend
import "github.com/restic/restic/internal/errors" import (
"context"
"github.com/restic/restic/internal/errors"
"io"
)
// Semaphore limits access to a restricted resource. // Semaphore limits access to a restricted resource.
type Semaphore struct { type Semaphore struct {
@ -26,3 +30,39 @@ func (s *Semaphore) GetToken() {
func (s *Semaphore) ReleaseToken() { func (s *Semaphore) ReleaseToken() {
<-s.ch <-s.ch
} }
// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. Before returning the token,
// cancel, if provided, will be run to free up context resources.
func (s *Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
return &wrapReader{rc, false, func() {
if cancel != nil {
cancel()
}
s.ReleaseToken()
}}
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
eofSeen bool
f func()
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen {
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}

View file

@ -129,11 +129,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer func() {
be.sem.ReleaseToken()
}()
headers := swift.Headers{} headers := swift.Headers{}
if offset > 0 { if offset > 0 {
headers["Range"] = fmt.Sprintf("bytes=%d-", offset) headers["Range"] = fmt.Sprintf("bytes=%d-", offset)
@ -147,13 +142,15 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset
debug.Log("Load(%v) send range %v", h, headers["Range"]) debug.Log("Load(%v) send range %v", h, headers["Range"])
} }
be.sem.GetToken()
obj, _, err := be.conn.ObjectOpen(be.container, objName, false, headers) obj, _, err := be.conn.ObjectOpen(be.container, objName, false, headers)
if err != nil { if err != nil {
debug.Log(" err %v", err) debug.Log(" err %v", err)
be.sem.ReleaseToken()
return nil, errors.Wrap(err, "conn.ObjectOpen") return nil, errors.Wrap(err, "conn.ObjectOpen")
} }
return obj, nil return be.sem.ReleaseTokenOnClose(obj, nil), nil
} }
// Save stores data in the backend at the handle. // Save stores data in the backend at the handle.
@ -166,6 +163,9 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
debug.Log("Save %v at %v", h, objName) debug.Log("Save %v at %v", h, objName)
be.sem.GetToken()
defer be.sem.ReleaseToken()
// Check key does not already exist // Check key does not already exist
switch _, _, err = be.conn.Object(be.container, objName); err { switch _, _, err = be.conn.Object(be.container, objName); err {
case nil: case nil:
@ -179,11 +179,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
return errors.Wrap(err, "conn.Object") return errors.Wrap(err, "conn.Object")
} }
be.sem.GetToken()
defer func() {
be.sem.ReleaseToken()
}()
encoding := "binary/octet-stream" encoding := "binary/octet-stream"
debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding)
@ -199,6 +194,9 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
obj, _, err := be.conn.Object(be.container, objName) obj, _, err := be.conn.Object(be.container, objName)
if err != nil { if err != nil {
debug.Log("Object() err %v", err) debug.Log("Object() err %v", err)
@ -211,6 +209,10 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
// Test returns true if a blob of the given type and name exists in the backend. // Test returns true if a blob of the given type and name exists in the backend.
func (be *beSwift) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *beSwift) Test(ctx context.Context, h restic.Handle) (bool, error) {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
switch _, _, err := be.conn.Object(be.container, objName); err { switch _, _, err := be.conn.Object(be.container, objName); err {
case nil: case nil:
return true, nil return true, nil
@ -226,6 +228,10 @@ func (be *beSwift) Test(ctx context.Context, h restic.Handle) (bool, error) {
// Remove removes the blob with the given name and type. // Remove removes the blob with the given name and type.
func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
err := be.conn.ObjectDelete(be.container, objName) err := be.conn.ObjectDelete(be.container, objName)
debug.Log("Remove(%v) -> err %v", h, err) debug.Log("Remove(%v) -> err %v", h, err)
return errors.Wrap(err, "conn.ObjectDelete") return errors.Wrap(err, "conn.ObjectDelete")
@ -245,7 +251,10 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType) <-chan string {
err := be.conn.ObjectsWalk(be.container, &swift.ObjectsOpts{Prefix: prefix}, err := be.conn.ObjectsWalk(be.container, &swift.ObjectsOpts{Prefix: prefix},
func(opts *swift.ObjectsOpts) (interface{}, error) { func(opts *swift.ObjectsOpts) (interface{}, error) {
be.sem.GetToken()
newObjects, err := be.conn.ObjectNames(be.container, opts) newObjects, err := be.conn.ObjectNames(be.container, opts)
be.sem.ReleaseToken()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "conn.ObjectNames") return nil, errors.Wrap(err, "conn.ObjectNames")
} }