fs: improve ChunkedReader

- make Close permanent and return errors afterwards
- use RangeSeek from the wrapped reader if present
- add a limit to chunk growth
- correct RangeSeek interface behavior
- add tests
This commit is contained in:
Fabian Möller 2018-02-18 15:10:15 +01:00 committed by Nick Craig-Wood
parent fe25cb9c54
commit 9fdf273614
3 changed files with 320 additions and 29 deletions

View file

@ -1,43 +1,55 @@
package chunkedreader
import (
"errors"
"io"
"sync"
"github.com/ncw/rclone/fs"
)
// io related errors returned by ChunkedReader
var (
ErrorFileClosed = errors.New("file already closed")
ErrorInvalidSeek = errors.New("invalid seek position")
)
// ChunkedReader is a reader for a Object with the possibility
// of reading the source in chunks of given size
//
// A initialChunkSize of 0 will disable chunked reading.
// A initialChunkSize of <= 0 will disable chunked reading.
type ChunkedReader struct {
mu sync.Mutex
o fs.Object
rc io.ReadCloser
offset int64
chunkOffset int64
chunkSize int64
initialChunkSize int64
chunkGrowth bool
doSeek bool
mu sync.Mutex // protects following fields
o fs.Object // source to read from
rc io.ReadCloser // reader for the current open chunk
offset int64 // offset the next Read will start. -1 forces a reopen of o
chunkOffset int64 // beginning of the current or next chunk
chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit
customChunkSize bool // is the current chunkSize set by RangeSeek?
closed bool // has Close been called?
}
// New returns a ChunkedReader for the Object.
//
// A initialChunkSize of 0 will disable chunked reading.
// If chunkGrowth is true, the chunk size will be doubled after each chunk read.
// A initialChunkSize of <= 0 will disable chunked reading.
// If maxChunkSize is greater than initialChunkSize, the chunk size will be
// doubled after each chunk read with a maximun of maxChunkSize.
// A Seek or RangeSeek will reset the chunk size to it's initial value
func New(o fs.Object, initialChunkSize int64, chunkGrowth bool) *ChunkedReader {
if initialChunkSize < 0 {
initialChunkSize = 0
func New(o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader {
if initialChunkSize <= 0 {
initialChunkSize = -1
}
if maxChunkSize != -1 && maxChunkSize < initialChunkSize {
maxChunkSize = initialChunkSize
}
return &ChunkedReader{
o: o,
offset: -1,
chunkSize: initialChunkSize,
initialChunkSize: initialChunkSize,
chunkGrowth: chunkGrowth,
maxChunkSize: maxChunkSize,
}
}
@ -46,21 +58,32 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) {
// the current chunk boundary. valid only when chunkSize > 0
chunkEnd := cr.chunkOffset + cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize)
if atChunkEnd := cr.offset == chunkEnd; cr.offset == -1 || atChunkEnd {
if atChunkEnd && cr.chunkSize > 0 {
if cr.doSeek {
cr.doSeek = false
cr.chunkSize = cr.initialChunkSize
} else if cr.chunkGrowth {
cr.chunkSize *= 2
switch {
case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely
cr.chunkOffset = cr.offset
if cr.customChunkSize { // last chunkSize was set by RangeSeek
cr.customChunkSize = false
cr.chunkSize = cr.initialChunkSize
} else {
cr.chunkSize *= 2
if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 {
cr.chunkSize = cr.maxChunkSize
}
cr.chunkOffset = cr.offset
}
// recalculate the chunk boundary. valid only when chunkSize > 0
chunkEnd = cr.chunkOffset + cr.chunkSize
fallthrough
case cr.offset == -1: // first Read or Read after RangeSeek
err = cr.openRange()
if err != nil {
return
@ -69,7 +92,8 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
var buf []byte
chunkRest := chunkEnd - cr.offset
if reqSize > chunkRest && cr.chunkSize != 0 {
// limit read to chunk boundaries if chunkSize > 0
if reqSize > chunkRest && cr.chunkSize > 0 {
buf, p = p[0:chunkRest], p[chunkRest:]
} else {
buf, p = p, nil
@ -79,6 +103,9 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
n += rn
cr.offset += int64(rn)
if err != nil {
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return
}
}
@ -86,10 +113,17 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *ChunkedReader) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
return cr.resetReader(nil, 0)
}
@ -99,11 +133,18 @@ func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) {
}
// RangeSeek the file - for details see RangeSeeker
//
// The specified length will only apply to the next chunk opened.
// RangeSeek will not reopen the source until Read is called.
func (cr *ChunkedReader) RangeSeek(offset int64, whence int, length int64) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d", cr.offset, offset)
fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
switch whence {
@ -112,15 +153,21 @@ func (cr *ChunkedReader) RangeSeek(offset int64, whence int, length int64) (int6
case io.SeekEnd:
cr.offset = size
}
// set the new chunk start
cr.chunkOffset = cr.offset + offset
// force reopen on next Read
cr.offset = -1
cr.doSeek = true
if length > 0 {
cr.customChunkSize = true
cr.chunkSize = length
} else {
cr.chunkSize = cr.initialChunkSize
}
return cr.offset, nil
if cr.chunkOffset < 0 || cr.chunkOffset >= size {
cr.chunkOffset = 0
return 0, ErrorInvalidSeek
}
return cr.chunkOffset, nil
}
// Open forces the connection to be opened
@ -128,15 +175,39 @@ func (cr *ChunkedReader) Open() (*ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.rc != nil && cr.offset != -1 {
return cr, nil
}
return cr, cr.openRange()
}
// openRange will open the source Object with the given range
// openRange will open the source Object with the current chunk range
//
// If the current open reader implenets RangeSeeker, it is tried first.
// When RangeSeek failes, o.Open with a RangeOption is used.
//
// A length <= 0 will request till the end of the file
func (cr *ChunkedReader) openRange() error {
offset, length := cr.chunkOffset, cr.chunkSize
fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)
if cr.closed {
return ErrorFileClosed
}
if rs, ok := cr.rc.(fs.RangeSeeker); ok {
n, err := rs.RangeSeek(offset, io.SeekStart, length)
if err == nil && n == offset {
cr.offset = offset
return nil
}
if err != nil {
fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
} else {
fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
}
}
var rc io.ReadCloser
var err error
if length <= 0 {