rc: add srcFs and dstFs to core/stats and core/transferred stats

Before this change it wasn't possible to see where transfers were
going from and to in core/stats and core/transferred.

When use in rclone mount in particular this made interpreting the
stats very hard.
This commit is contained in:
Nick Craig-Wood 2024-01-18 16:44:13 +00:00
parent 81a29e6895
commit 91b54aafcc
16 changed files with 137 additions and 40 deletions

View file

@ -339,7 +339,7 @@ func (d *driver) ListDir(sctx *ftp.Context, path string, callback func(iofs.File
}
// Account the transfer
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size(), d.f, nil)
defer func() {
tr.Done(d.ctx, err)
}()
@ -448,7 +448,7 @@ func (d *driver) GetFile(sctx *ftp.Context, path string, offset int64) (size int
}
// Account the transfer
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size())
tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size(), d.f, nil)
defer tr.Done(d.ctx, nil)
return node.Size(), handle, nil

View file

@ -297,7 +297,7 @@ func (s *HTTP) serveFile(w http.ResponseWriter, r *http.Request, remote string)
}()
// Account the transfer
tr := accounting.Stats(r.Context()).NewTransfer(obj)
tr := accounting.Stats(r.Context()).NewTransfer(obj, nil)
defer tr.Done(r.Context(), nil)
// FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer

View file

@ -847,12 +847,15 @@ Returns the following values:
[
{
"bytes": total transferred bytes for this file,
"eta": estimated time in seconds until file transfer completion
"eta": estimated time in seconds until file transfer completion (may be nil)
"name": name of the file,
"percentage": progress of the file transfer in percent,
"speed": average speed over the whole transfer in bytes per second,
"speedAvg": current speed in bytes per second as an exponentially weighted moving average,
"size": size of the file in bytes
"group": stats group this transfer is part of
"srcFs": name of the source remote (not present if not known)
"dstFs": name of the destination remote (not present if not known)
}
],
"checking": an array of names of currently active file checks
@ -904,9 +907,12 @@ Returns the following values:
"size": size of the file in bytes,
"bytes": total transferred bytes for this file,
"checked": if the transfer is only checked (skipped, deleted),
"timestamp": integer representing millisecond unix epoch,
"started_at": time the transfer was started at (RFC3339 format, eg `"2000-01-01T01:00:00.085742121Z"`),
"completed_at": time the transfer was completed at (RFC3339 format, only present if transfer is completed),
"error": string description of the error (empty if successful),
"jobid": id of the job that this transfer belongs to
"group": string representing which stats group this is part of,
"srcFs": name of the source remote (not present if not known),
"dstFs": name of the destination remote (not present if not known),
}
]
}

View file

