forked from TrueCloudLab/rclone
11da2a6c9b
The purpose of this is to make it easier to maintain and eventually to allow the rclone backends to be re-used in other projects without having to use the rclone configuration system. The new code layout is documented in CONTRIBUTING.
283 lines
6.5 KiB
Go
283 lines
6.5 KiB
Go
package asyncreader
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"io"
|
|
"io/ioutil"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"testing/iotest"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestAsyncReader(t *testing.T) {
|
|
buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
|
|
ar, err := New(buf, 4)
|
|
require.NoError(t, err)
|
|
|
|
var dst = make([]byte, 100)
|
|
n, err := ar.Read(dst)
|
|
assert.Equal(t, io.EOF, err)
|
|
assert.Equal(t, 10, n)
|
|
|
|
n, err = ar.Read(dst)
|
|
assert.Equal(t, io.EOF, err)
|
|
assert.Equal(t, 0, n)
|
|
|
|
// Test read after error
|
|
n, err = ar.Read(dst)
|
|
assert.Equal(t, io.EOF, err)
|
|
assert.Equal(t, 0, n)
|
|
|
|
err = ar.Close()
|
|
require.NoError(t, err)
|
|
// Test double close
|
|
err = ar.Close()
|
|
require.NoError(t, err)
|
|
|
|
// Test Close without reading everything
|
|
buf = ioutil.NopCloser(bytes.NewBuffer(make([]byte, 50000)))
|
|
ar, err = New(buf, 4)
|
|
require.NoError(t, err)
|
|
err = ar.Close()
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
func TestAsyncWriteTo(t *testing.T) {
|
|
buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
|
|
ar, err := New(buf, 4)
|
|
require.NoError(t, err)
|
|
|
|
var dst = &bytes.Buffer{}
|
|
n, err := io.Copy(dst, ar)
|
|
assert.Equal(t, io.EOF, err)
|
|
assert.Equal(t, int64(10), n)
|
|
|
|
// Should still return EOF
|
|
n, err = io.Copy(dst, ar)
|
|
assert.Equal(t, io.EOF, err)
|
|
assert.Equal(t, int64(0), n)
|
|
|
|
err = ar.Close()
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestAsyncReaderErrors(t *testing.T) {
|
|
// test nil reader
|
|
_, err := New(nil, 4)
|
|
require.Error(t, err)
|
|
|
|
// invalid buffer number
|
|
buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
|
|
_, err = New(buf, 0)
|
|
require.Error(t, err)
|
|
_, err = New(buf, -1)
|
|
require.Error(t, err)
|
|
}
|
|
|
|
// Complex read tests, leveraged from "bufio".
|
|
|
|
type readMaker struct {
|
|
name string
|
|
fn func(io.Reader) io.Reader
|
|
}
|
|
|
|
var readMakers = []readMaker{
|
|
{"full", func(r io.Reader) io.Reader { return r }},
|
|
{"byte", iotest.OneByteReader},
|
|
{"half", iotest.HalfReader},
|
|
{"data+err", iotest.DataErrReader},
|
|
{"timeout", iotest.TimeoutReader},
|
|
}
|
|
|
|
// Call Read to accumulate the text of a file
|
|
func reads(buf io.Reader, m int) string {
|
|
var b [1000]byte
|
|
nb := 0
|
|
for {
|
|
n, err := buf.Read(b[nb : nb+m])
|
|
nb += n
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil && err != iotest.ErrTimeout {
|
|
panic("Data: " + err.Error())
|
|
} else if err != nil {
|
|
break
|
|
}
|
|
}
|
|
return string(b[0:nb])
|
|
}
|
|
|
|
type bufReader struct {
|
|
name string
|
|
fn func(io.Reader) string
|
|
}
|
|
|
|
var bufreaders = []bufReader{
|
|
{"1", func(b io.Reader) string { return reads(b, 1) }},
|
|
{"2", func(b io.Reader) string { return reads(b, 2) }},
|
|
{"3", func(b io.Reader) string { return reads(b, 3) }},
|
|
{"4", func(b io.Reader) string { return reads(b, 4) }},
|
|
{"5", func(b io.Reader) string { return reads(b, 5) }},
|
|
{"7", func(b io.Reader) string { return reads(b, 7) }},
|
|
}
|
|
|
|
const minReadBufferSize = 16
|
|
|
|
var bufsizes = []int{
|
|
0, minReadBufferSize, 23, 32, 46, 64, 93, 128, 1024, 4096,
|
|
}
|
|
|
|
// Test various input buffer sizes, number of buffers and read sizes.
|
|
func TestAsyncReaderSizes(t *testing.T) {
|
|
var texts [31]string
|
|
str := ""
|
|
all := ""
|
|
for i := 0; i < len(texts)-1; i++ {
|
|
texts[i] = str + "\n"
|
|
all += texts[i]
|
|
str += string(i%26 + 'a')
|
|
}
|
|
texts[len(texts)-1] = all
|
|
|
|
for h := 0; h < len(texts); h++ {
|
|
text := texts[h]
|
|
for i := 0; i < len(readMakers); i++ {
|
|
for j := 0; j < len(bufreaders); j++ {
|
|
for k := 0; k < len(bufsizes); k++ {
|
|
for l := 1; l < 10; l++ {
|
|
readmaker := readMakers[i]
|
|
bufreader := bufreaders[j]
|
|
bufsize := bufsizes[k]
|
|
read := readmaker.fn(strings.NewReader(text))
|
|
buf := bufio.NewReaderSize(read, bufsize)
|
|
ar, _ := New(ioutil.NopCloser(buf), l)
|
|
s := bufreader.fn(ar)
|
|
// "timeout" expects the Reader to recover, AsyncReader does not.
|
|
if s != text && readmaker.name != "timeout" {
|
|
t.Errorf("reader=%s fn=%s bufsize=%d want=%q got=%q",
|
|
readmaker.name, bufreader.name, bufsize, text, s)
|
|
}
|
|
err := ar.Close()
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test various input buffer sizes, number of buffers and read sizes.
|
|
func TestAsyncReaderWriteTo(t *testing.T) {
|
|
var texts [31]string
|
|
str := ""
|
|
all := ""
|
|
for i := 0; i < len(texts)-1; i++ {
|
|
texts[i] = str + "\n"
|
|
all += texts[i]
|
|
str += string(i%26 + 'a')
|
|
}
|
|
texts[len(texts)-1] = all
|
|
|
|
for h := 0; h < len(texts); h++ {
|
|
text := texts[h]
|
|
for i := 0; i < len(readMakers); i++ {
|
|
for j := 0; j < len(bufreaders); j++ {
|
|
for k := 0; k < len(bufsizes); k++ {
|
|
for l := 1; l < 10; l++ {
|
|
readmaker := readMakers[i]
|
|
bufreader := bufreaders[j]
|
|
bufsize := bufsizes[k]
|
|
read := readmaker.fn(strings.NewReader(text))
|
|
buf := bufio.NewReaderSize(read, bufsize)
|
|
ar, _ := New(ioutil.NopCloser(buf), l)
|
|
dst := &bytes.Buffer{}
|
|
_, err := ar.WriteTo(dst)
|
|
if err != nil && err != io.EOF && err != iotest.ErrTimeout {
|
|
t.Fatal("Copy:", err)
|
|
}
|
|
s := dst.String()
|
|
// "timeout" expects the Reader to recover, AsyncReader does not.
|
|
if s != text && readmaker.name != "timeout" {
|
|
t.Errorf("reader=%s fn=%s bufsize=%d want=%q got=%q",
|
|
readmaker.name, bufreader.name, bufsize, text, s)
|
|
}
|
|
err = ar.Close()
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Read an infinite number of zeros
|
|
type zeroReader struct {
|
|
closed bool
|
|
}
|
|
|
|
func (z *zeroReader) Read(p []byte) (n int, err error) {
|
|
if z.closed {
|
|
return 0, io.EOF
|
|
}
|
|
for i := range p {
|
|
p[i] = 0
|
|
}
|
|
return len(p), nil
|
|
}
|
|
|
|
func (z *zeroReader) Close() error {
|
|
if z.closed {
|
|
panic("double close on zeroReader")
|
|
}
|
|
z.closed = true
|
|
return nil
|
|
}
|
|
|
|
// Test closing and abandoning
|
|
func testAsyncReaderClose(t *testing.T, writeto bool) {
|
|
zr := &zeroReader{}
|
|
a, err := New(zr, 16)
|
|
require.NoError(t, err)
|
|
var copyN int64
|
|
var copyErr error
|
|
var wg sync.WaitGroup
|
|
started := make(chan struct{})
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
close(started)
|
|
if writeto {
|
|
// exercise the WriteTo path
|
|
copyN, copyErr = a.WriteTo(ioutil.Discard)
|
|
} else {
|
|
// exercise the Read path
|
|
buf := make([]byte, 64*1024)
|
|
for {
|
|
var n int
|
|
n, copyErr = a.Read(buf)
|
|
copyN += int64(n)
|
|
if copyErr != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
// Do some copying
|
|
<-started
|
|
time.Sleep(100 * time.Millisecond)
|
|
// Abandon the copy
|
|
a.Abandon()
|
|
wg.Wait()
|
|
assert.Equal(t, errorStreamAbandoned, copyErr)
|
|
// t.Logf("Copied %d bytes, err %v", copyN, copyErr)
|
|
assert.True(t, copyN > 0)
|
|
}
|
|
func TestAsyncReaderCloseRead(t *testing.T) { testAsyncReaderClose(t, false) }
|
|
func TestAsyncReaderCloseWriteTo(t *testing.T) { testAsyncReaderClose(t, true) }
|