accounting: add reference to completed transfers

Add core/transferred call that lists completed transfers and their
status.
This commit is contained in:
Aleksandar Jankovic 2019-07-22 21:11:46 +02:00 committed by Nick Craig-Wood
parent 8243ff8bc8
commit 53a1a0e3ef
7 changed files with 218 additions and 55 deletions

View file

@ -548,12 +548,12 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *api.Fil
if info != nil {
err := o.decodeMetaData(info)
if err != nil {
return nil, err
return o, err
}
} else {
err := o.readMetaData(ctx) // reads info and headers, returning an error
if err != nil {
return nil, err
return o, err
}
}
return o, nil
@ -1084,16 +1084,25 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error {
go func() {
defer wg.Done()
for object := range toBeDeleted {
accounting.Stats(ctx).Checking(object.Name)
checkErr(f.deleteByID(object.ID, object.Name))
accounting.Stats(ctx).DoneChecking(object.Name)
oi, err := f.newObjectWithInfo(ctx, object.Name, object)
if err != nil {
fs.Errorf(object, "Can't create object %+v", err)
}
tr := accounting.Stats(ctx).NewCheckingTransfer(oi)
err = f.deleteByID(object.ID, object.Name)
checkErr(err)
tr.Done(err)
}
}()
}
last := ""
checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error {
if !isDirectory {
accounting.Stats(ctx).Checking(remote)
oi, err := f.newObjectWithInfo(ctx, object.Name, object)
if err != nil {
fs.Errorf(object, "Can't create object %+v", err)
}
tr := accounting.Stats(ctx).NewCheckingTransfer(oi)
if oldOnly && last != remote {
if object.Action == "hide" {
fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID)
@ -1109,7 +1118,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error {
toBeDeleted <- object
}
last = remote
accounting.Stats(ctx).DoneChecking(remote)
tr.Done(nil)
}
return nil
}))

View file

