operations: reopen downloads on error when using check --download and cat

Before this change, some parts of operations called the Open method on
objects directly, and some called NewReOpen to make an object which
can re-open itself on errors.

This adds a new function operations.Open which should be called
instead of fs.Object.Open to open a reliable stream of data and
changes all call sites to use that.

This means `rclone check --download` and `rclone cat` will re-open
files on failures.

See: https://forum.rclone.org/t/does-rclone-support-retries-for-check-when-using-download-flag/38641
This commit is contained in:
Nick Craig-Wood 2023-06-01 12:54:19 +01:00
parent 279d9ecc56
commit 1f9c962183
4 changed files with 32 additions and 13 deletions

View file

@ -335,7 +335,7 @@ func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
// Does the work for CheckIdenticalDownload // Does the work for CheckIdenticalDownload
func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) {
in1, err := dst.Open(ctx) in1, err := Open(ctx, dst)
if err != nil { if err != nil {
return true, fmt.Errorf("failed to open %q: %w", dst, err) return true, fmt.Errorf("failed to open %q: %w", dst, err)
} }
@ -345,7 +345,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
}() }()
in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
in2, err := src.Open(ctx) in2, err := Open(ctx, src)
if err != nil { if err != nil {
return true, fmt.Errorf("failed to open %q: %w", src, err) return true, fmt.Errorf("failed to open %q: %w", src, err)
} }
@ -483,7 +483,7 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool,
<-c.tokens // get the token back to free up a slot <-c.tokens // get the token back to free up a slot
c.wg.Done() c.wg.Done()
}() }()
if in, err = obj.Open(ctx); err != nil { if in, err = Open(ctx, obj); err != nil {
return return
} }
tr := accounting.Stats(ctx).NewTransfer(obj) tr := accounting.Stats(ctx).NewTransfer(obj)
@ -538,7 +538,7 @@ type HashSums map[string]string
// ParseSumFile parses a hash SUM file and returns hashes as a map // ParseSumFile parses a hash SUM file and returns hashes as a map
func ParseSumFile(ctx context.Context, sumFile fs.Object) (HashSums, error) { func ParseSumFile(ctx context.Context, sumFile fs.Object) (HashSums, error) {
rd, err := sumFile.Open(ctx) rd, err := Open(ctx, sumFile)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -62,7 +62,6 @@ type multiThreadCopyState struct {
// Copy a single stream into place // Copy a single stream into place
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) { func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) {
ci := fs.GetConfig(ctx)
defer func() { defer func() {
if err != nil { if err != nil {
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err) fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
@ -79,7 +78,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
rc, err := NewReOpen(ctx, mc.src, ci.LowLevelRetries, &fs.RangeOption{Start: start, End: end - 1}) rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
if err != nil { if err != nil {
return fmt.Errorf("multipart copy: failed to open source: %w", err) return fmt.Errorf("multipart copy: failed to open source: %w", err)
} }

View file

@ -421,7 +421,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
for _, option := range ci.DownloadHeaders { for _, option := range ci.DownloadHeaders {
options = append(options, option) options = append(options, option)
} }
in0, err = NewReOpen(ctx, src, ci.LowLevelRetries, options...) in0, err = Open(ctx, src, options...)
if err != nil { if err != nil {
err = fmt.Errorf("failed to open source object: %w", err) err = fmt.Errorf("failed to open source object: %w", err)
} else { } else {
@ -1026,7 +1026,7 @@ func hashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag
for _, option := range fs.GetConfig(ctx).DownloadHeaders { for _, option := range fs.GetConfig(ctx).DownloadHeaders {
options = append(options, option) options = append(options, option)
} }
in, err := NewReOpen(ctx, o, fs.GetConfig(ctx).LowLevelRetries, options...) in, err := Open(ctx, o, options...)
if err != nil { if err != nil {
return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err) return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err)
} }
@ -1326,7 +1326,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
for _, option := range ci.DownloadHeaders { for _, option := range ci.DownloadHeaders {
options = append(options, option) options = append(options, option)
} }
in, err := o.Open(ctx, options...) in, err := Open(ctx, o, options...)
if err != nil { if err != nil {
err = fs.CountError(err) err = fs.CountError(err)
fs.Errorf(o, "Failed to open: %v", err) fs.Errorf(o, "Failed to open: %v", err)

View file

@ -29,12 +29,14 @@ var (
errorTooManyTries = errors.New("failed to reopen: too many retries") errorTooManyTries = errors.New("failed to reopen: too many retries")
) )
// NewReOpen makes a handle which will reopen itself and seek to where it was on errors // NewReOpen makes a handle which will reopen itself and seek to where
// it was on errors up to maxTries times.
// //
// If hashOption is set this will be applied when reading from the start. // If an fs.HashesOption is set this will be applied when reading from
// the start.
// //
// If rangeOption is set then this will applied when reading from the // If an fs.RangeOption is set then this will applied when reading from
// start, and updated on retries. // the start, and updated on retries.
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) { func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
h := &ReOpen{ h := &ReOpen{
ctx: ctx, ctx: ctx,
@ -51,6 +53,24 @@ func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.O
return h, nil return h, nil
} }
// Open makes a handle which will reopen itself and seek to where it
// was on errors.
//
// If an fs.HashesOption is set this will be applied when reading from
// the start.
//
// If an fs.RangeOption is set then this will applied when reading from
// the start, and updated on retries.
//
// It will obey LowLevelRetries in the ctx as the maximum number of
// tries.
//
// Use this instead of calling the Open method on fs.Objects
func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
maxTries := fs.GetConfig(ctx).LowLevelRetries
return NewReOpen(ctx, src, maxTries, options...)
}
// open the underlying handle - call with lock held // open the underlying handle - call with lock held
// //
// we don't retry here as the Open() call will itself have low level retries // we don't retry here as the Open() call will itself have low level retries