93955b755f
Before this fix if more than one retry happened on a file that rclone had opened for read with a backend that uses fs.FixRangeOption then rclone would read too much data and the transfer would fail. Backends affected: - azureblob, azurefiles, b2, box, dropbox, fichier, filefabric - googlecloudstorage, hidrive, imagekit, jottacloud, koofr, netstorage - onedrive, opendrive, oracleobjectstorage, pikpak, premiumizeme - protondrive, qingstor, quatrix, s3, sharefile, sugarsync, swift - uptobox, webdav, zoho This was because rclone was emitting Range requests for the wrong data range on the second and subsequent retries. This was caused by fs.FixRangeOption modifying the options and the reopen code relying on them not being modified. This fix makes a copy of the fs.FixRangeOption in the reopen code to fix the problem. In future it might be best to change fs.FixRangeOption so it returns a new options slice. Fixes #7759
316 lines
9 KiB
Go
316 lines
9 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
)
|
|
|
|
// AccountFn is a function which will be called after every read
|
|
// from the ReOpen.
|
|
//
|
|
// It may return an error which will be passed back to the user.
|
|
type AccountFn func(n int) error
|
|
|
|
// ReOpen is a wrapper for an object reader which reopens the stream on error
|
|
type ReOpen struct {
|
|
ctx context.Context
|
|
mu sync.Mutex // mutex to protect the below
|
|
src fs.Object // object to open
|
|
baseOptions []fs.OpenOption // options to pass to initial open and where offset == 0
|
|
options []fs.OpenOption // option to pass on subsequent opens where offset != 0
|
|
rangeOption fs.RangeOption // adjust this range option on re-opens
|
|
rc io.ReadCloser // underlying stream
|
|
size int64 // total size of object - can be -ve
|
|
start int64 // absolute position to start reading from
|
|
end int64 // absolute position to end reading (exclusive)
|
|
offset int64 // offset in the file we are at, offset from start
|
|
newOffset int64 // if different to offset, reopen needed
|
|
maxTries int // maximum number of retries
|
|
tries int // number of retries we've had so far in this stream
|
|
err error // if this is set then Read/Close calls will return it
|
|
opened bool // if set then rc is valid and needs closing
|
|
account AccountFn // account for a read
|
|
reads int // count how many times the data has been read
|
|
accountOn int // only account on or after this read
|
|
}
|
|
|
|
var (
|
|
errFileClosed = errors.New("file already closed")
|
|
errTooManyTries = errors.New("failed to reopen: too many retries")
|
|
errInvalidWhence = errors.New("reopen Seek: invalid whence")
|
|
errNegativeSeek = errors.New("reopen Seek: negative position")
|
|
errSeekPastEnd = errors.New("reopen Seek: attempt to seek past end of data")
|
|
errBadEndSeek = errors.New("reopen Seek: can't seek from end with unknown sized object")
|
|
)
|
|
|
|
// NewReOpen makes a handle which will reopen itself and seek to where
|
|
// it was on errors up to maxTries times.
|
|
//
|
|
// If an fs.HashesOption is set this will be applied when reading from
|
|
// the start.
|
|
//
|
|
// If an fs.RangeOption is set then this will applied when reading from
|
|
// the start, and updated on retries.
|
|
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc *ReOpen, err error) {
|
|
h := &ReOpen{
|
|
ctx: ctx,
|
|
src: src,
|
|
maxTries: maxTries,
|
|
baseOptions: options,
|
|
size: src.Size(),
|
|
start: 0,
|
|
offset: 0,
|
|
newOffset: -1, // -1 means no seek required
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
// Filter the options for subsequent opens
|
|
h.options = make([]fs.OpenOption, 0, len(options)+1)
|
|
var limit int64 = -1
|
|
for _, option := range options {
|
|
switch x := option.(type) {
|
|
case *fs.HashesOption:
|
|
// leave hash option out when ranging
|
|
case *fs.RangeOption:
|
|
h.start, limit = x.Decode(h.end)
|
|
case *fs.SeekOption:
|
|
h.start, limit = x.Offset, -1
|
|
default:
|
|
h.options = append(h.options, option)
|
|
}
|
|
}
|
|
|
|
// Put our RangeOption on the end
|
|
h.rangeOption.Start = h.start
|
|
h.options = append(h.options, &h.rangeOption)
|
|
|
|
// If a size range is set then set the end point of the file to that
|
|
if limit >= 0 && h.size >= 0 {
|
|
h.end = h.start + limit
|
|
h.rangeOption.End = h.end - 1 // remember range options are inclusive
|
|
} else {
|
|
h.end = h.size
|
|
h.rangeOption.End = -1
|
|
}
|
|
|
|
err = h.open()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return h, nil
|
|
}
|
|
|
|
// Open makes a handle which will reopen itself and seek to where it
|
|
// was on errors.
|
|
//
|
|
// If an fs.HashesOption is set this will be applied when reading from
|
|
// the start.
|
|
//
|
|
// If an fs.RangeOption is set then this will applied when reading from
|
|
// the start, and updated on retries.
|
|
//
|
|
// It will obey LowLevelRetries in the ctx as the maximum number of
|
|
// tries.
|
|
//
|
|
// Use this instead of calling the Open method on fs.Objects
|
|
func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc *ReOpen, err error) {
|
|
maxTries := fs.GetConfig(ctx).LowLevelRetries
|
|
return NewReOpen(ctx, src, maxTries, options...)
|
|
}
|
|
|
|
// open the underlying handle - call with lock held
|
|
//
|
|
// we don't retry here as the Open() call will itself have low level retries
|
|
func (h *ReOpen) open() error {
|
|
var opts []fs.OpenOption
|
|
if h.offset == 0 {
|
|
// if reading from the start using the initial options
|
|
opts = h.baseOptions
|
|
} else {
|
|
// otherwise use the filtered options
|
|
opts = h.options
|
|
// Adjust range start to where we have got to
|
|
h.rangeOption.Start = h.start + h.offset
|
|
}
|
|
// Make a copy of the options as fs.FixRangeOption modifies them :-(
|
|
opts = append(make([]fs.OpenOption, 0, len(opts)), opts...)
|
|
h.tries++
|
|
if h.tries > h.maxTries {
|
|
h.err = errTooManyTries
|
|
} else {
|
|
h.rc, h.err = h.src.Open(h.ctx, opts...)
|
|
}
|
|
if h.err != nil {
|
|
if h.tries > 1 {
|
|
fs.Debugf(h.src, "Reopen failed after offset %d bytes read: %v", h.offset, h.err)
|
|
}
|
|
return h.err
|
|
}
|
|
h.opened = true
|
|
return nil
|
|
}
|
|
|
|
// reopen the underlying handle by closing it and reopening it.
|
|
func (h *ReOpen) reopen() (err error) {
|
|
// close underlying stream if needed
|
|
if h.opened {
|
|
h.opened = false
|
|
_ = h.rc.Close()
|
|
}
|
|
return h.open()
|
|
}
|
|
|
|
// account for n bytes being read
|
|
func (h *ReOpen) accountRead(n int) error {
|
|
if h.account == nil {
|
|
return nil
|
|
}
|
|
// Don't start accounting until we've reached this many reads
|
|
//
|
|
// rw.reads will be 1 the first time this is called
|
|
// rw.accountOn 2 means start accounting on the 2nd read through
|
|
if h.reads >= h.accountOn {
|
|
return h.account(n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read bytes retrying as necessary
|
|
func (h *ReOpen) Read(p []byte) (n int, err error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if h.err != nil {
|
|
// return a previous error if there is one
|
|
return n, h.err
|
|
}
|
|
|
|
// re-open if seek needed
|
|
if h.newOffset >= 0 {
|
|
if h.offset != h.newOffset {
|
|
fs.Debugf(h.src, "Seek from %d to %d", h.offset, h.newOffset)
|
|
h.offset = h.newOffset
|
|
err = h.reopen()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
h.newOffset = -1
|
|
}
|
|
|
|
// Read a full buffer
|
|
startOffset := h.offset
|
|
var nn int
|
|
for n < len(p) && err == nil {
|
|
nn, err = h.rc.Read(p[n:])
|
|
n += nn
|
|
h.offset += int64(nn)
|
|
if err != nil && err != io.EOF {
|
|
h.err = err
|
|
if !fserrors.IsNoLowLevelRetryError(err) {
|
|
fs.Debugf(h.src, "Reopening on read failure after offset %d bytes: retry %d/%d: %v", h.offset, h.tries, h.maxTries, err)
|
|
if h.reopen() == nil {
|
|
err = nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Count a read of the data if we read from the start successfully
|
|
if startOffset == 0 && n != 0 {
|
|
h.reads++
|
|
}
|
|
// Account the read
|
|
accErr := h.accountRead(n)
|
|
if err == nil {
|
|
err = accErr
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Seek sets the offset for the next Read or Write to offset, interpreted
|
|
// according to whence: SeekStart means relative to the start of the file,
|
|
// SeekCurrent means relative to the current offset, and SeekEnd means relative
|
|
// to the end (for example, offset = -2 specifies the penultimate byte of the
|
|
// file). Seek returns the new offset relative to the start of the file or an
|
|
// error, if any.
|
|
//
|
|
// Seeking to an offset before the start of the file is an error. Seeking
|
|
// to any positive offset may be allowed, but if the new offset exceeds the
|
|
// size of the underlying object the behavior of subsequent I/O operations is
|
|
// implementation-dependent.
|
|
func (h *ReOpen) Seek(offset int64, whence int) (int64, error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if h.err != nil {
|
|
// return a previous error if there is one
|
|
return 0, h.err
|
|
}
|
|
var abs int64
|
|
var size = h.end - h.start
|
|
switch whence {
|
|
case io.SeekStart:
|
|
abs = offset
|
|
case io.SeekCurrent:
|
|
if h.newOffset >= 0 {
|
|
abs = h.newOffset + offset
|
|
} else {
|
|
abs = h.offset + offset
|
|
}
|
|
case io.SeekEnd:
|
|
if h.size < 0 {
|
|
return 0, errBadEndSeek
|
|
}
|
|
abs = size + offset
|
|
default:
|
|
return 0, errInvalidWhence
|
|
}
|
|
if abs < 0 {
|
|
return 0, errNegativeSeek
|
|
}
|
|
if h.size >= 0 && abs > size {
|
|
return size, errSeekPastEnd
|
|
}
|
|
|
|
h.tries = 0 // Reset open count on seek
|
|
h.newOffset = abs // New offset - applied in Read
|
|
return abs, nil
|
|
}
|
|
|
|
// Close the stream
|
|
func (h *ReOpen) Close() error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if !h.opened {
|
|
return errFileClosed
|
|
}
|
|
h.opened = false
|
|
h.err = errFileClosed
|
|
return h.rc.Close()
|
|
}
|
|
|
|
// SetAccounting should be provided with a function which will be
|
|
// called after every read from the RW.
|
|
//
|
|
// It may return an error which will be passed back to the user.
|
|
func (h *ReOpen) SetAccounting(account AccountFn) *ReOpen {
|
|
h.account = account
|
|
return h
|
|
}
|
|
|
|
// DelayAccounting makes sure the accounting function only gets called
|
|
// on the i-th or later read of the data from this point (counting
|
|
// from 1).
|
|
//
|
|
// This is useful so that we don't account initial reads of the data
|
|
// e.g. when calculating hashes.
|
|
//
|
|
// Set this to 0 to account everything.
|
|
func (h *ReOpen) DelayAccounting(i int) {
|
|
h.accountOn = i
|
|
h.reads = 0
|
|
}
|