@ -280,6 +280,20 @@ Elapsed time: %10v
return buf.String()
}
// Transferred returns list of all completed transfers including checked and
// failed ones.
func (s *StatsInfo) Transferred() []TransferSnapshot {
ts := make([]TransferSnapshot, 0, len(s.startedTransfers))
for _, tr := range s.startedTransfers {
if tr.IsDone() {
ts = append(ts, tr.Snapshot())
}
}
return ts
}
// Log outputs the StatsInfo to the log
func (s *StatsInfo) Log() {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s)
@ -376,6 +390,7 @@ func (s *StatsInfo) ResetCounters() {
s.checks = 0
s.transfers = 0
s.deletes = 0
s.startedTransfers = nil
}
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
@ -427,9 +442,10 @@ func (s *StatsInfo) RetryAfter() time.Time {
return s.retryAfter
}
// Checking adds a check into the stats
func (s *StatsInfo) Checking(remote string) {
s.checking.add(remote)
// NewCheckingTransfer adds a checking transfer to the stats, from the object.
func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer {
s.checking.add(obj.Remote())
return newCheckingTransfer(s, obj)
}
// DoneChecking removes a check from the stats
@ -456,7 +472,7 @@ func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer {
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
s.transferring.add(remote)
return newTransferRemoteSize(s, remote, size)
return newTransferRemoteSize(s, remote, size, false)
}
// DoneTransferring removes a transfer from the stats

View file

@ -26,6 +26,23 @@ func remoteStats(ctx context.Context, in rc.Params) (rc.Params, error) {
return groups.sum().RemoteStats()
}
func transferredStats(ctx context.Context, in rc.Params) (rc.Params, error) {
// Check to see if we should filter by group.
group, err := in.GetString("group")
if rc.NotErrParamNotFound(err) {
return rc.Params{}, err
}
out := make(rc.Params)
if group != "" {
out["transferred"] = StatsGroup(group).Transferred()
} else {
out["transferred"] = groups.sum().Transferred()
}
return out, nil
}
func init() {
// Init stats container
groups = newStatsGroups()
@ -74,6 +91,40 @@ Returns the following values:
` + "```" + `
Values for "transferring", "checking" and "lastError" are only assigned if data is available.
The value for "eta" is null if an eta cannot be determined.
`,
})
rc.Add(rc.Call{
Path: "core/transferred",
Fn: transferredStats,
Title: "Returns stats about completed transfers.",
Help: `
This returns stats about completed transfers:
rclone rc core/transferred
If group is not provided then completed transfers for all groups will be
returned.
Parameters
- group - name of the stats group (string)
Returns the following values:
` + "```" + `
{
"transferred": an array of completed transfers (including failed ones):
[
{
"name": name of the file,
"size": size of the file in bytes,
"bytes": total transferred bytes for this file,
"checked": if the transfer is only checked (skipped, deleted),
"timestamp": integer representing millisecond unix epoch,
"error": string description of the error (empty if successfull),
"jobid": id of the job that this transfer belongs to
}
]
}
`,
})
}
@ -184,6 +235,7 @@ func (sg *statsGroups) sum() *StatsInfo {
if sum.lastError == nil && stats.lastError != nil {
sum.lastError = stats.lastError
}
sum.startedTransfers = append(sum.startedTransfers, stats.startedTransfers...)
}
return sum
}

View file

@ -1,6 +1,7 @@
package accounting
import (
"encoding/json"
"io"
"sync"
"time"
@ -8,31 +9,67 @@ import (
"github.com/ncw/rclone/fs"
)
// TransferSnapshot represents state of an account at point in time.
type TransferSnapshot struct {
Name string `json:"name"`
Size int64 `json:"size"`
Bytes int64 `json:"bytes"`
Checked bool `json:"checked"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
Error error `json:"-"`
}
// MarshalJSON implements json.Marshaler interface.
func (as TransferSnapshot) MarshalJSON() ([]byte, error) {
err := ""
if as.Error != nil {
err = as.Error.Error()
}
type Alias TransferSnapshot
return json.Marshal(&struct {
Error string `json:"error"`
Alias
}{
Error: err,
Alias: (Alias)(as),
})
}
// Transfer keeps track of initiated transfers and provides access to
// accounting functions.
// Transfer needs to be closed on completion.
type Transfer struct {
stats *StatsInfo
acc *Account
remote string
size int64
stats *StatsInfo
remote string
size int64
checking bool
// Protects all bellow
mu sync.Mutex
acc *Account
err error
startedAt time.Time
completedAt time.Time
}
// newTransfer instantiates new transfer
func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size())
// newCheckingTransfer instantiates new checking of the object.
func newCheckingTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true)
}
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfer {
// newTransfer instantiates new transfer.
func newTransfer(stats *StatsInfo, obj fs.Object) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false)
}
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool) *Transfer {
tr := &Transfer{
stats: stats,
remote: remote,
size: size,
startedAt: time.Now(),
checking: checking,
}
stats.AddTransfer(tr)
return tr
@ -41,26 +78,37 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64) *Transfe
// Done ends the transfer.
// Must be called after transfer is finished to run proper cleanups.
func (tr *Transfer) Done(err error) {
tr.mu.Lock()
defer tr.mu.Unlock()
if err != nil {
tr.stats.Error(err)
tr.err = err
}
if tr.acc != nil {
if err := tr.acc.Close(); err != nil {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "can't close account: %+v\n", err)
}
}
tr.stats.DoneTransferring(tr.remote, err == nil)
tr.mu.Lock()
if tr.checking {
tr.stats.DoneChecking(tr.remote)
} else {
tr.stats.DoneTransferring(tr.remote, err == nil)
}
tr.completedAt = time.Now()
tr.mu.Unlock()
}
// Account returns reader that knows how to keep track of transfer progress.
func (tr *Transfer) Account(in io.ReadCloser) *Account {
if tr.acc != nil {
return tr.acc
tr.mu.Lock()
defer tr.mu.Unlock()
if tr.acc == nil {
tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote)
}
return newAccountSizeName(tr.stats, in, tr.size, tr.remote)
return tr.acc
}
// TimeRange returns the time transfer started and ended at. If not completed
@ -70,3 +118,30 @@ func (tr *Transfer) TimeRange() (time.Time, time.Time) {
defer tr.mu.Unlock()
return tr.startedAt, tr.completedAt
}
// IsDone returns true if transfer is completed.
func (tr *Transfer) IsDone() bool {
tr.mu.Lock()
defer tr.mu.Unlock()
return !tr.completedAt.IsZero()
}
// Snapshot produces stats for this account at point in time.
func (tr *Transfer) Snapshot() TransferSnapshot {
tr.mu.Lock()
defer tr.mu.Unlock()
var s, b int64 = tr.size, 0
if tr.acc != nil {
b, s = tr.acc.progress()
}
return TransferSnapshot{
Name: tr.remote,
Checked: tr.checking,
Size: s,
Bytes: b,
StartedAt: tr.startedAt,
CompletedAt: tr.completedAt,
Error: tr.err,
}
}

