operations: make Open() return an io.ReadSeekCloser #7350
As part of reducing memory usage in rclone, we need to have a raw handle to an object we can seek with.
This commit is contained in:
parent
e8fcde8de1
commit
c0fb9ebfce
4 changed files with 466 additions and 85 deletions
|
@ -335,7 +335,8 @@ func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
|
||||||
|
|
||||||
// Does the work for CheckIdenticalDownload
|
// Does the work for CheckIdenticalDownload
|
||||||
func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) {
|
func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) {
|
||||||
in1, err := Open(ctx, dst)
|
var in1, in2 io.ReadCloser
|
||||||
|
in1, err = Open(ctx, dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("failed to open %q: %w", dst, err)
|
return true, fmt.Errorf("failed to open %q: %w", dst, err)
|
||||||
}
|
}
|
||||||
|
@ -345,7 +346,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
|
||||||
}()
|
}()
|
||||||
in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
|
in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
|
||||||
|
|
||||||
in2, err := Open(ctx, src)
|
in2, err = Open(ctx, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("failed to open %q: %w", src, err)
|
return true, fmt.Errorf("failed to open %q: %w", src, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -771,7 +771,8 @@ func hashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag
|
||||||
for _, option := range fs.GetConfig(ctx).DownloadHeaders {
|
for _, option := range fs.GetConfig(ctx).DownloadHeaders {
|
||||||
options = append(options, option)
|
options = append(options, option)
|
||||||
}
|
}
|
||||||
in, err := Open(ctx, o, options...)
|
var in io.ReadCloser
|
||||||
|
in, err = Open(ctx, o, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err)
|
return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err)
|
||||||
}
|
}
|
||||||
|
@ -1071,7 +1072,8 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
|
||||||
for _, option := range ci.DownloadHeaders {
|
for _, option := range ci.DownloadHeaders {
|
||||||
options = append(options, option)
|
options = append(options, option)
|
||||||
}
|
}
|
||||||
in, err := Open(ctx, o, options...)
|
var in io.ReadCloser
|
||||||
|
in, err = Open(ctx, o, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fs.CountError(err)
|
err = fs.CountError(err)
|
||||||
fs.Errorf(o, "Failed to open: %v", err)
|
fs.Errorf(o, "Failed to open: %v", err)
|
||||||
|
|
|
@ -10,23 +10,42 @@ import (
|
||||||
"github.com/rclone/rclone/fs/fserrors"
|
"github.com/rclone/rclone/fs/fserrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// AccountFn is a function which will be called after every read
|
||||||
|
// from the ReOpen.
|
||||||
|
//
|
||||||
|
// It may return an error which will be passed back to the user.
|
||||||
|
type AccountFn func(n int) error
|
||||||
|
|
||||||
// ReOpen is a wrapper for an object reader which reopens the stream on error
|
// ReOpen is a wrapper for an object reader which reopens the stream on error
|
||||||
type ReOpen struct {
|
type ReOpen struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
mu sync.Mutex // mutex to protect the below
|
mu sync.Mutex // mutex to protect the below
|
||||||
src fs.Object // object to open
|
src fs.Object // object to open
|
||||||
options []fs.OpenOption // option to pass to initial open
|
baseOptions []fs.OpenOption // options to pass to initial open and where offset == 0
|
||||||
rc io.ReadCloser // underlying stream
|
options []fs.OpenOption // option to pass on subsequent opens where offset != 0
|
||||||
read int64 // number of bytes read from this stream
|
rangeOption fs.RangeOption // adjust this range option on re-opens
|
||||||
maxTries int // maximum number of retries
|
rc io.ReadCloser // underlying stream
|
||||||
tries int // number of retries we've had so far in this stream
|
size int64 // total size of object - can be -ve
|
||||||
err error // if this is set then Read/Close calls will return it
|
start int64 // absolute position to start reading from
|
||||||
opened bool // if set then rc is valid and needs closing
|
end int64 // absolute position to end reading (exclusive)
|
||||||
|
offset int64 // offset in the file we are at, offset from start
|
||||||
|
newOffset int64 // if different to offset, reopen needed
|
||||||
|
maxTries int // maximum number of retries
|
||||||
|
tries int // number of retries we've had so far in this stream
|
||||||
|
err error // if this is set then Read/Close calls will return it
|
||||||
|
opened bool // if set then rc is valid and needs closing
|
||||||
|
account AccountFn // account for a read
|
||||||
|
reads int // count how many times the data has been read
|
||||||
|
accountOn int // only account on or after this read
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errorFileClosed = errors.New("file already closed")
|
errFileClosed = errors.New("file already closed")
|
||||||
errorTooManyTries = errors.New("failed to reopen: too many retries")
|
errTooManyTries = errors.New("failed to reopen: too many retries")
|
||||||
|
errInvalidWhence = errors.New("reopen Seek: invalid whence")
|
||||||
|
errNegativeSeek = errors.New("reopen Seek: negative position")
|
||||||
|
errSeekPastEnd = errors.New("reopen Seek: attempt to seek past end of data")
|
||||||
|
errBadEndSeek = errors.New("reopen Seek: can't seek from end with unknown sized object")
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewReOpen makes a handle which will reopen itself and seek to where
|
// NewReOpen makes a handle which will reopen itself and seek to where
|
||||||
|
@ -37,15 +56,49 @@ var (
|
||||||
//
|
//
|
||||||
// If an fs.RangeOption is set then this will applied when reading from
|
// If an fs.RangeOption is set then this will applied when reading from
|
||||||
// the start, and updated on retries.
|
// the start, and updated on retries.
|
||||||
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc *ReOpen, err error) {
|
||||||
h := &ReOpen{
|
h := &ReOpen{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
src: src,
|
src: src,
|
||||||
maxTries: maxTries,
|
maxTries: maxTries,
|
||||||
options: options,
|
baseOptions: options,
|
||||||
|
size: src.Size(),
|
||||||
|
start: 0,
|
||||||
|
offset: 0,
|
||||||
|
newOffset: -1, // -1 means no seek required
|
||||||
}
|
}
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
// Filter the options for subsequent opens
|
||||||
|
h.options = make([]fs.OpenOption, 0, len(options)+1)
|
||||||
|
var limit int64 = -1
|
||||||
|
for _, option := range options {
|
||||||
|
switch x := option.(type) {
|
||||||
|
case *fs.HashesOption:
|
||||||
|
// leave hash option out when ranging
|
||||||
|
case *fs.RangeOption:
|
||||||
|
h.start, limit = x.Decode(h.end)
|
||||||
|
case *fs.SeekOption:
|
||||||
|
h.start, limit = x.Offset, -1
|
||||||
|
default:
|
||||||
|
h.options = append(h.options, option)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put our RangeOption on the end
|
||||||
|
h.rangeOption.Start = h.start
|
||||||
|
h.options = append(h.options, &h.rangeOption)
|
||||||
|
|
||||||
|
// If a size range is set then set the end point of the file to that
|
||||||
|
if limit >= 0 && h.size >= 0 {
|
||||||
|
h.end = h.start + limit
|
||||||
|
h.rangeOption.End = h.end - 1 // remember range options are inclusive
|
||||||
|
} else {
|
||||||
|
h.end = h.size
|
||||||
|
h.rangeOption.End = -1
|
||||||
|
}
|
||||||
|
|
||||||
err = h.open()
|
err = h.open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -66,7 +119,7 @@ func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.O
|
||||||
// tries.
|
// tries.
|
||||||
//
|
//
|
||||||
// Use this instead of calling the Open method on fs.Objects
|
// Use this instead of calling the Open method on fs.Objects
|
||||||
func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc *ReOpen, err error) {
|
||||||
maxTries := fs.GetConfig(ctx).LowLevelRetries
|
maxTries := fs.GetConfig(ctx).LowLevelRetries
|
||||||
return NewReOpen(ctx, src, maxTries, options...)
|
return NewReOpen(ctx, src, maxTries, options...)
|
||||||
}
|
}
|
||||||
|
@ -75,49 +128,25 @@ func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.R
|
||||||
//
|
//
|
||||||
// we don't retry here as the Open() call will itself have low level retries
|
// we don't retry here as the Open() call will itself have low level retries
|
||||||
func (h *ReOpen) open() error {
|
func (h *ReOpen) open() error {
|
||||||
opts := []fs.OpenOption{}
|
var opts []fs.OpenOption
|
||||||
var hashOption *fs.HashesOption
|
if h.offset == 0 {
|
||||||
var rangeOption *fs.RangeOption
|
// if reading from the start using the initial options
|
||||||
for _, option := range h.options {
|
opts = h.baseOptions
|
||||||
switch option := option.(type) {
|
|
||||||
case *fs.HashesOption:
|
|
||||||
hashOption = option
|
|
||||||
case *fs.RangeOption:
|
|
||||||
rangeOption = option
|
|
||||||
case *fs.HTTPOption:
|
|
||||||
opts = append(opts, option)
|
|
||||||
default:
|
|
||||||
if option.Mandatory() {
|
|
||||||
fs.Logf(h.src, "Unsupported mandatory option: %v", option)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if h.read == 0 {
|
|
||||||
if rangeOption != nil {
|
|
||||||
opts = append(opts, rangeOption)
|
|
||||||
}
|
|
||||||
if hashOption != nil {
|
|
||||||
// put hashOption on if reading from the start, ditch otherwise
|
|
||||||
opts = append(opts, hashOption)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if rangeOption != nil {
|
// otherwise use the filtered options
|
||||||
// range to the read point
|
opts = h.options
|
||||||
opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
|
// Adjust range start to where we have got to
|
||||||
} else {
|
h.rangeOption.Start = h.start + h.offset
|
||||||
// seek to the read point
|
|
||||||
opts = append(opts, &fs.SeekOption{Offset: h.read})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
h.tries++
|
h.tries++
|
||||||
if h.tries > h.maxTries {
|
if h.tries > h.maxTries {
|
||||||
h.err = errorTooManyTries
|
h.err = errTooManyTries
|
||||||
} else {
|
} else {
|
||||||
h.rc, h.err = h.src.Open(h.ctx, opts...)
|
h.rc, h.err = h.src.Open(h.ctx, opts...)
|
||||||
}
|
}
|
||||||
if h.err != nil {
|
if h.err != nil {
|
||||||
if h.tries > 1 {
|
if h.tries > 1 {
|
||||||
fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
|
fs.Debugf(h.src, "Reopen failed after offset %d bytes read: %v", h.offset, h.err)
|
||||||
}
|
}
|
||||||
return h.err
|
return h.err
|
||||||
}
|
}
|
||||||
|
@ -125,6 +154,31 @@ func (h *ReOpen) open() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reopen the underlying handle by closing it and reopening it.
|
||||||
|
func (h *ReOpen) reopen() (err error) {
|
||||||
|
// close underlying stream if needed
|
||||||
|
if h.opened {
|
||||||
|
h.opened = false
|
||||||
|
_ = h.rc.Close()
|
||||||
|
}
|
||||||
|
return h.open()
|
||||||
|
}
|
||||||
|
|
||||||
|
// account for n bytes being read
|
||||||
|
func (h *ReOpen) accountRead(n int) error {
|
||||||
|
if h.account == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Don't start accounting until we've reached this many reads
|
||||||
|
//
|
||||||
|
// rw.reads will be 1 the first time this is called
|
||||||
|
// rw.accountOn 2 means start accounting on the 2nd read through
|
||||||
|
if h.reads >= h.accountOn {
|
||||||
|
return h.account(n)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Read bytes retrying as necessary
|
// Read bytes retrying as necessary
|
||||||
func (h *ReOpen) Read(p []byte) (n int, err error) {
|
func (h *ReOpen) Read(p []byte) (n int, err error) {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
|
@ -133,32 +187,128 @@ func (h *ReOpen) Read(p []byte) (n int, err error) {
|
||||||
// return a previous error if there is one
|
// return a previous error if there is one
|
||||||
return n, h.err
|
return n, h.err
|
||||||
}
|
}
|
||||||
n, err = h.rc.Read(p)
|
|
||||||
if err != nil {
|
// re-open if seek needed
|
||||||
h.err = err
|
if h.newOffset >= 0 {
|
||||||
|
if h.offset != h.newOffset {
|
||||||
|
fs.Debugf(h.src, "Seek from %d to %d", h.offset, h.newOffset)
|
||||||
|
h.offset = h.newOffset
|
||||||
|
err = h.reopen()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.newOffset = -1
|
||||||
}
|
}
|
||||||
h.read += int64(n)
|
|
||||||
if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) {
|
// Read a full buffer
|
||||||
// close underlying stream
|
startOffset := h.offset
|
||||||
h.opened = false
|
var nn int
|
||||||
_ = h.rc.Close()
|
for n < len(p) && err == nil {
|
||||||
// reopen stream, clearing error if successful
|
nn, err = h.rc.Read(p[n:])
|
||||||
fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
|
n += nn
|
||||||
if h.open() == nil {
|
h.offset += int64(nn)
|
||||||
err = nil
|
if err != nil && err != io.EOF {
|
||||||
|
h.err = err
|
||||||
|
if !fserrors.IsNoLowLevelRetryError(err) {
|
||||||
|
fs.Debugf(h.src, "Reopening on read failure after offset %d bytes: retry %d/%d: %v", h.offset, h.tries, h.maxTries, err)
|
||||||
|
if h.reopen() == nil {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Count a read of the data if we read from the start successfully
|
||||||
|
if startOffset == 0 && n != 0 {
|
||||||
|
h.reads++
|
||||||
|
}
|
||||||
|
// Account the read
|
||||||
|
accErr := h.accountRead(n)
|
||||||
|
if err == nil {
|
||||||
|
err = accErr
|
||||||
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Seek sets the offset for the next Read or Write to offset, interpreted
|
||||||
|
// according to whence: SeekStart means relative to the start of the file,
|
||||||
|
// SeekCurrent means relative to the current offset, and SeekEnd means relative
|
||||||
|
// to the end (for example, offset = -2 specifies the penultimate byte of the
|
||||||
|
// file). Seek returns the new offset relative to the start of the file or an
|
||||||
|
// error, if any.
|
||||||
|
//
|
||||||
|
// Seeking to an offset before the start of the file is an error. Seeking
|
||||||
|
// to any positive offset may be allowed, but if the new offset exceeds the
|
||||||
|
// size of the underlying object the behavior of subsequent I/O operations is
|
||||||
|
// implementation-dependent.
|
||||||
|
func (h *ReOpen) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
if h.err != nil {
|
||||||
|
// return a previous error if there is one
|
||||||
|
return 0, h.err
|
||||||
|
}
|
||||||
|
var abs int64
|
||||||
|
var size = h.end - h.start
|
||||||
|
switch whence {
|
||||||
|
case io.SeekStart:
|
||||||
|
abs = offset
|
||||||
|
case io.SeekCurrent:
|
||||||
|
if h.newOffset >= 0 {
|
||||||
|
abs = h.newOffset + offset
|
||||||
|
} else {
|
||||||
|
abs = h.offset + offset
|
||||||
|
}
|
||||||
|
case io.SeekEnd:
|
||||||
|
if h.size < 0 {
|
||||||
|
return 0, errBadEndSeek
|
||||||
|
}
|
||||||
|
abs = size + offset
|
||||||
|
default:
|
||||||
|
return 0, errInvalidWhence
|
||||||
|
}
|
||||||
|
if abs < 0 {
|
||||||
|
return 0, errNegativeSeek
|
||||||
|
}
|
||||||
|
if h.size >= 0 && abs > size {
|
||||||
|
return size, errSeekPastEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
h.tries = 0 // Reset open count on seek
|
||||||
|
h.newOffset = abs // New offset - applied in Read
|
||||||
|
return abs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close the stream
|
// Close the stream
|
||||||
func (h *ReOpen) Close() error {
|
func (h *ReOpen) Close() error {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
defer h.mu.Unlock()
|
||||||
if !h.opened {
|
if !h.opened {
|
||||||
return errorFileClosed
|
return errFileClosed
|
||||||
}
|
}
|
||||||
h.opened = false
|
h.opened = false
|
||||||
h.err = errorFileClosed
|
h.err = errFileClosed
|
||||||
return h.rc.Close()
|
return h.rc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetAccounting should be provided with a function which will be
|
||||||
|
// called after every read from the RW.
|
||||||
|
//
|
||||||
|
// It may return an error which will be passed back to the user.
|
||||||
|
func (h *ReOpen) SetAccounting(account AccountFn) *ReOpen {
|
||||||
|
h.account = account
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// DelayAccounting makes sure the accounting function only gets called
|
||||||
|
// on the i-th or later read of the data from this point (counting
|
||||||
|
// from 1).
|
||||||
|
//
|
||||||
|
// This is useful so that we don't account initial reads of the data
|
||||||
|
// e.g. when calculating hashes.
|
||||||
|
//
|
||||||
|
// Set this to 0 to account everything.
|
||||||
|
func (h *ReOpen) DelayAccounting(i int) {
|
||||||
|
h.accountOn = i
|
||||||
|
h.reads = 0
|
||||||
|
}
|
||||||
|
|
|
@ -9,12 +9,17 @@ import (
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fstest/mockobject"
|
"github.com/rclone/rclone/fstest/mockobject"
|
||||||
|
"github.com/rclone/rclone/lib/pool"
|
||||||
"github.com/rclone/rclone/lib/readers"
|
"github.com/rclone/rclone/lib/readers"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// check interface
|
// check interfaces
|
||||||
var _ io.ReadCloser = (*ReOpen)(nil)
|
var (
|
||||||
|
_ io.ReadSeekCloser = (*ReOpen)(nil)
|
||||||
|
_ pool.DelayAccountinger = (*ReOpen)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
var errorTestError = errors.New("test error")
|
var errorTestError = errors.New("test error")
|
||||||
|
|
||||||
|
@ -24,13 +29,36 @@ var errorTestError = errors.New("test error")
|
||||||
// error
|
// error
|
||||||
type reOpenTestObject struct {
|
type reOpenTestObject struct {
|
||||||
fs.Object
|
fs.Object
|
||||||
breaks []int64
|
t *testing.T
|
||||||
|
breaks []int64
|
||||||
|
unknownSize bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
||||||
//
|
//
|
||||||
// This will break after reading the number of bytes in breaks
|
// This will break after reading the number of bytes in breaks
|
||||||
func (o *reOpenTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
func (o *reOpenTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
||||||
|
gotHash := false
|
||||||
|
gotRange := false
|
||||||
|
startPos := int64(0)
|
||||||
|
for _, option := range options {
|
||||||
|
switch x := option.(type) {
|
||||||
|
case *fs.HashesOption:
|
||||||
|
gotHash = true
|
||||||
|
case *fs.RangeOption:
|
||||||
|
gotRange = true
|
||||||
|
startPos = x.Start
|
||||||
|
if o.unknownSize {
|
||||||
|
assert.Equal(o.t, int64(-1), x.End)
|
||||||
|
}
|
||||||
|
case *fs.SeekOption:
|
||||||
|
startPos = x.Offset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check if ranging, mustn't have hash if offset != 0
|
||||||
|
if gotHash && gotRange {
|
||||||
|
assert.Equal(o.t, int64(0), startPos)
|
||||||
|
}
|
||||||
rc, err := o.Object.Open(ctx, options...)
|
rc, err := o.Object.Open(ctx, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -52,28 +80,53 @@ func (o *reOpenTestObject) Open(ctx context.Context, options ...fs.OpenOption) (
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReOpen(t *testing.T) {
|
func TestReOpen(t *testing.T) {
|
||||||
for testIndex, testName := range []string{"Seek", "Range"} {
|
for _, testName := range []string{"Normal", "WithRangeOption", "WithSeekOption", "UnknownSize"} {
|
||||||
t.Run(testName, func(t *testing.T) {
|
t.Run(testName, func(t *testing.T) {
|
||||||
// Contents for the mock object
|
// Contents for the mock object
|
||||||
var (
|
var (
|
||||||
reOpenTestcontents = []byte("0123456789")
|
reOpenTestcontents = []byte("0123456789")
|
||||||
expectedRead = reOpenTestcontents
|
expectedRead = reOpenTestcontents
|
||||||
rangeOption *fs.RangeOption
|
rangeOption *fs.RangeOption
|
||||||
|
seekOption *fs.SeekOption
|
||||||
|
unknownSize = false
|
||||||
)
|
)
|
||||||
if testIndex > 0 {
|
switch testName {
|
||||||
rangeOption = &fs.RangeOption{Start: 1, End: 7}
|
case "Normal":
|
||||||
|
case "WithRangeOption":
|
||||||
|
rangeOption = &fs.RangeOption{Start: 1, End: 7} // range is inclusive
|
||||||
expectedRead = reOpenTestcontents[1:8]
|
expectedRead = reOpenTestcontents[1:8]
|
||||||
|
case "WithSeekOption":
|
||||||
|
seekOption = &fs.SeekOption{Offset: 2}
|
||||||
|
expectedRead = reOpenTestcontents[2:]
|
||||||
|
case "UnknownSize":
|
||||||
|
rangeOption = &fs.RangeOption{Start: 1, End: -1}
|
||||||
|
expectedRead = reOpenTestcontents[1:]
|
||||||
|
unknownSize = true
|
||||||
|
default:
|
||||||
|
panic("bad test name")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the test with the given breaks
|
// Start the test with the given breaks
|
||||||
testReOpen := func(breaks []int64, maxRetries int) (io.ReadCloser, error) {
|
testReOpen := func(breaks []int64, maxRetries int) (*ReOpen, error) {
|
||||||
srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeNone)
|
srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeNone)
|
||||||
|
srcOrig.SetUnknownSize(unknownSize)
|
||||||
src := &reOpenTestObject{
|
src := &reOpenTestObject{
|
||||||
Object: srcOrig,
|
Object: srcOrig,
|
||||||
breaks: breaks,
|
t: t,
|
||||||
|
breaks: breaks,
|
||||||
|
unknownSize: unknownSize,
|
||||||
}
|
}
|
||||||
hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
|
opts := []fs.OpenOption{}
|
||||||
return NewReOpen(context.Background(), src, maxRetries, hashOption, rangeOption)
|
if rangeOption == nil && seekOption == nil {
|
||||||
|
opts = append(opts, &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)})
|
||||||
|
}
|
||||||
|
if rangeOption != nil {
|
||||||
|
opts = append(opts, rangeOption)
|
||||||
|
}
|
||||||
|
if seekOption != nil {
|
||||||
|
opts = append(opts, seekOption)
|
||||||
|
}
|
||||||
|
return NewReOpen(context.Background(), src, maxRetries, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("Basics", func(t *testing.T) {
|
t.Run("Basics", func(t *testing.T) {
|
||||||
|
@ -92,16 +145,25 @@ func TestReOpen(t *testing.T) {
|
||||||
assert.Equal(t, 0, n)
|
assert.Equal(t, 0, n)
|
||||||
assert.Equal(t, io.EOF, err)
|
assert.Equal(t, io.EOF, err)
|
||||||
|
|
||||||
|
// Rewind the stream
|
||||||
|
_, err = h.Seek(0, io.SeekStart)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check contents read correctly
|
||||||
|
got, err = io.ReadAll(h)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, expectedRead, got)
|
||||||
|
|
||||||
// Check close
|
// Check close
|
||||||
assert.NoError(t, h.Close())
|
assert.NoError(t, h.Close())
|
||||||
|
|
||||||
// Check double close
|
// Check double close
|
||||||
assert.Equal(t, errorFileClosed, h.Close())
|
assert.Equal(t, errFileClosed, h.Close())
|
||||||
|
|
||||||
// Check read after close
|
// Check read after close
|
||||||
n, err = h.Read(buf)
|
n, err = h.Read(buf)
|
||||||
assert.Equal(t, 0, n)
|
assert.Equal(t, 0, n)
|
||||||
assert.Equal(t, errorFileClosed, err)
|
assert.Equal(t, errFileClosed, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ErrorAtStart", func(t *testing.T) {
|
t.Run("ErrorAtStart", func(t *testing.T) {
|
||||||
|
@ -139,10 +201,176 @@ func TestReOpen(t *testing.T) {
|
||||||
var buf = make([]byte, 1)
|
var buf = make([]byte, 1)
|
||||||
n, err := h.Read(buf)
|
n, err := h.Read(buf)
|
||||||
assert.Equal(t, 0, n)
|
assert.Equal(t, 0, n)
|
||||||
assert.Equal(t, errorTooManyTries, err)
|
assert.Equal(t, errTooManyTries, err)
|
||||||
|
|
||||||
// Check close
|
// Check close
|
||||||
assert.Equal(t, errorFileClosed, h.Close())
|
assert.Equal(t, errFileClosed, h.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Seek", func(t *testing.T) {
|
||||||
|
// open
|
||||||
|
h, err := testReOpen([]int64{2, 1, 3}, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Seek to end
|
||||||
|
pos, err := h.Seek(int64(len(expectedRead)), io.SeekStart)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(len(expectedRead)), pos)
|
||||||
|
|
||||||
|
// Seek to start
|
||||||
|
pos, err = h.Seek(0, io.SeekStart)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(0), pos)
|
||||||
|
|
||||||
|
// Should not allow seek past end
|
||||||
|
pos, err = h.Seek(int64(len(expectedRead))+1, io.SeekCurrent)
|
||||||
|
if !unknownSize {
|
||||||
|
assert.Equal(t, errSeekPastEnd, err)
|
||||||
|
assert.Equal(t, len(expectedRead), int(pos))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, nil, err)
|
||||||
|
assert.Equal(t, len(expectedRead)+1, int(pos))
|
||||||
|
|
||||||
|
// Seek back to start to get tests in sync
|
||||||
|
pos, err = h.Seek(0, io.SeekStart)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(0), pos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should not allow seek to negative position start
|
||||||
|
pos, err = h.Seek(-1, io.SeekCurrent)
|
||||||
|
assert.Equal(t, errNegativeSeek, err)
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// Should not allow seek with invalid whence
|
||||||
|
pos, err = h.Seek(0, 3)
|
||||||
|
assert.Equal(t, errInvalidWhence, err)
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// check read
|
||||||
|
dst := make([]byte, 5)
|
||||||
|
n, err := h.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, n)
|
||||||
|
assert.Equal(t, expectedRead[:5], dst)
|
||||||
|
|
||||||
|
// Test io.SeekCurrent
|
||||||
|
pos, err = h.Seek(-3, io.SeekCurrent)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 2, int(pos))
|
||||||
|
|
||||||
|
// check read
|
||||||
|
n, err = h.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, n)
|
||||||
|
assert.Equal(t, expectedRead[2:7], dst)
|
||||||
|
|
||||||
|
pos, err = h.Seek(-2, io.SeekCurrent)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, int(pos))
|
||||||
|
|
||||||
|
// Test io.SeekEnd
|
||||||
|
pos, err = h.Seek(-3, io.SeekEnd)
|
||||||
|
if !unknownSize {
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, len(expectedRead)-3, int(pos))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, errBadEndSeek, err)
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// sync
|
||||||
|
pos, err = h.Seek(1, io.SeekCurrent)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 6, int(pos))
|
||||||
|
}
|
||||||
|
|
||||||
|
// check read
|
||||||
|
dst = make([]byte, 3)
|
||||||
|
n, err = h.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 3, n)
|
||||||
|
assert.Equal(t, expectedRead[len(expectedRead)-3:], dst)
|
||||||
|
|
||||||
|
// check close
|
||||||
|
assert.NoError(t, h.Close())
|
||||||
|
_, err = h.Seek(0, io.SeekCurrent)
|
||||||
|
assert.Equal(t, errFileClosed, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("AccountRead", func(t *testing.T) {
|
||||||
|
h, err := testReOpen(nil, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
var total int
|
||||||
|
h.SetAccounting(func(n int) error {
|
||||||
|
total += n
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
dst := make([]byte, 3)
|
||||||
|
n, err := h.Read(dst)
|
||||||
|
assert.Equal(t, 3, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 3, total)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("AccountReadDelay", func(t *testing.T) {
|
||||||
|
h, err := testReOpen(nil, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
var total int
|
||||||
|
h.SetAccounting(func(n int) error {
|
||||||
|
total += n
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
rewind := func() {
|
||||||
|
_, err := h.Seek(0, io.SeekStart)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.DelayAccounting(3)
|
||||||
|
|
||||||
|
dst := make([]byte, 16)
|
||||||
|
|
||||||
|
n, err := h.Read(dst)
|
||||||
|
assert.Equal(t, len(expectedRead), n)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
assert.Equal(t, 0, total)
|
||||||
|
rewind()
|
||||||
|
|
||||||
|
n, err = h.Read(dst)
|
||||||
|
assert.Equal(t, len(expectedRead), n)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
assert.Equal(t, 0, total)
|
||||||
|
rewind()
|
||||||
|
|
||||||
|
n, err = h.Read(dst)
|
||||||
|
assert.Equal(t, len(expectedRead), n)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
assert.Equal(t, len(expectedRead), total)
|
||||||
|
rewind()
|
||||||
|
|
||||||
|
n, err = h.Read(dst)
|
||||||
|
assert.Equal(t, len(expectedRead), n)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
assert.Equal(t, 2*len(expectedRead), total)
|
||||||
|
rewind()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("AccountReadError", func(t *testing.T) {
|
||||||
|
// Test accounting errors
|
||||||
|
h, err := testReOpen(nil, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
h.SetAccounting(func(n int) error {
|
||||||
|
return errorTestError
|
||||||
|
})
|
||||||
|
|
||||||
|
dst := make([]byte, 3)
|
||||||
|
n, err := h.Read(dst)
|
||||||
|
assert.Equal(t, 3, n)
|
||||||
|
assert.Equal(t, errorTestError, err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue