From c0fb9ebfcea1c3337b9bf5081520403436efeb4f Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sun, 8 Oct 2023 11:39:26 +0100 Subject: [PATCH] operations: make Open() return an io.ReadSeekCloser #7350 As part of reducing memory usage in rclone, we need to have a raw handle to an object we can seek with. --- fs/operations/check.go | 5 +- fs/operations/operations.go | 6 +- fs/operations/reopen.go | 282 +++++++++++++++++++++++++++-------- fs/operations/reopen_test.go | 258 ++++++++++++++++++++++++++++++-- 4 files changed, 466 insertions(+), 85 deletions(-) diff --git a/fs/operations/check.go b/fs/operations/check.go index f96efb139..06d9f14ec 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -335,7 +335,8 @@ func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo // Does the work for CheckIdenticalDownload func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { - in1, err := Open(ctx, dst) + var in1, in2 io.ReadCloser + in1, err = Open(ctx, dst) if err != nil { return true, fmt.Errorf("failed to open %q: %w", dst, err) } @@ -345,7 +346,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo }() in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer - in2, err := Open(ctx, src) + in2, err = Open(ctx, src) if err != nil { return true, fmt.Errorf("failed to open %q: %w", src, err) } diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 8fb55c184..a7e2f6d50 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -771,7 +771,8 @@ func hashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag for _, option := range fs.GetConfig(ctx).DownloadHeaders { options = append(options, option) } - in, err := Open(ctx, o, options...) + var in io.ReadCloser + in, err = Open(ctx, o, options...) if err != nil { return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err) } @@ -1071,7 +1072,8 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b for _, option := range ci.DownloadHeaders { options = append(options, option) } - in, err := Open(ctx, o, options...) + var in io.ReadCloser + in, err = Open(ctx, o, options...) if err != nil { err = fs.CountError(err) fs.Errorf(o, "Failed to open: %v", err) diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go index 2e2a114ed..b2531ced0 100644 --- a/fs/operations/reopen.go +++ b/fs/operations/reopen.go @@ -10,23 +10,42 @@ import ( "github.com/rclone/rclone/fs/fserrors" ) +// AccountFn is a function which will be called after every read +// from the ReOpen. +// +// It may return an error which will be passed back to the user. +type AccountFn func(n int) error + // ReOpen is a wrapper for an object reader which reopens the stream on error type ReOpen struct { - ctx context.Context - mu sync.Mutex // mutex to protect the below - src fs.Object // object to open - options []fs.OpenOption // option to pass to initial open - rc io.ReadCloser // underlying stream - read int64 // number of bytes read from this stream - maxTries int // maximum number of retries - tries int // number of retries we've had so far in this stream - err error // if this is set then Read/Close calls will return it - opened bool // if set then rc is valid and needs closing + ctx context.Context + mu sync.Mutex // mutex to protect the below + src fs.Object // object to open + baseOptions []fs.OpenOption // options to pass to initial open and where offset == 0 + options []fs.OpenOption // option to pass on subsequent opens where offset != 0 + rangeOption fs.RangeOption // adjust this range option on re-opens + rc io.ReadCloser // underlying stream + size int64 // total size of object - can be -ve + start int64 // absolute position to start reading from + end int64 // absolute position to end reading (exclusive) + offset int64 // offset in the file we are at, offset from start + newOffset int64 // if different to offset, reopen needed + maxTries int // maximum number of retries + tries int // number of retries we've had so far in this stream + err error // if this is set then Read/Close calls will return it + opened bool // if set then rc is valid and needs closing + account AccountFn // account for a read + reads int // count how many times the data has been read + accountOn int // only account on or after this read } var ( - errorFileClosed = errors.New("file already closed") - errorTooManyTries = errors.New("failed to reopen: too many retries") + errFileClosed = errors.New("file already closed") + errTooManyTries = errors.New("failed to reopen: too many retries") + errInvalidWhence = errors.New("reopen Seek: invalid whence") + errNegativeSeek = errors.New("reopen Seek: negative position") + errSeekPastEnd = errors.New("reopen Seek: attempt to seek past end of data") + errBadEndSeek = errors.New("reopen Seek: can't seek from end with unknown sized object") ) // NewReOpen makes a handle which will reopen itself and seek to where @@ -37,15 +56,49 @@ var ( // // If an fs.RangeOption is set then this will applied when reading from // the start, and updated on retries. -func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) { +func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc *ReOpen, err error) { h := &ReOpen{ - ctx: ctx, - src: src, - maxTries: maxTries, - options: options, + ctx: ctx, + src: src, + maxTries: maxTries, + baseOptions: options, + size: src.Size(), + start: 0, + offset: 0, + newOffset: -1, // -1 means no seek required } h.mu.Lock() defer h.mu.Unlock() + + // Filter the options for subsequent opens + h.options = make([]fs.OpenOption, 0, len(options)+1) + var limit int64 = -1 + for _, option := range options { + switch x := option.(type) { + case *fs.HashesOption: + // leave hash option out when ranging + case *fs.RangeOption: + h.start, limit = x.Decode(h.end) + case *fs.SeekOption: + h.start, limit = x.Offset, -1 + default: + h.options = append(h.options, option) + } + } + + // Put our RangeOption on the end + h.rangeOption.Start = h.start + h.options = append(h.options, &h.rangeOption) + + // If a size range is set then set the end point of the file to that + if limit >= 0 && h.size >= 0 { + h.end = h.start + limit + h.rangeOption.End = h.end - 1 // remember range options are inclusive + } else { + h.end = h.size + h.rangeOption.End = -1 + } + err = h.open() if err != nil { return nil, err @@ -66,7 +119,7 @@ func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.O // tries. // // Use this instead of calling the Open method on fs.Objects -func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.ReadCloser, err error) { +func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc *ReOpen, err error) { maxTries := fs.GetConfig(ctx).LowLevelRetries return NewReOpen(ctx, src, maxTries, options...) } @@ -75,49 +128,25 @@ func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.R // // we don't retry here as the Open() call will itself have low level retries func (h *ReOpen) open() error { - opts := []fs.OpenOption{} - var hashOption *fs.HashesOption - var rangeOption *fs.RangeOption - for _, option := range h.options { - switch option := option.(type) { - case *fs.HashesOption: - hashOption = option - case *fs.RangeOption: - rangeOption = option - case *fs.HTTPOption: - opts = append(opts, option) - default: - if option.Mandatory() { - fs.Logf(h.src, "Unsupported mandatory option: %v", option) - } - } - } - if h.read == 0 { - if rangeOption != nil { - opts = append(opts, rangeOption) - } - if hashOption != nil { - // put hashOption on if reading from the start, ditch otherwise - opts = append(opts, hashOption) - } + var opts []fs.OpenOption + if h.offset == 0 { + // if reading from the start using the initial options + opts = h.baseOptions } else { - if rangeOption != nil { - // range to the read point - opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End}) - } else { - // seek to the read point - opts = append(opts, &fs.SeekOption{Offset: h.read}) - } + // otherwise use the filtered options + opts = h.options + // Adjust range start to where we have got to + h.rangeOption.Start = h.start + h.offset } h.tries++ if h.tries > h.maxTries { - h.err = errorTooManyTries + h.err = errTooManyTries } else { h.rc, h.err = h.src.Open(h.ctx, opts...) } if h.err != nil { if h.tries > 1 { - fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err) + fs.Debugf(h.src, "Reopen failed after offset %d bytes read: %v", h.offset, h.err) } return h.err } @@ -125,6 +154,31 @@ func (h *ReOpen) open() error { return nil } +// reopen the underlying handle by closing it and reopening it. +func (h *ReOpen) reopen() (err error) { + // close underlying stream if needed + if h.opened { + h.opened = false + _ = h.rc.Close() + } + return h.open() +} + +// account for n bytes being read +func (h *ReOpen) accountRead(n int) error { + if h.account == nil { + return nil + } + // Don't start accounting until we've reached this many reads + // + // rw.reads will be 1 the first time this is called + // rw.accountOn 2 means start accounting on the 2nd read through + if h.reads >= h.accountOn { + return h.account(n) + } + return nil +} + // Read bytes retrying as necessary func (h *ReOpen) Read(p []byte) (n int, err error) { h.mu.Lock() @@ -133,32 +187,128 @@ func (h *ReOpen) Read(p []byte) (n int, err error) { // return a previous error if there is one return n, h.err } - n, err = h.rc.Read(p) - if err != nil { - h.err = err + + // re-open if seek needed + if h.newOffset >= 0 { + if h.offset != h.newOffset { + fs.Debugf(h.src, "Seek from %d to %d", h.offset, h.newOffset) + h.offset = h.newOffset + err = h.reopen() + if err != nil { + return 0, err + } + } + h.newOffset = -1 } - h.read += int64(n) - if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) { - // close underlying stream - h.opened = false - _ = h.rc.Close() - // reopen stream, clearing error if successful - fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err) - if h.open() == nil { - err = nil + + // Read a full buffer + startOffset := h.offset + var nn int + for n < len(p) && err == nil { + nn, err = h.rc.Read(p[n:]) + n += nn + h.offset += int64(nn) + if err != nil && err != io.EOF { + h.err = err + if !fserrors.IsNoLowLevelRetryError(err) { + fs.Debugf(h.src, "Reopening on read failure after offset %d bytes: retry %d/%d: %v", h.offset, h.tries, h.maxTries, err) + if h.reopen() == nil { + err = nil + } + } } } + // Count a read of the data if we read from the start successfully + if startOffset == 0 && n != 0 { + h.reads++ + } + // Account the read + accErr := h.accountRead(n) + if err == nil { + err = accErr + } return n, err } +// Seek sets the offset for the next Read or Write to offset, interpreted +// according to whence: SeekStart means relative to the start of the file, +// SeekCurrent means relative to the current offset, and SeekEnd means relative +// to the end (for example, offset = -2 specifies the penultimate byte of the +// file). Seek returns the new offset relative to the start of the file or an +// error, if any. +// +// Seeking to an offset before the start of the file is an error. Seeking +// to any positive offset may be allowed, but if the new offset exceeds the +// size of the underlying object the behavior of subsequent I/O operations is +// implementation-dependent. +func (h *ReOpen) Seek(offset int64, whence int) (int64, error) { + h.mu.Lock() + defer h.mu.Unlock() + if h.err != nil { + // return a previous error if there is one + return 0, h.err + } + var abs int64 + var size = h.end - h.start + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + if h.newOffset >= 0 { + abs = h.newOffset + offset + } else { + abs = h.offset + offset + } + case io.SeekEnd: + if h.size < 0 { + return 0, errBadEndSeek + } + abs = size + offset + default: + return 0, errInvalidWhence + } + if abs < 0 { + return 0, errNegativeSeek + } + if h.size >= 0 && abs > size { + return size, errSeekPastEnd + } + + h.tries = 0 // Reset open count on seek + h.newOffset = abs // New offset - applied in Read + return abs, nil +} + // Close the stream func (h *ReOpen) Close() error { h.mu.Lock() defer h.mu.Unlock() if !h.opened { - return errorFileClosed + return errFileClosed } h.opened = false - h.err = errorFileClosed + h.err = errFileClosed return h.rc.Close() } + +// SetAccounting should be provided with a function which will be +// called after every read from the RW. +// +// It may return an error which will be passed back to the user. +func (h *ReOpen) SetAccounting(account AccountFn) *ReOpen { + h.account = account + return h +} + +// DelayAccounting makes sure the accounting function only gets called +// on the i-th or later read of the data from this point (counting +// from 1). +// +// This is useful so that we don't account initial reads of the data +// e.g. when calculating hashes. +// +// Set this to 0 to account everything. +func (h *ReOpen) DelayAccounting(i int) { + h.accountOn = i + h.reads = 0 +} diff --git a/fs/operations/reopen_test.go b/fs/operations/reopen_test.go index 2a7195b56..6cb42a439 100644 --- a/fs/operations/reopen_test.go +++ b/fs/operations/reopen_test.go @@ -9,12 +9,17 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fstest/mockobject" + "github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/readers" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -// check interface -var _ io.ReadCloser = (*ReOpen)(nil) +// check interfaces +var ( + _ io.ReadSeekCloser = (*ReOpen)(nil) + _ pool.DelayAccountinger = (*ReOpen)(nil) +) var errorTestError = errors.New("test error") @@ -24,13 +29,36 @@ var errorTestError = errors.New("test error") // error type reOpenTestObject struct { fs.Object - breaks []int64 + t *testing.T + breaks []int64 + unknownSize bool } // Open opens the file for read. Call Close() on the returned io.ReadCloser // // This will break after reading the number of bytes in breaks func (o *reOpenTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { + gotHash := false + gotRange := false + startPos := int64(0) + for _, option := range options { + switch x := option.(type) { + case *fs.HashesOption: + gotHash = true + case *fs.RangeOption: + gotRange = true + startPos = x.Start + if o.unknownSize { + assert.Equal(o.t, int64(-1), x.End) + } + case *fs.SeekOption: + startPos = x.Offset + } + } + // Check if ranging, mustn't have hash if offset != 0 + if gotHash && gotRange { + assert.Equal(o.t, int64(0), startPos) + } rc, err := o.Object.Open(ctx, options...) if err != nil { return nil, err @@ -52,28 +80,53 @@ func (o *reOpenTestObject) Open(ctx context.Context, options ...fs.OpenOption) ( } func TestReOpen(t *testing.T) { - for testIndex, testName := range []string{"Seek", "Range"} { + for _, testName := range []string{"Normal", "WithRangeOption", "WithSeekOption", "UnknownSize"} { t.Run(testName, func(t *testing.T) { // Contents for the mock object var ( reOpenTestcontents = []byte("0123456789") expectedRead = reOpenTestcontents rangeOption *fs.RangeOption + seekOption *fs.SeekOption + unknownSize = false ) - if testIndex > 0 { - rangeOption = &fs.RangeOption{Start: 1, End: 7} + switch testName { + case "Normal": + case "WithRangeOption": + rangeOption = &fs.RangeOption{Start: 1, End: 7} // range is inclusive expectedRead = reOpenTestcontents[1:8] + case "WithSeekOption": + seekOption = &fs.SeekOption{Offset: 2} + expectedRead = reOpenTestcontents[2:] + case "UnknownSize": + rangeOption = &fs.RangeOption{Start: 1, End: -1} + expectedRead = reOpenTestcontents[1:] + unknownSize = true + default: + panic("bad test name") } // Start the test with the given breaks - testReOpen := func(breaks []int64, maxRetries int) (io.ReadCloser, error) { + testReOpen := func(breaks []int64, maxRetries int) (*ReOpen, error) { srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeNone) + srcOrig.SetUnknownSize(unknownSize) src := &reOpenTestObject{ - Object: srcOrig, - breaks: breaks, + Object: srcOrig, + t: t, + breaks: breaks, + unknownSize: unknownSize, } - hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} - return NewReOpen(context.Background(), src, maxRetries, hashOption, rangeOption) + opts := []fs.OpenOption{} + if rangeOption == nil && seekOption == nil { + opts = append(opts, &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}) + } + if rangeOption != nil { + opts = append(opts, rangeOption) + } + if seekOption != nil { + opts = append(opts, seekOption) + } + return NewReOpen(context.Background(), src, maxRetries, opts...) } t.Run("Basics", func(t *testing.T) { @@ -92,16 +145,25 @@ func TestReOpen(t *testing.T) { assert.Equal(t, 0, n) assert.Equal(t, io.EOF, err) + // Rewind the stream + _, err = h.Seek(0, io.SeekStart) + require.NoError(t, err) + + // Check contents read correctly + got, err = io.ReadAll(h) + assert.NoError(t, err) + assert.Equal(t, expectedRead, got) + // Check close assert.NoError(t, h.Close()) // Check double close - assert.Equal(t, errorFileClosed, h.Close()) + assert.Equal(t, errFileClosed, h.Close()) // Check read after close n, err = h.Read(buf) assert.Equal(t, 0, n) - assert.Equal(t, errorFileClosed, err) + assert.Equal(t, errFileClosed, err) }) t.Run("ErrorAtStart", func(t *testing.T) { @@ -139,10 +201,176 @@ func TestReOpen(t *testing.T) { var buf = make([]byte, 1) n, err := h.Read(buf) assert.Equal(t, 0, n) - assert.Equal(t, errorTooManyTries, err) + assert.Equal(t, errTooManyTries, err) // Check close - assert.Equal(t, errorFileClosed, h.Close()) + assert.Equal(t, errFileClosed, h.Close()) + }) + + t.Run("Seek", func(t *testing.T) { + // open + h, err := testReOpen([]int64{2, 1, 3}, 10) + assert.NoError(t, err) + + // Seek to end + pos, err := h.Seek(int64(len(expectedRead)), io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(len(expectedRead)), pos) + + // Seek to start + pos, err = h.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(0), pos) + + // Should not allow seek past end + pos, err = h.Seek(int64(len(expectedRead))+1, io.SeekCurrent) + if !unknownSize { + assert.Equal(t, errSeekPastEnd, err) + assert.Equal(t, len(expectedRead), int(pos)) + } else { + assert.Equal(t, nil, err) + assert.Equal(t, len(expectedRead)+1, int(pos)) + + // Seek back to start to get tests in sync + pos, err = h.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.Equal(t, int64(0), pos) + } + + // Should not allow seek to negative position start + pos, err = h.Seek(-1, io.SeekCurrent) + assert.Equal(t, errNegativeSeek, err) + assert.Equal(t, 0, int(pos)) + + // Should not allow seek with invalid whence + pos, err = h.Seek(0, 3) + assert.Equal(t, errInvalidWhence, err) + assert.Equal(t, 0, int(pos)) + + // check read + dst := make([]byte, 5) + n, err := h.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, expectedRead[:5], dst) + + // Test io.SeekCurrent + pos, err = h.Seek(-3, io.SeekCurrent) + assert.Nil(t, err) + assert.Equal(t, 2, int(pos)) + + // check read + n, err = h.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, expectedRead[2:7], dst) + + pos, err = h.Seek(-2, io.SeekCurrent) + assert.Nil(t, err) + assert.Equal(t, 5, int(pos)) + + // Test io.SeekEnd + pos, err = h.Seek(-3, io.SeekEnd) + if !unknownSize { + assert.Nil(t, err) + assert.Equal(t, len(expectedRead)-3, int(pos)) + } else { + assert.Equal(t, errBadEndSeek, err) + assert.Equal(t, 0, int(pos)) + + // sync + pos, err = h.Seek(1, io.SeekCurrent) + assert.Nil(t, err) + assert.Equal(t, 6, int(pos)) + } + + // check read + dst = make([]byte, 3) + n, err = h.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 3, n) + assert.Equal(t, expectedRead[len(expectedRead)-3:], dst) + + // check close + assert.NoError(t, h.Close()) + _, err = h.Seek(0, io.SeekCurrent) + assert.Equal(t, errFileClosed, err) + }) + + t.Run("AccountRead", func(t *testing.T) { + h, err := testReOpen(nil, 10) + assert.NoError(t, err) + + var total int + h.SetAccounting(func(n int) error { + total += n + return nil + }) + + dst := make([]byte, 3) + n, err := h.Read(dst) + assert.Equal(t, 3, n) + assert.NoError(t, err) + assert.Equal(t, 3, total) + }) + + t.Run("AccountReadDelay", func(t *testing.T) { + h, err := testReOpen(nil, 10) + assert.NoError(t, err) + + var total int + h.SetAccounting(func(n int) error { + total += n + return nil + }) + + rewind := func() { + _, err := h.Seek(0, io.SeekStart) + require.NoError(t, err) + } + + h.DelayAccounting(3) + + dst := make([]byte, 16) + + n, err := h.Read(dst) + assert.Equal(t, len(expectedRead), n) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, total) + rewind() + + n, err = h.Read(dst) + assert.Equal(t, len(expectedRead), n) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, total) + rewind() + + n, err = h.Read(dst) + assert.Equal(t, len(expectedRead), n) + assert.Equal(t, io.EOF, err) + assert.Equal(t, len(expectedRead), total) + rewind() + + n, err = h.Read(dst) + assert.Equal(t, len(expectedRead), n) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 2*len(expectedRead), total) + rewind() + }) + + t.Run("AccountReadError", func(t *testing.T) { + // Test accounting errors + h, err := testReOpen(nil, 10) + assert.NoError(t, err) + + h.SetAccounting(func(n int) error { + return errorTestError + }) + + dst := make([]byte, 3) + n, err := h.Read(dst) + assert.Equal(t, 3, n) + assert.Equal(t, errorTestError, err) }) }) }