accounting: change stats interface

This is done to make clear ownership over accounting object and prepare
for removing global stats object.

Stats elapsed time calculation has been altered to account for actual
transfer time instead of stats creation time.
This commit is contained in:
Aleksandar Jankovic 2019-07-16 13:56:20 +02:00 committed by Nick Craig-Wood
parent 2d561b51db
commit be0464f5f1
14 changed files with 288 additions and 118 deletions

View file

@ -214,8 +214,10 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er
} }
// Account the transfer // Account the transfer
accounting.Stats.Transferring(path) tr := accounting.Stats.NewTransferRemoteSize(path, node.Size())
defer accounting.Stats.DoneTransferring(path, true) defer func() {
tr.Done(err)
}()
for _, file := range dirEntries { for _, file := range dirEntries {
err = callback(&FileInfo{file, file.Mode(), d.vfs.Opt.UID, d.vfs.Opt.GID}) err = callback(&FileInfo{file, file.Mode(), d.vfs.Opt.UID, d.vfs.Opt.GID})
@ -311,8 +313,8 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose
} }
// Account the transfer // Account the transfer
accounting.Stats.Transferring(path) tr := accounting.Stats.NewTransferRemoteSize(path, node.Size())
defer accounting.Stats.DoneTransferring(path, true) defer tr.Done(nil)
return node.Size(), handle, nil return node.Size(), handle, nil
} }

View file

@ -187,8 +187,8 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string
}() }()
// Account the transfer // Account the transfer
accounting.Stats.Transferring(remote) tr := accounting.Stats.NewTransfer(obj)
defer accounting.Stats.DoneTransferring(remote, true) defer tr.Done(nil)
// FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer
// Serve the file // Serve the file

View file

@ -75,8 +75,8 @@ func Error(what interface{}, w http.ResponseWriter, text string, err error) {
// Serve serves a directory // Serve serves a directory
func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) {
// Account the transfer // Account the transfer
accounting.Stats.Transferring(d.DirRemote) tr := accounting.Stats.NewTransferRemoteSize(d.DirRemote, -1)
defer accounting.Stats.DoneTransferring(d.DirRemote, true) defer tr.Done(nil)
fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr)

View file

@ -75,22 +75,11 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return return
} }
accounting.Stats.Transferring(o.Remote()) tr := accounting.Stats.NewTransfer(o)
in := accounting.NewAccount(file, o) // account the transfer (no buffering)
defer func() { defer func() {
closeErr := in.Close() tr.Done(err)
if closeErr != nil {
fs.Errorf(o, "Get request: close failed: %v", closeErr)
if err == nil {
err = closeErr
}
}
ok := err == nil
accounting.Stats.DoneTransferring(o.Remote(), ok)
if !ok {
accounting.Stats.Error(err)
}
}() }()
in := tr.Account(file) // account the transfer (no buffering)
w.WriteHeader(code) w.WriteHeader(code)

View file

@ -20,6 +20,7 @@ var ErrorMaxTransferLimitReached = fserrors.FatalError(errors.New("Max transfer
// Account limits and accounts for one transfer // Account limits and accounts for one transfer
type Account struct { type Account struct {
stats *StatsInfo
// The mutex is to make sure Read() and Close() aren't called // The mutex is to make sure Read() and Close() aren't called
// concurrently. Unfortunately the persistent connection loop // concurrently. Unfortunately the persistent connection loop
// in http transport calls Read() after Do() returns on // in http transport calls Read() after Do() returns on
@ -45,10 +46,11 @@ type Account struct {
const averagePeriod = 16 // period to do exponentially weighted averages over const averagePeriod = 16 // period to do exponentially weighted averages over
// NewAccountSizeName makes a Account reader for an io.ReadCloser of // newAccountSizeName makes a Account reader for an io.ReadCloser of
// the given size and name // the given size and name
func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account {
acc := &Account{ acc := &Account{
stats: stats,
in: in, in: in,
close: in, close: in,
origIn: in, origIn: in,
@ -60,15 +62,10 @@ func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
max: int64(fs.Config.MaxTransfer), max: int64(fs.Config.MaxTransfer),
} }
go acc.averageLoop() go acc.averageLoop()
Stats.inProgress.set(acc.name, acc) stats.inProgress.set(acc.name, acc)
return acc return acc
} }
// NewAccount makes a Account reader for an object
func NewAccount(in io.ReadCloser, obj fs.Object) *Account {
return NewAccountSizeName(in, obj.Size(), obj.Remote())
}
// WithBuffer - If the file is above a certain size it adds an Async reader // WithBuffer - If the file is above a certain size it adds an Async reader
func (acc *Account) WithBuffer() *Account { func (acc *Account) WithBuffer() *Account {
acc.withBuf = true acc.withBuf = true
@ -157,7 +154,7 @@ func (acc *Account) averageLoop() {
// Check the read is valid // Check the read is valid
func (acc *Account) checkRead() (err error) { func (acc *Account) checkRead() (err error) {
acc.statmu.Lock() acc.statmu.Lock()
if acc.max >= 0 && Stats.GetBytes() >= acc.max { if acc.max >= 0 && acc.stats.GetBytes() >= acc.max {
acc.statmu.Unlock() acc.statmu.Unlock()
return ErrorMaxTransferLimitReached return ErrorMaxTransferLimitReached
} }
@ -177,7 +174,7 @@ func (acc *Account) accountRead(n int) {
acc.bytes += int64(n) acc.bytes += int64(n)
acc.statmu.Unlock() acc.statmu.Unlock()
Stats.Bytes(int64(n)) acc.stats.Bytes(int64(n))
limitBandwidth(n) limitBandwidth(n)
} }
@ -219,7 +216,7 @@ func (acc *Account) Close() error {
} }
acc.closed = true acc.closed = true
close(acc.exit) close(acc.exit)
Stats.inProgress.clear(acc.name) acc.stats.inProgress.clear(acc.name)
if acc.close == nil { if acc.close == nil {
return nil return nil
} }

View file

@ -12,7 +12,6 @@ import (
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/asyncreader" "github.com/ncw/rclone/fs/asyncreader"
"github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -27,36 +26,27 @@ var (
func TestNewAccountSizeName(t *testing.T) { func TestNewAccountSizeName(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc := NewAccountSizeName(in, 1, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
assert.Equal(t, in, acc.in) assert.Equal(t, in, acc.in)
assert.Equal(t, acc, Stats.inProgress.get("test")) assert.Equal(t, acc, stats.inProgress.get("test"))
err := acc.Close() err := acc.Close()
assert.NoError(t, err) assert.NoError(t, err)
assert.Nil(t, Stats.inProgress.get("test")) assert.Nil(t, stats.inProgress.get("test"))
}
func TestNewAccount(t *testing.T) {
obj := mockobject.Object("test")
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc := NewAccount(in, obj)
assert.Equal(t, in, acc.in)
assert.Equal(t, acc, Stats.inProgress.get("test"))
err := acc.Close()
assert.NoError(t, err)
assert.Nil(t, Stats.inProgress.get("test"))
} }
func TestAccountWithBuffer(t *testing.T) { func TestAccountWithBuffer(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc := NewAccountSizeName(in, -1, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, -1, "test")
acc.WithBuffer() acc.WithBuffer()
// should have a buffer for an unknown size // should have a buffer for an unknown size
_, ok := acc.in.(*asyncreader.AsyncReader) _, ok := acc.in.(*asyncreader.AsyncReader)
require.True(t, ok) require.True(t, ok)
assert.NoError(t, acc.Close()) assert.NoError(t, acc.Close())
acc = NewAccountSizeName(in, 1, "test") acc = newAccountSizeName(stats, in, 1, "test")
acc.WithBuffer() acc.WithBuffer()
// should not have a buffer for a small size // should not have a buffer for a small size
_, ok = acc.in.(*asyncreader.AsyncReader) _, ok = acc.in.(*asyncreader.AsyncReader)
@ -66,7 +56,8 @@ func TestAccountWithBuffer(t *testing.T) {
func TestAccountGetUpdateReader(t *testing.T) { func TestAccountGetUpdateReader(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
acc := NewAccountSizeName(in, 1, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
assert.Equal(t, in, acc.GetReader()) assert.Equal(t, in, acc.GetReader())
@ -80,12 +71,13 @@ func TestAccountGetUpdateReader(t *testing.T) {
func TestAccountRead(t *testing.T) { func TestAccountRead(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
acc := NewAccountSizeName(in, 1, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
assert.True(t, acc.start.IsZero()) assert.True(t, acc.start.IsZero())
assert.Equal(t, 0, acc.lpBytes) assert.Equal(t, 0, acc.lpBytes)
assert.Equal(t, int64(0), acc.bytes) assert.Equal(t, int64(0), acc.bytes)
assert.Equal(t, int64(0), Stats.bytes) assert.Equal(t, int64(0), stats.bytes)
var buf = make([]byte, 2) var buf = make([]byte, 2)
n, err := acc.Read(buf) n, err := acc.Read(buf)
@ -96,7 +88,7 @@ func TestAccountRead(t *testing.T) {
assert.False(t, acc.start.IsZero()) assert.False(t, acc.start.IsZero())
assert.Equal(t, 2, acc.lpBytes) assert.Equal(t, 2, acc.lpBytes)
assert.Equal(t, int64(2), acc.bytes) assert.Equal(t, int64(2), acc.bytes)
assert.Equal(t, int64(2), Stats.bytes) assert.Equal(t, int64(2), stats.bytes)
n, err = acc.Read(buf) n, err = acc.Read(buf)
assert.NoError(t, err) assert.NoError(t, err)
@ -112,7 +104,8 @@ func TestAccountRead(t *testing.T) {
func TestAccountString(t *testing.T) { func TestAccountString(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
acc := NewAccountSizeName(in, 3, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 3, "test")
// FIXME not an exhaustive test! // FIXME not an exhaustive test!
@ -131,7 +124,8 @@ func TestAccountString(t *testing.T) {
// Test the Accounter interface methods on Account and accountStream // Test the Accounter interface methods on Account and accountStream
func TestAccountAccounter(t *testing.T) { func TestAccountAccounter(t *testing.T) {
in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
acc := NewAccountSizeName(in, 3, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 3, "test")
assert.True(t, in == acc.OldStream()) assert.True(t, in == acc.OldStream())
@ -192,10 +186,10 @@ func TestAccountMaxTransfer(t *testing.T) {
defer func() { defer func() {
fs.Config.MaxTransfer = old fs.Config.MaxTransfer = old
}() }()
Stats.ResetCounters()
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
acc := NewAccountSizeName(in, 1, "test") stats := NewStats()
acc := newAccountSizeName(stats, in, 1, "test")
var b = make([]byte, 10) var b = make([]byte, 10)

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -87,8 +88,8 @@ type StatsInfo struct {
renameQueue int renameQueue int
renameQueueSize int64 renameQueueSize int64
deletes int64 deletes int64
start time.Time
inProgress *inProgress inProgress *inProgress
startedTransfers []*Transfer
} }
// NewStats creates an initialised StatsInfo // NewStats creates an initialised StatsInfo
@ -96,7 +97,6 @@ func NewStats() *StatsInfo {
return &StatsInfo{ return &StatsInfo{
checking: newStringSet(fs.Config.Checkers, "checking"), checking: newStringSet(fs.Config.Checkers, "checking"),
transferring: newStringSet(fs.Config.Transfers, "transferring"), transferring: newStringSet(fs.Config.Transfers, "transferring"),
start: time.Now(),
inProgress: newInProgress(), inProgress: newInProgress(),
} }
} }
@ -105,7 +105,7 @@ func NewStats() *StatsInfo {
func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) { func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params) out = make(rc.Params)
s.mu.RLock() s.mu.RLock()
dt := time.Now().Sub(s.start) dt := s.totalDuration()
dtSeconds := dt.Seconds() dtSeconds := dt.Seconds()
speed := 0.0 speed := 0.0
if dt > 0 { if dt > 0 {
@ -149,6 +149,52 @@ func (s *StatsInfo) RemoteStats(ctx context.Context, in rc.Params) (out rc.Param
return out, nil return out, nil
} }
type timeRange struct {
start time.Time
end time.Time
}
// Total duration is union of durations of all transfers belonging to this
// object.
// Needs to be protected by mutex.
func (s *StatsInfo) totalDuration() time.Duration {
now := time.Now()
// Extract time ranges of all transfers.
timeRanges := make([]timeRange, len(s.startedTransfers))
for i := range s.startedTransfers {
start, end := s.startedTransfers[i].TimeRange()
if end.IsZero() {
end = now
}
timeRanges[i] = timeRange{start, end}
}
// Sort by the starting time.
sort.Slice(timeRanges, func(i, j int) bool {
return timeRanges[i].start.Before(timeRanges[j].start)
})
// Merge overlaps and add distinctive ranges together for total.
var total time.Duration
var i, j = 0, 1
for i < len(timeRanges) {
if j < len(timeRanges)-1 {
if timeRanges[j].start.Before(timeRanges[i].end) {
if timeRanges[i].end.Before(timeRanges[j].end) {
timeRanges[i].end = timeRanges[j].end
}
j++
continue
}
}
total += timeRanges[i].end.Sub(timeRanges[i].start)
i = j
j++
}
return total
}
// eta returns the ETA of the current operation, // eta returns the ETA of the current operation,
// rounded to full seconds. // rounded to full seconds.
// If the ETA cannot be determined 'ok' returns false. // If the ETA cannot be determined 'ok' returns false.
@ -195,7 +241,7 @@ func (s *StatsInfo) String() string {
s.mu.RLock() s.mu.RLock()
dt := time.Now().Sub(s.start) dt := s.totalDuration()
dtSeconds := dt.Seconds() dtSeconds := dt.Seconds()
speed := 0.0 speed := 0.0
if dt > 0 { if dt > 0 {
@ -456,9 +502,16 @@ func (s *StatsInfo) GetTransfers() int64 {
return s.transfers return s.transfers
} }
// Transferring adds a transfer into the stats // NewTransfer adds a transfer to the stats from the object.
func (s *StatsInfo) Transferring(remote string) { func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer {
s.transferring.add(obj.Remote())
return newTransfer(s, obj)
}
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
s.transferring.add(remote) s.transferring.add(remote)
return newTransferRemoteSize(s, remote, size)
} }
// DoneTransferring removes a transfer from the stats // DoneTransferring removes a transfer from the stats
@ -496,3 +549,10 @@ func (s *StatsInfo) SetRenameQueue(n int, size int64) {
s.renameQueueSize = size s.renameQueueSize = size
s.mu.Unlock() s.mu.Unlock()
} }
// AddTransfer adds reference to the started transfer.
func (s *StatsInfo) AddTransfer(transfer *Transfer) {
s.mu.Lock()
s.startedTransfers = append(s.startedTransfers, transfer)
s.mu.Unlock()
}

View file

@ -128,3 +128,54 @@ func TestStatsError(t *testing.T) {
assert.False(t, s.HadRetryError()) assert.False(t, s.HadRetryError())
assert.Equal(t, time.Time{}, s.RetryAfter()) assert.Equal(t, time.Time{}, s.RetryAfter())
} }
func TestStatsTotalDuration(t *testing.T) {
time1 := time.Now().Add(-40 * time.Second)
time2 := time1.Add(10 * time.Second)
time3 := time2.Add(10 * time.Second)
time4 := time3.Add(10 * time.Second)
s := NewStats()
s.AddTransfer(&Transfer{
startedAt: time2,
completedAt: time3,
})
s.AddTransfer(&Transfer{
startedAt: time2,
completedAt: time2.Add(time.Second),
})
s.AddTransfer(&Transfer{
startedAt: time1,
completedAt: time3,
})
s.AddTransfer(&Transfer{
startedAt: time3,
completedAt: time4,
})
s.AddTransfer(&Transfer{
startedAt: time.Now(),
})
time.Sleep(time.Millisecond)
s.mu.Lock()
total := s.totalDuration()
s.mu.Unlock()
assert.True(t, 30*time.Second < total && total < 31*time.Second, total)
}
func TestStatsTotalDuration2(t *testing.T) {
time1 := time.Now().Add(-40 * time.Second)
time2 := time1.Add(10 * time.Second)
s := NewStats()
s.AddTransfer(&Transfer{
startedAt: time1,
completedAt: time2,
})
s.mu.Lock()
total := s.totalDuration()
s.mu.Unlock()
assert.Equal(t, 10*time.Second, total)
}

72
fs/accounting/transfer.go Normal file
View file

@ -0,0 +1,72 @@
package accounting
import (
"io"
"sync"
"time"
"github.com/ncw/rclone/fs"
)
// Transfer keeps track of initiated transfers and provides access to
// accounting functions.
// Transfer needs to be closed on completion.
type Transfer struct {
stats *StatsInfo
acc *Account
remote string
size int64
mu sync.Mutex
startedAt time.Time
completedAt time.Time
}
// newTransfer instantiates new transfer
func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size())
}
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfer {
tr := &Transfer{
stats: stats,
remote: remote,
size: size,
startedAt: time.Now(),
}
stats.AddTransfer(tr)
return tr
}
// Done ends the transfer.
// Must be called after transfer is finished to run proper cleanups.
func (tr *Transfer) Done(err error) {
if err != nil {
tr.stats.Error(err)
}
if tr.acc != nil {
if err := tr.acc.Close(); err != nil {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
}
}
tr.stats.DoneTransferring(tr.remote, err == nil)
tr.mu.Lock()
tr.completedAt = time.Now()
tr.mu.Unlock()
}
// Account returns reader that knows how to keep track of transfer progress.
func (tr *Transfer) Account(in io.ReadCloser) *Account {
if tr.acc != nil {
return tr.acc
}
return newAccountSizeName(tr.stats, in, tr.size, tr.remote)
}
// TimeRange returns the time transfer started and ended at. If not completed
// it will return zero time for end time.
func (tr *Transfer) TimeRange() (time.Time, time.Time) {
tr.mu.Lock()
defer tr.mu.Unlock()
return tr.startedAt, tr.completedAt
}

View file

@ -110,7 +110,7 @@ func (mc *multiThreadCopyState) calculateChunks() {
} }
// Copy src to (f, remote) using streams download threads and the OpenWriterAt feature // Copy src to (f, remote) using streams download threads and the OpenWriterAt feature
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int) (newDst fs.Object, err error) { func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) {
openWriterAt := f.Features().OpenWriterAt openWriterAt := f.Features().OpenWriterAt
if openWriterAt == nil { if openWriterAt == nil {
return nil, errors.New("multi-thread copy: OpenWriterAt not supported") return nil, errors.New("multi-thread copy: OpenWriterAt not supported")
@ -132,8 +132,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
mc.calculateChunks() mc.calculateChunks()
// Make accounting // Make accounting
mc.acc = accounting.NewAccount(nil, src) mc.acc = tr.Account(nil)
defer fs.CheckClose(mc.acc, &err)
// create write file handle // create write file handle
mc.wc, err = openWriterAt(gCtx, remote, mc.size) mc.wc, err = openWriterAt(gCtx, remote, mc.size)

View file

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest" "github.com/ncw/rclone/fstest"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -49,6 +51,7 @@ func TestMultithreadCopy(t *testing.T) {
{size: multithreadChunkSize*2 + 1, streams: 2}, {size: multithreadChunkSize*2 + 1, streams: 2},
} { } {
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
var err error
contents := fstest.RandomString(test.size) contents := fstest.RandomString(test.size)
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z") t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 := r.WriteObject(context.Background(), "file1", contents, t1) file1 := r.WriteObject(context.Background(), "file1", contents, t1)
@ -57,8 +60,13 @@ func TestMultithreadCopy(t *testing.T) {
src, err := r.Fremote.NewObject(context.Background(), "file1") src, err := r.Fremote.NewObject(context.Background(), "file1")
require.NoError(t, err) require.NoError(t, err)
accounting.Stats.ResetCounters()
tr := accounting.Stats.NewTransfer(src)
dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2) defer func() {
tr.Done(err)
}()
dst, err := multiThreadCopy(context.Background(), r.Flocal, "file1", src, 2, tr)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, src.Size(), dst.Size()) assert.Equal(t, src.Size(), dst.Size())
assert.Equal(t, "file1", dst.Remote()) assert.Equal(t, "file1", dst.Remote())

View file

@ -251,9 +251,9 @@ var _ fs.MimeTyper = (*overrideRemoteObject)(nil)
// It returns the destination object if possible. Note that this may // It returns the destination object if possible. Note that this may
// be nil. // be nil.
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
accounting.Stats.Transferring(src.Remote()) tr := accounting.Stats.NewTransfer(src)
defer func() { defer func() {
accounting.Stats.DoneTransferring(src.Remote(), err == nil) tr.Done(err)
}() }()
newDst = dst newDst = dst
if fs.Config.DryRun { if fs.Config.DryRun {
@ -304,7 +304,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if streams < 2 { if streams < 2 {
streams = 2 streams = 2
} }
dst, err = multiThreadCopy(ctx, f, remote, src, int(streams)) dst, err = multiThreadCopy(ctx, f, remote, src, int(streams), tr)
if doUpdate { if doUpdate {
actionTaken = "Multi-thread Copied (replaced existing)" actionTaken = "Multi-thread Copied (replaced existing)"
} else { } else {
@ -326,7 +326,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx))
newDst = dst newDst = dst
} else { } else {
in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer in := tr.Account(in0).WithBuffer() // account and buffer the transfer
var wrappedSrc fs.ObjectInfo = src var wrappedSrc fs.ObjectInfo = src
// We try to pass the original object if possible // We try to pass the original object if possible
if src.Remote() != remote { if src.Remote() != remote {
@ -854,17 +854,25 @@ func CheckIdentical(ctx context.Context, dst, src fs.Object) (differ bool, err e
if err != nil { if err != nil {
return true, errors.Wrapf(err, "failed to open %q", dst) return true, errors.Wrapf(err, "failed to open %q", dst)
} }
in1 = accounting.NewAccount(in1, dst).WithBuffer() // account and buffer the transfer tr1 := accounting.Stats.NewTransfer(dst)
defer fs.CheckClose(in1, &err) defer func() {
tr1.Done(err)
}()
in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer
in2, err := src.Open(ctx) in2, err := src.Open(ctx)
if err != nil { if err != nil {
return true, errors.Wrapf(err, "failed to open %q", src) return true, errors.Wrapf(err, "failed to open %q", src)
} }
in2 = accounting.NewAccount(in2, src).WithBuffer() // account and buffer the transfer tr2 := accounting.Stats.NewTransfer(dst)
defer fs.CheckClose(in2, &err) defer func() {
tr2.Done(err)
}()
in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer
return CheckEqualReaders(in1, in2) // To assign err variable before defer.
differ, err = CheckEqualReaders(in1, in2)
return
} }
// CheckDownload checks the files in fsrc and fdst according to Size // CheckDownload checks the files in fsrc and fdst according to Size
@ -1159,9 +1167,9 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
var mu sync.Mutex var mu sync.Mutex
return ListFn(ctx, f, func(o fs.Object) { return ListFn(ctx, f, func(o fs.Object) {
var err error var err error
accounting.Stats.Transferring(o.Remote()) tr := accounting.Stats.NewTransfer(o)
defer func() { defer func() {
accounting.Stats.DoneTransferring(o.Remote(), err == nil) tr.Done(err)
}() }()
opt := fs.RangeOption{Start: offset, End: -1} opt := fs.RangeOption{Start: offset, End: -1}
size := o.Size() size := o.Size()
@ -1183,19 +1191,8 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
} }
if count >= 0 { if count >= 0 {
in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in} in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in}
// reduce remaining size to count
if size > count {
size = count
}
} }
in = accounting.NewAccountSizeName(in, size, o.Remote()).WithBuffer() // account and buffer the transfer in = tr.Account(in).WithBuffer() // account and buffer the transfer
defer func() {
err = in.Close()
if err != nil {
fs.CountError(err)
fs.Errorf(o, "Failed to close: %v", err)
}
}()
// take the lock just before we output stuff, so at the last possible moment // take the lock just before we output stuff, so at the last possible moment
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
@ -1209,14 +1206,11 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error {
// Rcat reads data from the Reader until EOF and uploads it to a file on remote // Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) {
accounting.Stats.Transferring(dstFileName) tr := accounting.Stats.NewTransferRemoteSize(dstFileName, -1)
in = accounting.NewAccountSizeName(in, -1, dstFileName).WithBuffer()
defer func() { defer func() {
accounting.Stats.DoneTransferring(dstFileName, err == nil) tr.Done(err)
if otherErr := in.Close(); otherErr != nil {
fs.Debugf(fdst, "Rcat: failed to close source: %v", err)
}
}() }()
in = tr.Account(in).WithBuffer()
hashOption := &fs.HashesOption{Hashes: fdst.Hashes()} hashOption := &fs.HashesOption{Hashes: fdst.Hashes()}
hash, err := hash.NewMultiHasherTypes(fdst.Hashes()) hash, err := hash.NewMultiHasherTypes(fdst.Hashes())
@ -1531,10 +1525,14 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
var obj fs.Object var obj fs.Object
if size >= 0 { if size >= 0 {
var err error
// Size known use Put // Size known use Put
accounting.Stats.Transferring(dstFileName) tr := accounting.Stats.NewTransferRemoteSize(dstFileName, size)
body := ioutil.NopCloser(in) // we let the server close the body defer func() {
in := accounting.NewAccountSizeName(body, size, dstFileName) // account the transfer (no buffering) tr.Done(err)
}()
body := ioutil.NopCloser(in) // we let the server close the body
in := tr.Account(body) // account the transfer (no buffering)
if fs.Config.DryRun { if fs.Config.DryRun {
fs.Logf("stdin", "Not uploading as --dry-run") fs.Logf("stdin", "Not uploading as --dry-run")
@ -1543,15 +1541,6 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
return nil, err return nil, err
} }
var err error
defer func() {
closeErr := in.Close()
if closeErr != nil {
accounting.Stats.Error(closeErr)
fs.Errorf(dstFileName, "Post request: close failed: %v", closeErr)
}
accounting.Stats.DoneTransferring(dstFileName, err == nil)
}()
info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst) info := object.NewStaticObjectInfo(dstFileName, modTime, size, true, nil, fdst)
obj, err = fdst.Put(ctx, in, info) obj, err = fdst.Put(ctx, in, info)
if err != nil { if err != nil {
@ -1675,14 +1664,15 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
} }
return errors.Wrap(err, "error while attempting to move file to a temporary location") return errors.Wrap(err, "error while attempting to move file to a temporary location")
} }
accounting.Stats.Transferring(srcFileName) tr := accounting.Stats.NewTransfer(srcObj)
defer func() {
tr.Done(err)
}()
tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj) tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj)
if err != nil { if err != nil {
accounting.Stats.DoneTransferring(srcFileName, false)
return errors.Wrap(err, "error while moving file to temporary location") return errors.Wrap(err, "error while moving file to temporary location")
} }
_, err = Op(ctx, fdst, nil, dstFileName, tmpObj) _, err = Op(ctx, fdst, nil, dstFileName, tmpObj)
accounting.Stats.DoneTransferring(srcFileName, err == nil)
return err return err
} }

View file

@ -16,6 +16,7 @@ import (
// ReadFileHandle is an open for read file handle on a File // ReadFileHandle is an open for read file handle on a File
type ReadFileHandle struct { type ReadFileHandle struct {
baseHandle baseHandle
done func(err error)
mu sync.Mutex mu sync.Mutex
closed bool // set if handle has been closed closed bool // set if handle has been closed
r *accounting.Account r *accounting.Account
@ -70,9 +71,11 @@ func (fh *ReadFileHandle) openPending() (err error) {
if err != nil { if err != nil {
return err return err
} }
fh.r = accounting.NewAccount(r, o).WithBuffer() // account the transfer tr := accounting.Stats.NewTransfer(o)
fh.done = tr.Done
fh.r = tr.Account(r).WithBuffer() // account the transfer
fh.opened = true fh.opened = true
accounting.Stats.Transferring(o.Remote())
return nil return nil
} }
@ -347,9 +350,12 @@ func (fh *ReadFileHandle) close() error {
fh.closed = true fh.closed = true
if fh.opened { if fh.opened {
accounting.Stats.DoneTransferring(fh.remote, true) var err error
defer func() {
fh.done(err)
}()
// Close first so that we have hashes // Close first so that we have hashes
err := fh.r.Close() err = fh.r.Close()
if err != nil { if err != nil {
return err return err
} }

View file

@ -87,9 +87,11 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl
// copy an object to or from the remote while accounting for it // copy an object to or from the remote while accounting for it
func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
if operations.NeedTransfer(context.TODO(), dst, src) { if operations.NeedTransfer(context.TODO(), dst, src) {
accounting.Stats.Transferring(src.Remote()) tr := accounting.Stats.NewTransfer(src)
defer func() {
tr.Done(err)
}()
newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) newDst, err = operations.Copy(context.TODO(), f, dst, remote, src)
accounting.Stats.DoneTransferring(src.Remote(), err == nil)
} else { } else {
newDst = dst newDst = dst
} }