@ -539,9 +539,8 @@ func (acc *Account) String() string {
)
}
// rcStats produces remote control stats for this file
func (acc *Account) rcStats() (out rc.Params) {
out = make(rc.Params)
// rcStats adds remote control stats for this file
func (acc *Account) rcStats(out rc.Params) {
a, b := acc.progress()
out["bytes"] = a
out["size"] = b
@ -563,8 +562,6 @@ func (acc *Account) rcStats() (out rc.Params) {
}
out["percentage"] = percentageDone
out["group"] = acc.stats.group
return out
}
// OldStream returns the top io.Reader

View file

@ -775,16 +775,24 @@ func (s *StatsInfo) GetTransfers() int64 {
}
// NewTransfer adds a transfer to the stats from the object.
func (s *StatsInfo) NewTransfer(obj fs.DirEntry) *Transfer {
tr := newTransfer(s, obj)
//
// The obj is uses as the srcFs, the dstFs must be supplied
func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
var srcFs fs.Fs
if oi, ok := obj.(fs.ObjectInfo); ok {
if f, ok := oi.Fs().(fs.Fs); ok {
srcFs = f
}
}
tr := newTransfer(s, obj, srcFs, dstFs)
s.transferring.add(tr)
s.startAverageLoop()
return tr
}
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
tr := newTransferRemoteSize(s, remote, size, false, "")
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer {
tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs)
s.transferring.add(tr)
s.startAverageLoop()
return tr

View file

@ -21,6 +21,8 @@ type TransferSnapshot struct {
CompletedAt time.Time `json:"completed_at,omitempty"`
Error error `json:"-"`
Group string `json:"group"`
SrcFs string `json:"srcFs,omitempty"`
DstFs string `json:"dstFs,omitempty"`
}
// MarshalJSON implements json.Marshaler interface.
@ -51,6 +53,8 @@ type Transfer struct {
startedAt time.Time
checking bool
what string // what kind of transfer this is
srcFs fs.Fs // source Fs - may be nil
dstFs fs.Fs // destination Fs - may be nil
// Protects all below
//
@ -65,15 +69,15 @@ type Transfer struct {
// newCheckingTransfer instantiates new checking of the object.
func newCheckingTransfer(stats *StatsInfo, obj fs.DirEntry, what string) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true, what)
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true, what, nil, nil)
}
// newTransfer instantiates new transfer.
func newTransfer(stats *StatsInfo, obj fs.DirEntry) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false, "")
func newTransfer(stats *StatsInfo, obj fs.DirEntry, srcFs, dstFs fs.Fs) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false, "", srcFs, dstFs)
}
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool, what string) *Transfer {
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool, what string, srcFs, dstFs fs.Fs) *Transfer {
tr := &Transfer{
stats: stats,
remote: remote,
@ -81,6 +85,8 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking
startedAt: time.Now(),
checking: checking,
what: what,
srcFs: srcFs,
dstFs: dstFs,
}
stats.AddTransfer(tr)
return tr
@ -178,7 +184,7 @@ func (tr *Transfer) Snapshot() TransferSnapshot {
if tr.acc != nil {
b, s = tr.acc.progress()
}
return TransferSnapshot{
snapshot := TransferSnapshot{
Name: tr.remote,
Checked: tr.checking,
Size: s,
@ -188,12 +194,26 @@ func (tr *Transfer) Snapshot() TransferSnapshot {
Error: tr.err,
Group: tr.stats.group,
}
if tr.srcFs != nil {
snapshot.SrcFs = fs.ConfigString(tr.srcFs)
}
if tr.dstFs != nil {
snapshot.DstFs = fs.ConfigString(tr.dstFs)
}
return snapshot
}
// rcStats returns stats for the transfer suitable for the rc
func (tr *Transfer) rcStats() rc.Params {
return rc.Params{
out := rc.Params{
"name": tr.remote, // no locking needed to access this
"size": tr.size,
}
if tr.srcFs != nil {
out["srcFs"] = fs.ConfigString(tr.srcFs)
}
if tr.dstFs != nil {
out["dstFs"] = fs.ConfigString(tr.dstFs)
}
return out
}

View file

@ -0,0 +1,66 @@
package accounting
import (
"context"
"errors"
"io"
"testing"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fstest/mockfs"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTransfer(t *testing.T) {
ctx := context.Background()
s := NewStats(ctx)
o := mockobject.Object("obj")
srcFs, err := mockfs.NewFs(ctx, "srcFs", "srcFs", nil)
require.NoError(t, err)
dstFs, err := mockfs.NewFs(ctx, "dstFs", "dstFs", nil)
require.NoError(t, err)
tr := newTransfer(s, o, srcFs, dstFs)
t.Run("Snapshot", func(t *testing.T) {
snap := tr.Snapshot()
assert.Equal(t, "obj", snap.Name)
assert.Equal(t, int64(0), snap.Size)
assert.Equal(t, int64(0), snap.Bytes)
assert.Equal(t, false, snap.Checked)
assert.Equal(t, false, snap.StartedAt.IsZero())
assert.Equal(t, true, snap.CompletedAt.IsZero())
assert.Equal(t, nil, snap.Error)
assert.Equal(t, "", snap.Group)
assert.Equal(t, "srcFs:srcFs", snap.SrcFs)
assert.Equal(t, "dstFs:dstFs", snap.DstFs)
})
t.Run("Done", func(t *testing.T) {
tr.Done(ctx, io.EOF)
snap := tr.Snapshot()
assert.Equal(t, "obj", snap.Name)
assert.Equal(t, int64(0), snap.Size)
assert.Equal(t, int64(0), snap.Bytes)
assert.Equal(t, false, snap.Checked)
assert.Equal(t, false, snap.StartedAt.IsZero())
assert.Equal(t, false, snap.CompletedAt.IsZero())
assert.Equal(t, true, errors.Is(snap.Error, io.EOF))
assert.Equal(t, "", snap.Group)
assert.Equal(t, "srcFs:srcFs", snap.SrcFs)
assert.Equal(t, "dstFs:dstFs", snap.DstFs)
})
t.Run("rcStats", func(t *testing.T) {
out := tr.rcStats()
assert.Equal(t, rc.Params{
"name": "obj",
"size": int64(0),
"srcFs": "srcFs:srcFs",
"dstFs": "dstFs:dstFs",
}, out)
})
}

View file

@ -159,11 +159,11 @@ func (tm *transferMap) rcStats(progress *inProgress) (t []rc.Params) {
tm.mu.RLock()
defer tm.mu.RUnlock()
for _, tr := range tm._sortedSlice() {
out := tr.rcStats() // basic stats
if acc := progress.get(tr.remote); acc != nil {
t = append(t, acc.rcStats())
} else {
t = append(t, tr.rcStats())
acc.rcStats(out) // add extended stats if have acc
}
t = append(t, out)
}
return t
}

View file

@ -341,7 +341,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
if err != nil {
return true, fmt.Errorf("failed to open %q: %w", dst, err)
}
tr1 := accounting.Stats(ctx).NewTransfer(dst)
tr1 := accounting.Stats(ctx).NewTransfer(dst, nil)
defer func() {
tr1.Done(ctx, nil) // error handling is done by the caller
}()
@ -351,7 +351,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo
if err != nil {
return true, fmt.Errorf("failed to open %q: %w", src, err)
}
tr2 := accounting.Stats(ctx).NewTransfer(dst)
tr2 := accounting.Stats(ctx).NewTransfer(dst, nil)
defer func() {
tr2.Done(ctx, nil) // error handling is done by the caller
}()
@ -501,7 +501,7 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool,
if in, err = Open(ctx, obj); err != nil {
return
}
tr := accounting.Stats(ctx).NewTransfer(obj)
tr := accounting.Stats(ctx).NewTransfer(obj, nil)
in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
defer func() {
tr.Done(ctx, nil) // will close the stream

View file

@ -369,7 +369,7 @@ func (c *copy) copy(ctx context.Context) (newDst fs.Object, err error) {
// be nil.
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
ci := fs.GetConfig(ctx)
tr := accounting.Stats(ctx).NewTransfer(src)
tr := accounting.Stats(ctx).NewTransfer(src, f)
defer func() {
tr.Done(ctx, err)
}()

View file

@ -183,7 +183,7 @@ func TestMultithreadCopy(t *testing.T) {
require.NoError(t, err)
accounting.GlobalStats().ResetCounters()
tr := accounting.GlobalStats().NewTransfer(src)
tr := accounting.GlobalStats().NewTransfer(src, nil)
defer func() {
tr.Done(ctx, err)
@ -284,7 +284,7 @@ func TestMultithreadCopyAbort(t *testing.T) {
src, err := r.Flocal.NewObject(ctx, fileName)
require.NoError(t, err)
accounting.GlobalStats().ResetCounters()
tr := accounting.GlobalStats().NewTransfer(src)
tr := accounting.GlobalStats().NewTransfer(src, nil)
defer func() {
tr.Done(ctx, err)

View file

@ -382,7 +382,7 @@ func move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.
ci := fs.GetConfig(ctx)
var tr *accounting.Transfer
if isTransfer {
tr = accounting.Stats(ctx).NewTransfer(src)
tr = accounting.Stats(ctx).NewTransfer(src, fdst)
} else {
tr = accounting.Stats(ctx).NewCheckingTransfer(src, "moving")
}
@ -814,7 +814,7 @@ func HashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag
// Setup: Define accounting, open the file with NewReOpen to provide restarts, account for the transfer, and setup a multi-hasher with the appropriate type
// Execution: io.Copy file to hasher, get hash and encode in hex
tr := accounting.Stats(ctx).NewTransfer(o)
tr := accounting.Stats(ctx).NewTransfer(o, nil)
defer func() {
tr.Done(ctx, err)
}()
@ -1106,7 +1106,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
ci := fs.GetConfig(ctx)
return ListFn(ctx, f, func(o fs.Object) {
var err error
tr := accounting.Stats(ctx).NewTransfer(o)
tr := accounting.Stats(ctx).NewTransfer(o, nil)
defer func() {
tr.Done(ctx, err)
}()
@ -1157,7 +1157,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
ci := fs.GetConfig(ctx)
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1)
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst)
defer func() {
tr.Done(ctx, err)
}()
@ -1603,7 +1603,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo
if size >= 0 {
var err error
// Size known use Put
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size)
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size, nil, fdst)
defer func() {
tr.Done(ctx, err)
}()
@ -1807,7 +1807,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
logger(ctx, TransferError, nil, tmpObjFail, err)
return fmt.Errorf("error while attempting to move file to a temporary location: %w", err)
}
tr := accounting.Stats(ctx).NewTransfer(srcObj)
tr := accounting.Stats(ctx).NewTransfer(srcObj, fdst)
defer func() {
tr.Done(ctx, err)
}()

View file

@ -224,7 +224,7 @@ const (
// Serve serves a directory
func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) {
// Account the transfer
tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1)
tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1, nil, nil)
defer tr.Done(r.Context(), nil)
fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr)

View file

@ -79,7 +79,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
tr := accounting.Stats(r.Context()).NewTransfer(o)
tr := accounting.Stats(r.Context()).NewTransfer(o, nil)
defer func() {
tr.Done(r.Context(), err)
}()

View file

@ -79,7 +79,7 @@ func (fh *ReadFileHandle) openPending() (err error) {
if err != nil {
return err
}
tr := accounting.GlobalStats().NewTransfer(o)
tr := accounting.GlobalStats().NewTransfer(o, nil)
fh.done = tr.Done
fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer
fh.opened = true

View file

@ -518,7 +518,7 @@ loop:
// should be called on a fresh downloader
func (dl *downloader) open(offset int64) (err error) {
// defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)
dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src, nil)
size := dl.dls.src.Size()
if size < 0 {