1f9c962183
Before this change, some parts of operations called the Open method on objects directly, and some called NewReOpen to make an object which can re-open itself on errors. This adds a new function operations.Open which should be called instead of fs.Object.Open to open a reliable stream of data and changes all call sites to use that. This means `rclone check --download` and `rclone cat` will re-open files on failures. See: https://forum.rclone.org/t/does-rclone-support-retries-for-check-when-using-download-flag/38641
164 lines
4.3 KiB
Go
164 lines
4.3 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
)
|
|
|
|
// ReOpen is a wrapper for an object reader which reopens the stream on error
|
|
type ReOpen struct {
|
|
ctx context.Context
|
|
mu sync.Mutex // mutex to protect the below
|
|
src fs.Object // object to open
|
|
options []fs.OpenOption // option to pass to initial open
|
|
rc io.ReadCloser // underlying stream
|
|
read int64 // number of bytes read from this stream
|
|
maxTries int // maximum number of retries
|
|
tries int // number of retries we've had so far in this stream
|
|
err error // if this is set then Read/Close calls will return it
|
|
opened bool // if set then rc is valid and needs closing
|
|
}
|
|
|
|
var (
|
|
errorFileClosed = errors.New("file already closed")
|
|
errorTooManyTries = errors.New("failed to reopen: too many retries")
|
|
)
|
|
|
|
// NewReOpen makes a handle which will reopen itself and seek to where
|
|
// it was on errors up to maxTries times.
|
|
//
|
|
// If an fs.HashesOption is set this will be applied when reading from
|
|
// the start.
|
|
//
|
|
// If an fs.RangeOption is set then this will applied when reading from
|
|
// the start, and updated on retries.
|
|
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
|
h := &ReOpen{
|
|
ctx: ctx,
|
|
src: src,
|
|
maxTries: maxTries,
|
|
options: options,
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
err = h.open()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return h, nil
|
|
}
|
|
|
|
// Open makes a handle which will reopen itself and seek to where it
|
|
// was on errors.
|
|
//
|
|
// If an fs.HashesOption is set this will be applied when reading from
|
|
// the start.
|
|
//
|
|
// If an fs.RangeOption is set then this will applied when reading from
|
|
// the start, and updated on retries.
|
|
//
|
|
// It will obey LowLevelRetries in the ctx as the maximum number of
|
|
// tries.
|
|
//
|
|
// Use this instead of calling the Open method on fs.Objects
|
|
func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
|
maxTries := fs.GetConfig(ctx).LowLevelRetries
|
|
return NewReOpen(ctx, src, maxTries, options...)
|
|
}
|
|
|
|
// open the underlying handle - call with lock held
|
|
//
|
|
// we don't retry here as the Open() call will itself have low level retries
|
|
func (h *ReOpen) open() error {
|
|
opts := []fs.OpenOption{}
|
|
var hashOption *fs.HashesOption
|
|
var rangeOption *fs.RangeOption
|
|
for _, option := range h.options {
|
|
switch option := option.(type) {
|
|
case *fs.HashesOption:
|
|
hashOption = option
|
|
case *fs.RangeOption:
|
|
rangeOption = option
|
|
case *fs.HTTPOption:
|
|
opts = append(opts, option)
|
|
default:
|
|
if option.Mandatory() {
|
|
fs.Logf(h.src, "Unsupported mandatory option: %v", option)
|
|
}
|
|
}
|
|
}
|
|
if h.read == 0 {
|
|
if rangeOption != nil {
|
|
opts = append(opts, rangeOption)
|
|
}
|
|
if hashOption != nil {
|
|
// put hashOption on if reading from the start, ditch otherwise
|
|
opts = append(opts, hashOption)
|
|
}
|
|
} else {
|
|
if rangeOption != nil {
|
|
// range to the read point
|
|
opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
|
|
} else {
|
|
// seek to the read point
|
|
opts = append(opts, &fs.SeekOption{Offset: h.read})
|
|
}
|
|
}
|
|
h.tries++
|
|
if h.tries > h.maxTries {
|
|
h.err = errorTooManyTries
|
|
} else {
|
|
h.rc, h.err = h.src.Open(h.ctx, opts...)
|
|
}
|
|
if h.err != nil {
|
|
if h.tries > 1 {
|
|
fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
|
|
}
|
|
return h.err
|
|
}
|
|
h.opened = true
|
|
return nil
|
|
}
|
|
|
|
// Read bytes retrying as necessary
|
|
func (h *ReOpen) Read(p []byte) (n int, err error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if h.err != nil {
|
|
// return a previous error if there is one
|
|
return n, h.err
|
|
}
|
|
n, err = h.rc.Read(p)
|
|
if err != nil {
|
|
h.err = err
|
|
}
|
|
h.read += int64(n)
|
|
if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) {
|
|
// close underlying stream
|
|
h.opened = false
|
|
_ = h.rc.Close()
|
|
// reopen stream, clearing error if successful
|
|
fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
|
|
if h.open() == nil {
|
|
err = nil
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Close the stream
|
|
func (h *ReOpen) Close() error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if !h.opened {
|
|
return errorFileClosed
|
|
}
|
|
h.opened = false
|
|
h.err = errorFileClosed
|
|
return h.rc.Close()
|
|
}
|