vfs: try to seek buffer on read only files

Fabian Möller 2018-08-11 10:18:19 +02:00 committed by Nick Craig-Wood
parent 7d35b14138
commit 552eb8e06b
4 changed files with 167 additions and 1 deletion
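When reopening is not forced, ReadFileHandle.seek (last hunk below) now first asks the async read-ahead buffer to skip or rewind the byte difference via the new AsyncReader.SkipBytes, and only stops buffering and reopens the file when the skip cannot be served from buffered data. The following is a minimal sketch of that idea, not part of the commit: it drives AsyncReader directly, assumes the package import path github.com/ncw/rclone/lib/asyncreader, and its sizes and main() wrapper are illustration only.

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"

	"github.com/ncw/rclone/lib/asyncreader" // assumed import path for this package
)

func main() {
	data := bytes.Repeat([]byte("0123456789"), 1000) // 10 kB of known content
	// Wrap an in-memory reader in the async read-ahead buffer (4 buffers).
	ar, err := asyncreader.New(ioutil.NopCloser(bytes.NewReader(data)), 4)
	if err != nil {
		panic(err)
	}

	buf := make([]byte, 100)
	_, _ = io.ReadFull(ar, buf) // read the first 100 bytes; error handling elided

	// Seek forward 500 bytes by discarding buffered data instead of reopening.
	if ar.SkipBytes(500) {
		_, _ = io.ReadFull(ar, buf)
		fmt.Printf("read resumes at offset 600: %q\n", buf[:10])
	} else {
		// The skip fell outside the buffered window; a caller like the VFS
		// seek path in the last hunk would stop buffering and reopen instead.
		fmt.Println("skip not possible from buffer, reopen needed")
	}
}

Forward skips can consume data the background reader has already buffered; anything beyond that window returns false, which in the VFS simply means falling back to the previous reopen path.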

@@ -96,6 +96,16 @@ func (acc *Account) GetReader() io.ReadCloser {
	return acc.origIn
}

// GetAsyncReader returns the current AsyncReader or nil if Account is unbuffered
func (acc *Account) GetAsyncReader() *asyncreader.AsyncReader {
	acc.mu.Lock()
	defer acc.mu.Unlock()
	if asyncIn, ok := acc.in.(*asyncreader.AsyncReader); ok {
		return asyncIn
	}
	return nil
}

// StopBuffering stops the async buffer doing any more buffering
func (acc *Account) StopBuffering() {
	if asyncIn, ok := acc.in.(*asyncreader.AsyncReader); ok {

@@ -180,6 +180,76 @@ func (a *AsyncReader) WriteTo(w io.Writer) (n int64, err error) {
	}
}

// SkipBytes will try to seek 'skip' bytes relative to the current position.
// On success it returns true. If 'skip' is outside the current buffer data or
// an error occurs, Abandon is called and false is returned.
func (a *AsyncReader) SkipBytes(skip int) (ok bool) {
	a.mu.Lock()
	defer func() {
		a.mu.Unlock()
		if !ok {
			a.Abandon()
		}
	}()

	if a.err != nil {
		return false
	}

	if skip < 0 {
		// seek backwards if skip is inside current buffer
		if a.cur != nil && a.cur.offset+skip >= 0 {
			a.cur.offset += skip
			return true
		}
		return false
	}

	// early return if skip is past the maximum buffer capacity
	if skip >= (len(a.ready)+1)*BufferSize {
		return false
	}

	refillTokens := 0
	for {
		if a.cur.isEmpty() {
			if a.cur != nil {
				a.putBuffer(a.cur)
				refillTokens++
				a.cur = nil
			}

			select {
			case b, ok := <-a.ready:
				if !ok {
					return false
				}
				a.cur = b
			default:
				return false
			}
		}

		n := len(a.cur.buffer())
		if n > skip {
			n = skip
		}
		a.cur.increment(n)
		skip -= n
		if skip == 0 {
			for ; refillTokens > 0; refillTokens-- {
				a.token <- struct{}{}
			}

			// If at end of buffer, store any error, if present
			if a.cur.isEmpty() && a.cur.err != nil {
				a.err = a.cur.err
			}

			return true
		}
		if a.cur.err != nil {
			a.err = a.cur.err
			return false
		}
	}
}

// Abandon will ensure that the underlying async reader is shut down.
// It will NOT close the input supplied on New.
func (a *AsyncReader) Abandon() {

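Two details of SkipBytes above matter to callers and are what the new test below exercises: a backwards skip is only honoured while it stays inside the buffer currently being consumed, and a forward skip at or beyond (len(a.ready)+1)*BufferSize is rejected up front; any failed skip runs Abandon, so the caller must reopen rather than keep reading. A short sketch under the same assumed import path as above, with illustrative sizes only:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"

	"github.com/ncw/rclone/lib/asyncreader" // assumed import path
)

func main() {
	data := make([]byte, 8*1024*1024) // 8 MiB of zeros
	ar, err := asyncreader.New(ioutil.NopCloser(bytes.NewReader(data)), 4)
	if err != nil {
		panic(err)
	}

	buf := make([]byte, 4096)
	_, _ = io.ReadFull(ar, buf) // consume some data; error handling elided

	// Rewinding 100 bytes stays inside the buffer being consumed, so it works.
	fmt.Println("skip -100:", ar.SkipBytes(-100)) // expected: true

	// 32 MiB is far more than the read-ahead buffers can hold, so the skip
	// fails; the deferred Abandon shuts the stream down and later reads error.
	fmt.Println("skip 32 MiB:", ar.SkipBytes(32*1024*1024)) // expected: false
	_, err = ar.Read(buf)
	fmt.Println("read after failed skip:", err)
}
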
@@ -3,14 +3,17 @@ package asyncreader
import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"strings"
	"sync"
	"testing"
	"testing/iotest"
	"time"

	"github.com/ncw/rclone/lib/readers"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
@@ -281,3 +284,78 @@ func testAsyncReaderClose(t *testing.T, writeto bool) {
}

func TestAsyncReaderCloseRead(t *testing.T) { testAsyncReaderClose(t, false) }
func TestAsyncReaderCloseWriteTo(t *testing.T) { testAsyncReaderClose(t, true) }

func TestAsyncReaderSkipBytes(t *testing.T) {
	t.Parallel()
	data := make([]byte, 15000)
	buf := make([]byte, len(data))
	r := rand.New(rand.NewSource(42))
	n, err := r.Read(data)
	require.NoError(t, err)
	require.Equal(t, len(data), n)

	initialReads := []int{0, 1, 100, 2048,
		softStartInitial - 1, softStartInitial, softStartInitial + 1,
		8000, len(data)}
	skips := []int{-1000, -101, -100, -99, 0, 1, 2048,
		softStartInitial - 1, softStartInitial, softStartInitial + 1,
		8000, len(data), BufferSize, 2 * BufferSize}

	for buffers := 1; buffers <= 5; buffers++ {
		t.Run(fmt.Sprintf("%d", buffers), func(t *testing.T) {
			for _, initialRead := range initialReads {
				t.Run(fmt.Sprintf("%d", initialRead), func(t *testing.T) {
					for _, skip := range skips {
						t.Run(fmt.Sprintf("%d", skip), func(t *testing.T) {
							ar, err := New(ioutil.NopCloser(bytes.NewReader(data)), buffers)
							require.NoError(t, err)

							wantSkipFalse := false
							buf = buf[:initialRead]
							n, err := readers.ReadFill(ar, buf)
							if initialRead >= len(data) {
								wantSkipFalse = true
								if initialRead > len(data) {
									assert.Equal(t, err, io.EOF)
								} else {
									assert.True(t, err == nil || err == io.EOF)
								}
								assert.Equal(t, len(data), n)
								assert.Equal(t, data, buf[:len(data)])
							} else {
								assert.NoError(t, err)
								assert.Equal(t, initialRead, n)
								assert.Equal(t, data[:initialRead], buf)
							}

							skipped := ar.SkipBytes(skip)
							buf = buf[:1024]
							n, err = readers.ReadFill(ar, buf)
							offset := initialRead + skip
							if skipped {
								assert.False(t, wantSkipFalse)
								l := len(buf)
								if offset >= len(data) {
									assert.Equal(t, err, io.EOF)
								} else {
									if offset+1024 >= len(data) {
										l = len(data) - offset
									}
									assert.Equal(t, l, n)
									assert.Equal(t, data[offset:offset+l], buf[:l])
								}
							} else {
								if initialRead >= len(data) {
									assert.Equal(t, err, io.EOF)
								} else {
									assert.True(t, err == errorStreamAbandoned || err == io.EOF)
								}
							}
						})
					}
				})
			}
		})
	}
}

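Assuming the package directory is lib/asyncreader, as in the sketches above, the new test can be run on its own with:

go test -v -run TestAsyncReaderSkipBytes ./lib/asyncreader

which covers 1 to 5 buffers against the full matrix of initial read sizes and skip offsets.
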
@@ -104,8 +104,16 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
	if fh.noSeek {
		return ESPIPE
	}
	fh.r.StopBuffering() // stop the background reading first
	fh.hash = nil
	if !reopen {
		ar := fh.r.GetAsyncReader()
		// try to fulfill the seek with buffer discard
		if ar != nil && ar.SkipBytes(int(offset-fh.offset)) {
			fh.offset = offset
			return nil
		}
	}
	fh.r.StopBuffering() // stop the background reading first
	oldReader := fh.r.GetReader()
	r, ok := oldReader.(*chunkedreader.ChunkedReader)
	if !ok {