View file

@ -176,7 +176,7 @@ func equal(ctx context.Context, src fs.ObjectInfo, dst fs.Object, sizeOnly, chec
// Size and hash the same but mtime different
// Error if objects are treated as immutable
if fs.Config.Immutable {
fs.Errorf(dst, "Timestamp mismatch between immutable objects")
fs.Errorf(dst, "StartedAt mismatch between immutable objects")
return false
}
// Update the mtime of the dst object here
@ -428,9 +428,9 @@ func SameObject(src, dst fs.Object) bool {
// It returns the destination object if possible. Note that this may
// be nil.
func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
accounting.Stats(ctx).Checking(src.Remote())
tr := accounting.Stats(ctx).NewCheckingTransfer(src)
defer func() {
accounting.Stats(ctx).DoneChecking(src.Remote())
tr.Done(err)
}()
newDst = dst
if fs.Config.DryRun {
@ -501,7 +501,10 @@ func SuffixName(remote string) string {
// If backupDir is set then it moves the file to there instead of
// deleting
func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) {
accounting.Stats(ctx).Checking(dst.Remote())
tr := accounting.Stats(ctx).NewCheckingTransfer(dst)
defer func() {
tr.Done(err)
}()
numDeletes := accounting.Stats(ctx).Deletes(1)
if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete {
return fserrors.FatalError(errors.New("--max-delete threshold reached"))
@ -523,7 +526,6 @@ func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs
} else if !fs.Config.DryRun {
fs.Infof(dst, actioned)
}
accounting.Stats(ctx).DoneChecking(dst.Remote())
return err
}
@ -709,10 +711,13 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
// check to see if two objects are identical using the check function
func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool) {
accounting.Stats(ctx).Checking(src.Remote())
defer accounting.Stats(ctx).DoneChecking(src.Remote())
var err error
tr := accounting.Stats(ctx).NewCheckingTransfer(src)
defer func() {
tr.Done(err)
}()
if sizeDiffers(src, dst) {
err := errors.Errorf("Sizes differ")
err = errors.Errorf("Sizes differ")
fs.Errorf(src, "%v", err)
fs.CountError(err)
return true, false
@ -930,9 +935,11 @@ func List(ctx context.Context, f fs.Fs, w io.Writer) error {
// Lists in parallel which may get them out of order
func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error {
return ListFn(ctx, f, func(o fs.Object) {
accounting.Stats(ctx).Checking(o.Remote())
tr := accounting.Stats(ctx).NewCheckingTransfer(o)
defer func() {
tr.Done(nil)
}()
modTime := o.ModTime(ctx)
accounting.Stats(ctx).DoneChecking(o.Remote())
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote())
})
}
@ -968,9 +975,12 @@ func DropboxHashSum(ctx context.Context, f fs.Fs, w io.Writer) error {
// hashSum returns the human readable hash for ht passed in. This may
// be UNSUPPORTED or ERROR.
func hashSum(ctx context.Context, ht hash.Type, o fs.Object) string {
accounting.Stats(ctx).Checking(o.Remote())
var err error
tr := accounting.Stats(ctx).NewCheckingTransfer(o)
defer func() {
tr.Done(err)
}()
sum, err := o.Hash(ctx, ht)
accounting.Stats(ctx).DoneChecking(o.Remote())
if err == hash.ErrUnsupported {
sum = "UNSUPPORTED"
} else if err != nil {
@ -1711,11 +1721,11 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str
_, err = Op(ctx, fdst, dstObj, dstFileName, srcObj)
} else {
accounting.Stats(ctx).Checking(srcFileName)
tr := accounting.Stats(ctx).NewCheckingTransfer(srcObj)
if !cp {
err = DeleteFile(ctx, srcObj)
}
defer accounting.Stats(ctx).DoneChecking(srcFileName)
tr.Done(err)
}
return err
}

View file

@ -215,7 +215,8 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
return
}
src := pair.Src
accounting.Stats(s.ctx).Checking(src.Remote())
var err error
tr := accounting.Stats(s.ctx).NewCheckingTransfer(src)
// Check to see if can store this
if src.Storable() {
NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir)
@ -256,7 +257,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
}
}
}
accounting.Stats(s.ctx).DoneChecking(src.Remote())
tr.Done(err)
}
}
@ -587,12 +588,12 @@ func (s *syncCopyMove) makeRenameMap() {
for obj := range in {
// only create hash for dst fs.Object if its size could match
if _, found := possibleSizes[obj.Size()]; found {
accounting.Stats(s.ctx).Checking(obj.Remote())
tr := accounting.Stats(s.ctx).NewCheckingTransfer(obj)
hash := s.renameHash(obj)
if hash != "" {
s.pushRenameMap(hash, obj)
}
accounting.Stats(s.ctx).DoneChecking(obj.Remote())
tr.Done(nil)
}
}
}()

View file

@ -1233,7 +1233,7 @@ func TestSyncCompareDest(t *testing.T) {
file1 := r.WriteFile("one", "one", t1)
fstest.CheckItems(t, r.Flocal, file1)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1247,7 +1247,7 @@ func TestSyncCompareDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file1dst)
fstest.CheckItems(t, r.Flocal, file1b)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1263,7 +1263,7 @@ func TestSyncCompareDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file3)
fstest.CheckItems(t, r.Flocal, file1c)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1275,14 +1275,14 @@ func TestSyncCompareDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file3, file4)
fstest.CheckItems(t, r.Flocal, file1c, file5)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
fstest.CheckItems(t, r.Fremote, file2, file3, file4)
// check new dest, new compare
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1293,7 +1293,7 @@ func TestSyncCompareDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file3, file4)
fstest.CheckItems(t, r.Flocal, file1c, file5b)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1324,7 +1324,7 @@ func TestSyncCopyDest(t *testing.T) {
file1 := r.WriteFile("one", "one", t1)
fstest.CheckItems(t, r.Flocal, file1)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1338,7 +1338,7 @@ func TestSyncCopyDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file1dst)
fstest.CheckItems(t, r.Flocal, file1b)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1357,7 +1357,7 @@ func TestSyncCopyDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file3)
fstest.CheckItems(t, r.Flocal, file1c)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1374,7 +1374,7 @@ func TestSyncCopyDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4)
fstest.CheckItems(t, r.Flocal, file1c, file5)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1384,7 +1384,7 @@ func TestSyncCopyDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4, file4dst)
// check new dest, new copy
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)
@ -1396,7 +1396,7 @@ func TestSyncCopyDest(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file2, file2dst, file3, file4, file4dst, file6)
fstest.CheckItems(t, r.Flocal, file1c, file5, file7)
accounting.Stats.ResetCounters()
accounting.GlobalStats().ResetCounters()
err = Sync(context.Background(), fdst, r.Flocal, false)
require.NoError(t, err)