diff --git a/fs/buffer.go b/fs/buffer.go new file mode 100644 index 000000000..be22e349a --- /dev/null +++ b/fs/buffer.go @@ -0,0 +1,199 @@ +package fs + +import ( + "fmt" + "io" +) + +// asyncReader will do async read-ahead from the input reader +// and make the data available as an io.Reader. +// This should be fully transparent, except that once an error +// has been returned from the Reader, it will not recover. +type asyncReader struct { + in io.ReadCloser // Input reader + ready chan *buffer // Buffers ready to be handed to the reader + reuse chan *buffer // Buffers to reuse for input reading + exit chan struct{} // Closes when finished + buffers int // Number of buffers + err error // If an error has occurred it is here + cur *buffer // Current buffer being served + exited chan struct{} // Channel is closed been the async reader shuts down + closed bool // Has the parent reader been closed? +} + +// newAsyncReader returns a reader that will asynchronously read from +// the supplied Reader into a number of buffers each with a given size. +// It will start reading from the input at once, maybe even before this +// function has returned. +// The input can be read from the returned reader. +// When done use Close to release the buffers and close the supplied input. +func newAsyncReader(rd io.ReadCloser, buffers, size int) (io.ReadCloser, error) { + if size <= 0 { + return nil, fmt.Errorf("buffer size too small") + } + if buffers <= 0 { + return nil, fmt.Errorf("number of buffers too small") + } + if rd == nil { + return nil, fmt.Errorf("nil reader supplied") + } + a := &asyncReader{} + a.init(rd, buffers, size) + return a, nil +} + +func (a *asyncReader) init(rd io.ReadCloser, buffers, size int) { + a.in = rd + a.ready = make(chan *buffer, buffers) + a.reuse = make(chan *buffer, buffers) + a.exit = make(chan struct{}, 0) + a.exited = make(chan struct{}, 0) + a.buffers = buffers + a.cur = nil + + // Create buffers + for i := 0; i < buffers; i++ { + a.reuse <- newBuffer(size) + } + + // Start async reader + go func() { + // Ensure that when we exit this is signalled. + defer close(a.exited) + for { + select { + case b := <-a.reuse: + err := b.read(a.in) + a.ready <- b + if err != nil { + close(a.ready) + return + } + case <-a.exit: + return + } + } + }() +} + +// Read will return the next available data. +func (a *asyncReader) fill() (err error) { + if a.cur.isEmpty() { + if a.cur != nil { + a.reuse <- a.cur + a.cur = nil + } + b, ok := <-a.ready + if !ok { + return a.err + } + a.cur = b + } + return nil +} + +// Read will return the next available data. +func (a *asyncReader) Read(p []byte) (n int, err error) { + // Swap buffer and maybe return error + err = a.fill() + if err != nil { + return 0, err + } + + // Copy what we can + n = copy(p, a.cur.buffer()) + a.cur.increment(n) + + // If at end of buffer, return any error, if present + if a.cur.isEmpty() { + a.err = a.cur.err + return n, a.err + } + return n, nil +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. +// Any error encountered during the write is also returned. +func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) { + n = 0 + for { + err = a.fill() + if err != nil { + return n, err + } + n2, err := w.Write(a.cur.buffer()) + a.cur.increment(n2) + n += int64(n2) + if err != nil { + return n, err + } + if a.cur.err != nil { + a.err = a.cur.err + return n, a.cur.err + } + } +} + +// Close will ensure that the underlying async reader is shut down. +// It will also close the input supplied on newAsyncReader. +func (a *asyncReader) Close() (err error) { + select { + case <-a.exited: + default: + close(a.exit) + <-a.exited + } + if !a.closed { + a.closed = true + return a.in.Close() + } + return nil +} + +// Internal buffer +// If an error is present, it must be returned +// once all buffer content has been served. +type buffer struct { + buf []byte + err error + offset int + size int +} + +func newBuffer(size int) *buffer { + return &buffer{buf: make([]byte, size), err: nil, size: size} +} + +// isEmpty returns true is offset is at end of +// buffer, or +func (b *buffer) isEmpty() bool { + if b == nil { + return true + } + if len(b.buf)-b.offset <= 0 { + return true + } + return false +} + +// read into start of the buffer from the supplied reader, +// resets the offset and updates the size of the buffer. +// Any error encountered during the read is returned. +func (b *buffer) read(rd io.Reader) error { + var n int + n, b.err = rd.Read(b.buf[0:b.size]) + b.buf = b.buf[0:n] + b.offset = 0 + return b.err +} + +// Return the buffer at current offset +func (b *buffer) buffer() []byte { + return b.buf[b.offset:] +} + +// increment the offset +func (b *buffer) increment(n int) { + b.offset += n +} diff --git a/fs/buffer_test.go b/fs/buffer_test.go new file mode 100644 index 000000000..be4a286c0 --- /dev/null +++ b/fs/buffer_test.go @@ -0,0 +1,268 @@ +package fs + +import ( + "bufio" + "bytes" + "io" + "io/ioutil" + "strings" + "testing" + "testing/iotest" +) + +func TestAsyncReader(t *testing.T) { + buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer")) + ar, err := newAsyncReader(buf, 4, 10000) + if err != nil { + t.Fatal("error when creating:", err) + } + + var dst = make([]byte, 100) + n, err := ar.Read(dst) + if err != nil { + t.Fatal("error when reading:", err) + } + if n != 10 { + t.Fatal("unexpected length, expected 10, got ", n) + } + + n, err = ar.Read(dst) + if err != io.EOF { + t.Fatal("expected io.EOF, got", err) + } + if n != 0 { + t.Fatal("unexpected length, expected 0, got ", n) + } + + // Test read after error + n, err = ar.Read(dst) + if err != io.EOF { + t.Fatal("expected io.EOF, got", err) + } + if n != 0 { + t.Fatal("unexpected length, expected 0, got ", n) + } + + err = ar.Close() + if err != nil { + t.Fatal("error when closing:", err) + } + // Test double close + err = ar.Close() + if err != nil { + t.Fatal("error when closing:", err) + } + + // Test Close without reading everything + buf = ioutil.NopCloser(bytes.NewBuffer(make([]byte, 50000))) + ar, err = newAsyncReader(buf, 4, 100) + if err != nil { + t.Fatal("error when creating:", err) + } + err = ar.Close() + if err != nil { + t.Fatal("error when closing, noread:", err) + } + +} + +func TestAsyncWriteTo(t *testing.T) { + buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer")) + ar, err := newAsyncReader(buf, 4, 10000) + if err != nil { + t.Fatal("error when creating:", err) + } + + var dst = &bytes.Buffer{} + n, err := io.Copy(dst, ar) + if err != io.EOF { + t.Fatal("error when reading:", err) + } + if n != 10 { + t.Fatal("unexpected length, expected 10, got ", n) + } + + // Should still return EOF + n, err = io.Copy(dst, ar) + if err != io.EOF { + t.Fatal("expected io.EOF, got", err) + } + if n != 0 { + t.Fatal("unexpected length, expected 0, got ", n) + } + + err = ar.Close() + if err != nil { + t.Fatal("error when closing:", err) + } +} + +func TestAsyncReaderErrors(t *testing.T) { + // test nil reader + _, err := newAsyncReader(nil, 4, 10000) + if err == nil { + t.Fatal("expected error when creating, but got nil") + } + + // invalid buffer number + buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer")) + _, err = newAsyncReader(buf, 0, 10000) + if err == nil { + t.Fatal("expected error when creating, but got nil") + } + _, err = newAsyncReader(buf, -1, 10000) + if err == nil { + t.Fatal("expected error when creating, but got nil") + } + + // invalid buffer size + _, err = newAsyncReader(buf, 4, 0) + if err == nil { + t.Fatal("expected error when creating, but got nil") + } + _, err = newAsyncReader(buf, 4, -1) + if err == nil { + t.Fatal("expected error when creating, but got nil") + } +} + +// Complex read tests, leveraged from "bufio". + +type readMaker struct { + name string + fn func(io.Reader) io.Reader +} + +var readMakers = []readMaker{ + {"full", func(r io.Reader) io.Reader { return r }}, + {"byte", iotest.OneByteReader}, + {"half", iotest.HalfReader}, + {"data+err", iotest.DataErrReader}, + {"timeout", iotest.TimeoutReader}, +} + +// Call Read to accumulate the text of a file +func reads(buf io.Reader, m int) string { + var b [1000]byte + nb := 0 + for { + n, err := buf.Read(b[nb : nb+m]) + nb += n + if err == io.EOF { + break + } else if err != nil && err != iotest.ErrTimeout { + panic("Data: " + err.Error()) + } else if err != nil { + break + } + } + return string(b[0:nb]) +} + +type bufReader struct { + name string + fn func(io.Reader) string +} + +var bufreaders = []bufReader{ + {"1", func(b io.Reader) string { return reads(b, 1) }}, + {"2", func(b io.Reader) string { return reads(b, 2) }}, + {"3", func(b io.Reader) string { return reads(b, 3) }}, + {"4", func(b io.Reader) string { return reads(b, 4) }}, + {"5", func(b io.Reader) string { return reads(b, 5) }}, + {"7", func(b io.Reader) string { return reads(b, 7) }}, +} + +const minReadBufferSize = 16 + +var bufsizes = []int{ + 0, minReadBufferSize, 23, 32, 46, 64, 93, 128, 1024, 4096, +} + +// Test various input buffer sizes, number of buffers and read sizes. +func TestAsyncReaderSizes(t *testing.T) { + var texts [31]string + str := "" + all := "" + for i := 0; i < len(texts)-1; i++ { + texts[i] = str + "\n" + all += texts[i] + str += string(i%26 + 'a') + } + texts[len(texts)-1] = all + + for h := 0; h < len(texts); h++ { + text := texts[h] + for i := 0; i < len(readMakers); i++ { + for j := 0; j < len(bufreaders); j++ { + for k := 0; k < len(bufsizes); k++ { + for l := 1; l < 10; l++ { + readmaker := readMakers[i] + bufreader := bufreaders[j] + bufsize := bufsizes[k] + read := readmaker.fn(strings.NewReader(text)) + buf := bufio.NewReaderSize(read, bufsize) + ar, _ := newAsyncReader(ioutil.NopCloser(buf), l, 100) + s := bufreader.fn(ar) + // "timeout" expects the Reader to recover, asyncReader does not. + if s != text && readmaker.name != "timeout" { + t.Errorf("reader=%s fn=%s bufsize=%d want=%q got=%q", + readmaker.name, bufreader.name, bufsize, text, s) + } + err := ar.Close() + if err != nil { + t.Fatal("Unexpected close error:", err) + } + } + } + } + } + } +} + +// Test various input buffer sizes, number of buffers and read sizes. +func TestAsyncReaderWriteTo(t *testing.T) { + var texts [31]string + str := "" + all := "" + for i := 0; i < len(texts)-1; i++ { + texts[i] = str + "\n" + all += texts[i] + str += string(i%26 + 'a') + } + texts[len(texts)-1] = all + + for h := 0; h < len(texts); h++ { + text := texts[h] + for i := 0; i < len(readMakers); i++ { + for j := 0; j < len(bufreaders); j++ { + for k := 0; k < len(bufsizes); k++ { + for l := 1; l < 10; l++ { + readmaker := readMakers[i] + bufreader := bufreaders[j] + bufsize := bufsizes[k] + read := readmaker.fn(strings.NewReader(text)) + buf := bufio.NewReaderSize(read, bufsize) + ar, _ := newAsyncReader(ioutil.NopCloser(buf), l, 100) + dst := &bytes.Buffer{} + wt := ar.(io.WriterTo) + _, err := wt.WriteTo(dst) + if err != nil && err != io.EOF && err != iotest.ErrTimeout { + t.Fatal("Copy:", err) + } + s := dst.String() + // "timeout" expects the Reader to recover, asyncReader does not. + if s != text && readmaker.name != "timeout" { + t.Errorf("reader=%s fn=%s bufsize=%d want=%q got=%q", + readmaker.name, bufreader.name, bufsize, text, s) + } + err = ar.Close() + if err != nil { + t.Fatal("Unexpected close error:", err) + } + } + } + } + } + } +} diff --git a/fs/operations.go b/fs/operations.go index ffd1b1949..7b8fc86bb 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -198,6 +198,12 @@ tryAgain: ErrorLog(src, "Failed to open: %s", err) return } + + // On big files add a buffer + if src.Size() > 10<<20 { + in0, _ = newAsyncReader(in0, 4, 4<<20) + } + in := NewAccount(in0, src) // account the transfer if doUpdate {