diff --git a/cmd/mount/read.go b/cmd/mount/read.go index 4739e5ac6..f4c8d9640 100644 --- a/cmd/mount/read.go +++ b/cmd/mount/read.go @@ -27,10 +27,11 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) { if err != nil { return nil, err } - return &ReadFileHandle{ - r: r, + fh := &ReadFileHandle{ o: o, - }, nil + r: fs.NewAccount(r, o), // account and buffer the transfer + } + return fh, nil } // Check interface satisfied @@ -63,7 +64,7 @@ func (fh *ReadFileHandle) seek(offset int64) error { if err != nil { fs.Debug(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) } - fh.r = r + fh.r = fs.NewAccount(r, fh.o) // account and buffer the transfer } fh.offset = offset return nil diff --git a/cmd/mount/write.go b/cmd/mount/write.go index a58c17a56..7d06f5553 100644 --- a/cmd/mount/write.go +++ b/cmd/mount/write.go @@ -38,8 +38,9 @@ func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, e file: f, } fh.pipeReader, fh.pipeWriter = io.Pipe() + r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()) // account and buffer the transfer go func() { - o, err := d.f.Put(fh.pipeReader, src) + o, err := d.f.Put(r, src) fh.o = o fh.result <- err }() diff --git a/fs/accounting.go b/fs/accounting.go index c2bfedaff..355753ebb 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -276,12 +276,28 @@ type Account struct { wholeFileDisabled bool // disables the whole file when doing parts } -// NewAccount makes a Account reader for an object -func NewAccount(in io.ReadCloser, obj Object) *Account { +// NewAccountSizeName makes a Account reader for an io.ReadCloser of +// the given size and name +// +// If the file is above a certain size it adds an Async reader +func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { + // On big files add a buffer + if size > 10<<20 { + const memUsed = 16 * 1024 * 1024 + const bufSize = 128 * 1024 + const buffers = memUsed / bufSize + newIn, err := newAsyncReader(in, buffers, bufSize) + if err != nil { + ErrorLog(name, "Failed to make buffer: %v", err) + } else { + in = newIn + } + } + acc := &Account{ in: in, - size: obj.Size(), - name: obj.Remote(), + size: size, + name: name, exit: make(chan struct{}), avg: ewma.NewMovingAverage(), lpTime: time.Now(), @@ -291,6 +307,11 @@ func NewAccount(in io.ReadCloser, obj Object) *Account { return acc } +// NewAccount makes a Account reader for an object +func NewAccount(in io.ReadCloser, obj Object) *Account { + return NewAccountSizeName(in, obj.Size(), obj.Remote()) +} + // disableWholeFileAccounting turns off the whole file accounting func (acc *Account) disableWholeFileAccounting() { acc.mu.Lock() diff --git a/fs/operations.go b/fs/operations.go index 850635e84..e48b26a07 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -264,12 +264,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { if err != nil { err = errors.Wrap(err, "failed to open source object") } else { - // On big files add a buffer - if src.Size() > 10<<20 { - in0, _ = newAsyncReader(in0, 4, 4<<20) - } - - in := NewAccount(in0, src) // account the transfer + in := NewAccount(in0, src) // account and buffer the transfer wrappedSrc := &overrideRemoteObject{Object: src, remote: remote} if doUpdate { @@ -1131,7 +1126,7 @@ func Cat(f Fs, w io.Writer) error { ErrorLog(o, "Failed to close: %v", err) } }() - inAccounted := NewAccount(in, o) // account the transfer + inAccounted := NewAccount(in, o) // account and buffer the transfer _, err = io.Copy(w, inAccounted) if err != nil { Stats.Error()