diff --git a/backend/b2/upload.go b/backend/b2/upload.go index 399aaa6a2..0dedaeaeb 100644 --- a/backend/b2/upload.go +++ b/backend/b2/upload.go @@ -70,6 +70,7 @@ type largeUpload struct { f *Fs // parent Fs o *Object // object being uploaded in io.Reader // read the data from here + wrap accounting.WrapFn // account parts being transferred id string // ID of the file being uploaded size int64 // total size parts int64 // calculated number of parts, if known @@ -126,10 +127,14 @@ func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *lar if err != nil { return nil, err } + // unwrap the accounting from the input, we use wrap to put it + // back on after the buffering + in, wrap := accounting.UnWrap(in) up = &largeUpload{ f: f, o: o, in: in, + wrap: wrap, id: response.ID, size: size, parts: parts, @@ -221,7 +226,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error { opts := rest.Opts{ Method: "POST", RootURL: upload.UploadURL, - Body: accounting.AccountPart(up.o, in), + Body: up.wrap(in), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), @@ -331,7 +336,6 @@ func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) { errs := make(chan error, 1) hasMoreParts := true var wg sync.WaitGroup - accounting.AccountByPart(up.o) // Cancel whole file accounting before reading // Transfer initial chunk up.size = int64(len(initialUploadBlock)) @@ -392,7 +396,6 @@ func (up *largeUpload) Upload() error { errs := make(chan error, 1) var wg sync.WaitGroup var err error - accounting.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { // Check any errors diff --git a/backend/crypt/cipher.go b/backend/crypt/cipher.go index 5394ef0b2..23feabcf4 100644 --- a/backend/crypt/cipher.go +++ b/backend/crypt/cipher.go @@ -14,6 +14,7 @@ import ( "unicode/utf8" "github.com/ncw/rclone/backend/crypt/pkcs7" + "github.com/ncw/rclone/fs/accounting" "github.com/pkg/errors" "golang.org/x/crypto/nacl/secretbox" @@ -691,11 +692,12 @@ func (fh *encrypter) finish(err error) (int, error) { // Encrypt data encrypts the data stream func (c *cipher) EncryptData(in io.Reader) (io.Reader, error) { + in, wrap := accounting.UnWrap(in) // unwrap the accounting off the Reader out, err := c.newEncrypter(in, nil) if err != nil { return nil, err } - return out, nil + return wrap(out), nil // and wrap the accounting back on } // decrypter decrypts an io.ReaderCloser on the fly diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 08a2d77a2..cf0340f93 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -20,8 +20,9 @@ type Account struct { // CancelRequest so this race can happen when it apparently // shouldn't. mu sync.Mutex - in io.ReadCloser + in io.Reader origIn io.ReadCloser + close io.Closer size int64 name string statmu sync.Mutex // Separate mutex for stat values. @@ -33,8 +34,6 @@ type Account struct { closed bool // set if the file is closed exit chan struct{} // channel that will be closed when transfer is finished withBuf bool // is using a buffered in - - wholeFileDisabled bool // disables the whole file when doing parts } // NewAccountSizeName makes a Account reader for an io.ReadCloser of @@ -42,6 +41,7 @@ type Account struct { func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { acc := &Account{ in: in, + close: in, origIn: in, size: size, name: name, @@ -70,17 +70,18 @@ func (acc *Account) WithBuffer() *Account { } // On big files add a buffer if buffers > 0 { - in, err := asyncreader.New(acc.in, buffers) + rc, err := asyncreader.New(acc.origIn, buffers) if err != nil { fs.Errorf(acc.name, "Failed to make buffer: %v", err) } else { - acc.in = in + acc.in = rc + acc.close = rc } } return acc } -// GetReader returns the underlying io.ReadCloser +// GetReader returns the underlying io.ReadCloser under any Buffer func (acc *Account) GetReader() io.ReadCloser { acc.mu.Lock() defer acc.mu.Unlock() @@ -94,29 +95,19 @@ func (acc *Account) StopBuffering() { } } -// UpdateReader updates the underlying io.ReadCloser +// UpdateReader updates the underlying io.ReadCloser stopping the +// asynb buffer (if any) and re-adding it func (acc *Account) UpdateReader(in io.ReadCloser) { acc.mu.Lock() acc.StopBuffering() acc.in = in + acc.close = in acc.origIn = in acc.WithBuffer() acc.mu.Unlock() } -// disableWholeFileAccounting turns off the whole file accounting -func (acc *Account) disableWholeFileAccounting() { - acc.mu.Lock() - acc.wholeFileDisabled = true - acc.mu.Unlock() -} - -// accountPart disables the whole file counter and returns an -// io.Reader to wrap a segment of the transfer. -func (acc *Account) accountPart(in io.Reader) io.Reader { - return newAccountStream(acc, in) -} - +// averageLoop calculates averages for the stats in the background func (acc *Account) averageLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() @@ -165,16 +156,25 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) { func (acc *Account) Read(p []byte) (n int, err error) { acc.mu.Lock() defer acc.mu.Unlock() - if acc.wholeFileDisabled { - // Don't account - return acc.in.Read(p) - } return acc.read(acc.in, p) } -// Progress returns bytes read as well as the size. +// Close the object +func (acc *Account) Close() error { + acc.mu.Lock() + defer acc.mu.Unlock() + if acc.closed { + return nil + } + acc.closed = true + close(acc.exit) + Stats.inProgress.clear(acc.name) + return acc.close.Close() +} + +// progress returns bytes read as well as the size. // Size can be <= 0 if the size is unknown. -func (acc *Account) Progress() (bytes, size int64) { +func (acc *Account) progress() (bytes, size int64) { if acc == nil { return 0, 0 } @@ -184,10 +184,10 @@ func (acc *Account) Progress() (bytes, size int64) { return bytes, size } -// Speed returns the speed of the current file transfer +// speed returns the speed of the current file transfer // in bytes per second, as well a an exponentially weighted moving average // If no read has completed yet, 0 is returned for both values. -func (acc *Account) Speed() (bps, current float64) { +func (acc *Account) speed() (bps, current float64) { if acc == nil { return 0, 0 } @@ -203,10 +203,10 @@ func (acc *Account) Speed() (bps, current float64) { return } -// ETA returns the ETA of the current operation, +// eta returns the ETA of the current operation, // rounded to full seconds. // If the ETA cannot be determined 'ok' returns false. -func (acc *Account) ETA() (eta time.Duration, ok bool) { +func (acc *Account) eta() (eta time.Duration, ok bool) { if acc == nil || acc.size <= 0 { return 0, false } @@ -230,9 +230,9 @@ func (acc *Account) ETA() (eta time.Duration, ok bool) { // String produces stats for this file func (acc *Account) String() string { - a, b := acc.Progress() - _, cur := acc.Speed() - eta, etaok := acc.ETA() + a, b := acc.progress() + _, cur := acc.speed() + eta, etaok := acc.eta() etas := "-" if etaok { if eta > 0 { @@ -268,17 +268,27 @@ func (acc *Account) String() string { ) } -// Close the object -func (acc *Account) Close() error { +// OldStream returns the top io.Reader +func (acc *Account) OldStream() io.Reader { acc.mu.Lock() defer acc.mu.Unlock() - if acc.closed { - return nil + return acc.in +} + +// SetStream updates the top io.Reader +func (acc *Account) SetStream(in io.Reader) { + acc.mu.Lock() + acc.in = in + acc.mu.Unlock() +} + +// WrapStream wraps an io Reader so it will be accounted in the same +// way as account +func (acc *Account) WrapStream(in io.Reader) io.Reader { + return &accountStream{ + acc: acc, + in: in, } - acc.closed = true - close(acc.exit) - Stats.inProgress.clear(acc.name) - return acc.in.Close() } // accountStream accounts a single io.Reader into a parent *Account @@ -287,12 +297,19 @@ type accountStream struct { in io.Reader } -// newAccountStream makes a new accountStream for an in -func newAccountStream(acc *Account, in io.Reader) *accountStream { - return &accountStream{ - acc: acc, - in: in, - } +// OldStream return the underlying stream +func (a *accountStream) OldStream() io.Reader { + return a.in +} + +// SetStream set the underlying stream +func (a *accountStream) SetStream(in io.Reader) { + a.in = in +} + +// WrapStream wrap in in an accounter +func (a *accountStream) WrapStream(in io.Reader) io.Reader { + return a.acc.WrapStream(in) } // Read bytes from the object - see io.Reader @@ -300,33 +317,30 @@ func (a *accountStream) Read(p []byte) (n int, err error) { return a.acc.read(a.in, p) } -// AccountByPart turns off whole file accounting -// -// Returns the current account or nil if not found -func AccountByPart(obj fs.Object) *Account { - acc := Stats.inProgress.get(obj.Remote()) - if acc == nil { - fs.Debugf(obj, "Didn't find object to account part transfer") - return nil - } - acc.disableWholeFileAccounting() - return acc +// Accounter accounts a stream allowing the accounting to be removed and re-added +type Accounter interface { + io.Reader + OldStream() io.Reader + SetStream(io.Reader) + WrapStream(io.Reader) io.Reader } -// AccountPart accounts for part of a transfer -// -// It disables the whole file counter and returns an io.Reader to wrap -// a segment of the transfer. -func AccountPart(obj fs.Object, in io.Reader) io.Reader { - acc := AccountByPart(obj) - if acc == nil { - return in - } - return acc.accountPart(in) -} +// WrapFn wraps an io.Reader (for accounting purposes usually) +type WrapFn func(io.Reader) io.Reader -// Check it satisfies the interface -var ( - _ io.ReadCloser = &Account{} - _ io.Reader = &accountStream{} -) +// UnWrap unwraps a reader returning unwrapped and wrap, a function to +// wrap it back up again. If `in` is an Accounter then this function +// will take the accounting unwrapped and wrap will put it back on +// again the new Reader passed in. +// +// This allows functions which wrap io.Readers to move the accounting +// to the end of the wrapped chain of readers. This is very important +// if buffering is being introduced and if the Reader might be wrapped +// again. +func UnWrap(in io.Reader) (unwrapped io.Reader, wrap WrapFn) { + acc, ok := in.(Accounter) + if !ok { + return in, func(r io.Reader) io.Reader { return r } + } + return acc.OldStream(), acc.WrapStream +} diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go new file mode 100644 index 000000000..554a49623 --- /dev/null +++ b/fs/accounting/accounting_test.go @@ -0,0 +1,183 @@ +package accounting + +import ( + "bytes" + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/ncw/rclone/fs/asyncreader" + "github.com/ncw/rclone/fstest/mockobject" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Check it satisfies the interfaces +var ( + _ io.ReadCloser = &Account{} + _ io.Reader = &accountStream{} + _ Accounter = &Account{} + _ Accounter = &accountStream{} +) + +func TestNewAccountSizeName(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc := NewAccountSizeName(in, 1, "test") + assert.Equal(t, in, acc.in) + assert.Equal(t, acc, Stats.inProgress.get("test")) + err := acc.Close() + assert.NoError(t, err) + assert.Nil(t, Stats.inProgress.get("test")) +} + +func TestNewAccount(t *testing.T) { + obj := mockobject.Object("test") + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc := NewAccount(in, obj) + assert.Equal(t, in, acc.in) + assert.Equal(t, acc, Stats.inProgress.get("test")) + err := acc.Close() + assert.NoError(t, err) + assert.Nil(t, Stats.inProgress.get("test")) +} + +func TestAccountWithBuffer(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + + acc := NewAccountSizeName(in, -1, "test") + acc.WithBuffer() + // should have a buffer for an unknown size + _, ok := acc.in.(*asyncreader.AsyncReader) + require.True(t, ok) + assert.NoError(t, acc.Close()) + + acc = NewAccountSizeName(in, 1, "test") + acc.WithBuffer() + // should not have a buffer for a small size + _, ok = acc.in.(*asyncreader.AsyncReader) + require.False(t, ok) + assert.NoError(t, acc.Close()) +} + +func TestAccountGetUpdateReader(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc := NewAccountSizeName(in, 1, "test") + + assert.Equal(t, in, acc.GetReader()) + + in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) + acc.UpdateReader(in2) + + assert.Equal(t, in2, acc.GetReader()) + + assert.NoError(t, acc.Close()) +} + +func TestAccountRead(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) + acc := NewAccountSizeName(in, 1, "test") + + assert.True(t, acc.start.IsZero()) + assert.Equal(t, 0, acc.lpBytes) + assert.Equal(t, int64(0), acc.bytes) + assert.Equal(t, int64(0), Stats.bytes) + + var buf = make([]byte, 2) + n, err := acc.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{1, 2}, buf[:n]) + + assert.False(t, acc.start.IsZero()) + assert.Equal(t, 2, acc.lpBytes) + assert.Equal(t, int64(2), acc.bytes) + assert.Equal(t, int64(2), Stats.bytes) + + n, err = acc.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, []byte{3}, buf[:n]) + + n, err = acc.Read(buf) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + + assert.NoError(t, acc.Close()) +} + +func TestAccountString(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) + acc := NewAccountSizeName(in, 3, "test") + + // FIXME not an exhaustive test! + + assert.Equal(t, "test: 0% /3, 0/s, -", strings.TrimSpace(acc.String())) + + var buf = make([]byte, 2) + n, err := acc.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 2, n) + + assert.Equal(t, "test: 66% /3, 0/s, -", strings.TrimSpace(acc.String())) + + assert.NoError(t, acc.Close()) +} + +// Test the Accounter interface methods on Account and accountStream +func TestAccountAccounter(t *testing.T) { + in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) + acc := NewAccountSizeName(in, 3, "test") + + assert.True(t, in == acc.OldStream()) + + in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{2, 3, 4})) + + acc.SetStream(in2) + assert.True(t, in2 == acc.OldStream()) + + r := acc.WrapStream(in) + as, ok := r.(Accounter) + require.True(t, ok) + assert.True(t, in == as.OldStream()) + assert.True(t, in2 == acc.OldStream()) + accs, ok := r.(*accountStream) + require.True(t, ok) + assert.Equal(t, acc, accs.acc) + assert.True(t, in == accs.in) + + // Check Read on the accountStream + var buf = make([]byte, 2) + n, err := r.Read(buf) + assert.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{1, 2}, buf[:n]) + + // Test that we can get another accountstream out + in3 := ioutil.NopCloser(bytes.NewBuffer([]byte{3, 1, 2})) + r2 := as.WrapStream(in3) + as2, ok := r2.(Accounter) + require.True(t, ok) + assert.True(t, in3 == as2.OldStream()) + assert.True(t, in2 == acc.OldStream()) + accs2, ok := r2.(*accountStream) + require.True(t, ok) + assert.Equal(t, acc, accs2.acc) + assert.True(t, in3 == accs2.in) + + // Test we can set this new accountStream + as2.SetStream(in) + assert.True(t, in == as2.OldStream()) + + // Test UnWrap on accountStream + unwrapped, wrap := UnWrap(r2) + assert.True(t, unwrapped == in) + r3 := wrap(in2) + assert.True(t, in2 == r3.(Accounter).OldStream()) + + // TestUnWrap on a normal io.Reader + unwrapped, wrap = UnWrap(in2) + assert.True(t, unwrapped == in2) + assert.True(t, wrap(in3) == in3) + +}