27b281ef69
This converts the ChunkedReader into an interface and provides two implementations, one sequential and one parallel. This can be used to improve the performance of the VFS on high bandwidth or high latency links. Fixes #4760
383 lines
9.7 KiB
Go
383 lines
9.7 KiB
Go
package chunkedreader
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/log"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
"github.com/rclone/rclone/lib/multipart"
|
|
"github.com/rclone/rclone/lib/pool"
|
|
)
|
|
|
|
// parallel reads Object in chunks of a given size in parallel.
|
|
type parallel struct {
|
|
ctx context.Context
|
|
o fs.Object // source to read from
|
|
mu sync.Mutex // protects following fields
|
|
endStream int64 // offset we have started streams for
|
|
offset int64 // offset the read file pointer is at
|
|
chunkSize int64 // length of the chunks to read
|
|
nstreams int // number of streams to use
|
|
streams []*stream // the opened streams in offset order - the current one is first
|
|
closed bool // has Close been called?
|
|
}
|
|
|
|
// stream holds the info about a single download
|
|
type stream struct {
|
|
cr *parallel // parent reader
|
|
ctx context.Context // ctx to cancel if needed
|
|
cancel func() // cancel the stream
|
|
rc io.ReadCloser // reader that it is reading from, may be nil
|
|
offset int64 // where the stream is reading from
|
|
size int64 // and the size it is reading
|
|
readBytes int64 // bytes read from the stream
|
|
rw *pool.RW // buffer for read
|
|
err chan error // error returned from the read
|
|
name string // name of this stream for debugging
|
|
}
|
|
|
|
// Start a stream reading (offset, offset+size)
|
|
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
// Create the stream
|
|
rw := multipart.NewRW()
|
|
s = &stream{
|
|
cr: cr,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
offset: offset,
|
|
size: size,
|
|
rw: rw,
|
|
err: make(chan error, 1),
|
|
}
|
|
s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s)
|
|
|
|
// Start the background read into the buffer
|
|
go s.readFrom(ctx)
|
|
|
|
// Return the stream to the caller
|
|
return s, nil
|
|
}
|
|
|
|
// read the file into the buffer
|
|
func (s *stream) readFrom(ctx context.Context) {
|
|
// Open the object at the correct range
|
|
fs.Debugf(s.cr.o, "%s: open", s.name)
|
|
rc, err := operations.Open(ctx, s.cr.o,
|
|
&fs.HashesOption{Hashes: hash.Set(hash.None)},
|
|
&fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1})
|
|
if err != nil {
|
|
s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
|
|
return
|
|
}
|
|
s.rc = rc
|
|
|
|
fs.Debugf(s.cr.o, "%s: readfrom started", s.name)
|
|
_, err = s.rw.ReadFrom(s.rc)
|
|
fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err)
|
|
s.err <- err
|
|
}
|
|
|
|
// eof is true when we've read all the data we are expecting
|
|
func (s *stream) eof() bool {
|
|
return s.readBytes >= s.size
|
|
}
|
|
|
|
// read reads up to len(p) bytes into p. It returns the number of
|
|
// bytes read (0 <= n <= len(p)) and any error encountered. If some
|
|
// data is available but not len(p) bytes, read returns what is
|
|
// available instead of waiting for more.
|
|
func (s *stream) read(p []byte) (n int, err error) {
|
|
defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err)
|
|
if len(p) == 0 {
|
|
return n, nil
|
|
}
|
|
for {
|
|
var nn int
|
|
nn, err = s.rw.Read(p[n:])
|
|
fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err)
|
|
s.readBytes += int64(nn)
|
|
n += nn
|
|
if err != nil && err != io.EOF {
|
|
return n, err
|
|
}
|
|
if s.eof() {
|
|
return n, io.EOF
|
|
}
|
|
// Received a faux io.EOF because we haven't read all the data yet
|
|
if n >= len(p) {
|
|
break
|
|
}
|
|
// Wait for a write to happen to read more
|
|
s.rw.WaitWrite(s.ctx)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// orErr sets *perr to newErr if *perr is nil, so that the first error
// seen is the one which is kept.
//
// A nil perr is ignored so this can never panic in a cleanup path.
func orErr(perr *error, newErr error) {
	if perr == nil {
		return
	}
	if *perr == nil {
		*perr = newErr
	}
}
|
|
|
|
// Close a stream
|
|
func (s *stream) close() (err error) {
|
|
defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err)
|
|
s.cancel()
|
|
err = <-s.err // wait for readFrom to stop and return error
|
|
orErr(&err, s.rw.Close())
|
|
if s.rc != nil {
|
|
orErr(&err, s.rc.Close())
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Make a new parallel chunked reader
|
|
//
|
|
// Mustn't be called for an unknown size object
|
|
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
|
|
// Make sure chunkSize is a multiple of multipart.BufferSize
|
|
if chunkSize < 0 {
|
|
chunkSize = multipart.BufferSize
|
|
}
|
|
newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
|
|
if newChunkSize < chunkSize {
|
|
newChunkSize += multipart.BufferSize
|
|
}
|
|
|
|
fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)
|
|
|
|
return ¶llel{
|
|
ctx: ctx,
|
|
o: o,
|
|
offset: 0,
|
|
chunkSize: newChunkSize,
|
|
nstreams: streams,
|
|
}
|
|
}
|
|
|
|
// _open starts the file transferring at offset
|
|
//
|
|
// Call with the lock held
|
|
func (cr *parallel) _open() (err error) {
|
|
size := cr.o.Size()
|
|
if size < 0 {
|
|
return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
|
|
}
|
|
// Launched enough streams already
|
|
if cr.endStream >= size {
|
|
return nil
|
|
}
|
|
|
|
// Make sure cr.nstreams are running
|
|
for i := len(cr.streams); i < cr.nstreams; i++ {
|
|
// clip to length of file
|
|
chunkSize := cr.chunkSize
|
|
newEndStream := cr.endStream + chunkSize
|
|
if newEndStream > size {
|
|
chunkSize = size - cr.endStream
|
|
newEndStream = cr.endStream + chunkSize
|
|
}
|
|
|
|
s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cr.streams = append(cr.streams, s)
|
|
cr.endStream = newEndStream
|
|
|
|
if cr.endStream >= size {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Finished reading the current stream so pop it off and destroy it
|
|
//
|
|
// Call with lock held
|
|
func (cr *parallel) _popStream() (err error) {
|
|
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
|
|
if len(cr.streams) == 0 {
|
|
return nil
|
|
}
|
|
stream := cr.streams[0]
|
|
err = stream.close()
|
|
cr.streams[0] = nil
|
|
cr.streams = cr.streams[1:]
|
|
return err
|
|
}
|
|
|
|
// Get rid of all the streams
|
|
//
|
|
// Call with lock held
|
|
func (cr *parallel) _popStreams() (err error) {
|
|
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
|
|
for len(cr.streams) > 0 {
|
|
orErr(&err, cr._popStream())
|
|
}
|
|
cr.streams = nil
|
|
return err
|
|
}
|
|
|
|
// Read from the file - for details see io.Reader
|
|
func (cr *parallel) Read(p []byte) (n int, err error) {
|
|
defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
|
|
cr.mu.Lock()
|
|
defer cr.mu.Unlock()
|
|
|
|
if cr.closed {
|
|
return 0, ErrorFileClosed
|
|
}
|
|
|
|
for n < len(p) {
|
|
// Make sure we have the correct number of streams open
|
|
err = cr._open()
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
|
|
// No streams left means EOF
|
|
if len(cr.streams) == 0 {
|
|
return n, io.EOF
|
|
}
|
|
|
|
// Read from the stream
|
|
stream := cr.streams[0]
|
|
nn, err := stream.read(p[n:])
|
|
n += nn
|
|
cr.offset += int64(nn)
|
|
if err == io.EOF {
|
|
err = cr._popStream()
|
|
if err != nil {
|
|
break
|
|
}
|
|
} else if err != nil {
|
|
break
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Close the file - for details see io.Closer
|
|
//
|
|
// All methods on ChunkedReader will return ErrorFileClosed afterwards
|
|
func (cr *parallel) Close() error {
|
|
cr.mu.Lock()
|
|
defer cr.mu.Unlock()
|
|
|
|
if cr.closed {
|
|
return ErrorFileClosed
|
|
}
|
|
cr.closed = true
|
|
|
|
// Close all the streams
|
|
return cr._popStreams()
|
|
}
|
|
|
|
// Seek the file - for details see io.Seeker
|
|
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
|
|
cr.mu.Lock()
|
|
defer cr.mu.Unlock()
|
|
|
|
fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)
|
|
|
|
if cr.closed {
|
|
return 0, ErrorFileClosed
|
|
}
|
|
|
|
size := cr.o.Size()
|
|
currentOffset := cr.offset
|
|
switch whence {
|
|
case io.SeekStart:
|
|
currentOffset = 0
|
|
case io.SeekEnd:
|
|
currentOffset = size
|
|
}
|
|
// set the new chunk start
|
|
newOffset := currentOffset + offset
|
|
if newOffset < 0 || newOffset >= size {
|
|
return 0, ErrorInvalidSeek
|
|
}
|
|
|
|
// If seek pointer didn't move, return now
|
|
if newOffset == cr.offset {
|
|
fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
|
|
return cr.offset, nil
|
|
}
|
|
|
|
cr.offset = newOffset
|
|
|
|
// Ditch out of range streams
|
|
for len(cr.streams) > 0 {
|
|
stream := cr.streams[0]
|
|
if newOffset >= stream.offset+stream.size {
|
|
_ = cr._popStream()
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
// If no streams remain we can just restart
|
|
if len(cr.streams) == 0 {
|
|
fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
|
|
cr.endStream = cr.offset
|
|
return cr.offset, nil
|
|
}
|
|
|
|
// Current stream
|
|
stream := cr.streams[0]
|
|
|
|
// If new offset is before current stream then ditch all the streams
|
|
if newOffset < stream.offset {
|
|
_ = cr._popStreams()
|
|
fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
|
|
cr.endStream = cr.offset
|
|
return cr.offset, nil
|
|
}
|
|
|
|
// Seek the current stream
|
|
streamOffset := newOffset - stream.offset
|
|
stream.readBytes = streamOffset // correct read value
|
|
fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
|
|
// Wait for the read to the correct part of the data
|
|
for stream.rw.Size() < streamOffset {
|
|
stream.rw.WaitWrite(cr.ctx)
|
|
}
|
|
_, err := stream.rw.Seek(streamOffset, io.SeekStart)
|
|
if err != nil {
|
|
return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
|
|
}
|
|
|
|
return cr.offset, nil
|
|
}
|
|
|
|
// RangeSeek the file - for details see RangeSeeker
|
|
//
|
|
// In the parallel chunked reader this just acts like Seek
|
|
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
|
|
return cr.Seek(offset, whence)
|
|
}
|
|
|
|
// Open forces the connection to be opened
|
|
func (cr *parallel) Open() (ChunkedReader, error) {
|
|
cr.mu.Lock()
|
|
defer cr.mu.Unlock()
|
|
|
|
return cr, cr._open()
|
|
}
|
|
|
|
var (
|
|
_ ChunkedReader = (*parallel)(nil)
|
|
)
|