backends: clean reader closing
This commit is contained in:
parent
7d55b4f95e
commit
0b258cc054
3 changed files with 9 additions and 63 deletions
|
@ -226,18 +226,6 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi
|
||||||
return errors.Wrap(err, "PutBlockList")
|
return errors.Wrap(err, "PutBlockList")
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
|
@ -278,15 +266,7 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(rd, nil), err
|
||||||
ReadCloser: rd,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
|
|
@ -263,18 +263,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
|
@ -303,21 +291,16 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
||||||
|
|
||||||
be.sem.GetToken()
|
be.sem.GetToken()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
|
r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(r, cancel), err
|
||||||
ReadCloser: r,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
|
|
@ -301,18 +301,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe
|
||||||
return errors.Wrap(err, "client.PutObject")
|
return errors.Wrap(err, "client.PutObject")
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
|
|
||||||
type wrapReader struct {
|
|
||||||
io.ReadCloser
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wr wrapReader) Close() error {
|
|
||||||
err := wr.ReadCloser.Close()
|
|
||||||
wr.f()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
|
@ -350,22 +338,17 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int,
|
||||||
}
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
be.sem.GetToken()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
coreClient := minio.Core{Client: be.client}
|
coreClient := minio.Core{Client: be.client}
|
||||||
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRd := wrapReader{
|
return be.sem.ReleaseTokenOnClose(rd, cancel), err
|
||||||
ReadCloser: rd,
|
|
||||||
f: func() {
|
|
||||||
debug.Log("Close()")
|
|
||||||
be.sem.ReleaseToken()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return closeRd, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a blob.
|
// Stat returns information about a blob.
|
||||||
|
|
Loading…
Reference in a new issue