rclone/fs/accounting/transfer.go
Nick Craig-Wood 91b54aafcc rc: add srcFs and dstFs to core/stats and core/transferred stats
Before this change it wasn't possible to see where transfers were
going from and to in core/stats and core/transferred.

When use in rclone mount in particular this made interpreting the
stats very hard.
2024-02-02 11:43:10 +00:00

219 lines
5.4 KiB
Go

package accounting
import (
"context"
"encoding/json"
"io"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
)
// TransferSnapshot represents state of an account at point in time.
type TransferSnapshot struct {
Name string `json:"name"`
Size int64 `json:"size"`
Bytes int64 `json:"bytes"`
Checked bool `json:"checked"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at,omitempty"`
Error error `json:"-"`
Group string `json:"group"`
SrcFs string `json:"srcFs,omitempty"`
DstFs string `json:"dstFs,omitempty"`
}
// MarshalJSON implements json.Marshaler interface.
func (as TransferSnapshot) MarshalJSON() ([]byte, error) {
err := ""
if as.Error != nil {
err = as.Error.Error()
}
type Alias TransferSnapshot
return json.Marshal(&struct {
Error string `json:"error"`
Alias
}{
Error: err,
Alias: (Alias)(as),
})
}
// Transfer keeps track of initiated transfers and provides access to
// accounting functions.
// Transfer needs to be closed on completion.
type Transfer struct {
// these are initialised at creation and may be accessed without locking
stats *StatsInfo
remote string
size int64
startedAt time.Time
checking bool
what string // what kind of transfer this is
srcFs fs.Fs // source Fs - may be nil
dstFs fs.Fs // destination Fs - may be nil
// Protects all below
//
// NB to avoid deadlocks we must release this lock before
// calling any methods on Transfer.stats. This is because
// StatsInfo calls back into Transfer.
mu sync.RWMutex
acc *Account
err error
completedAt time.Time
}
// newCheckingTransfer instantiates new checking of the object.
func newCheckingTransfer(stats *StatsInfo, obj fs.DirEntry, what string) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), true, what, nil, nil)
}
// newTransfer instantiates new transfer.
func newTransfer(stats *StatsInfo, obj fs.DirEntry, srcFs, dstFs fs.Fs) *Transfer {
return newTransferRemoteSize(stats, obj.Remote(), obj.Size(), false, "", srcFs, dstFs)
}
func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking bool, what string, srcFs, dstFs fs.Fs) *Transfer {
tr := &Transfer{
stats: stats,
remote: remote,
size: size,
startedAt: time.Now(),
checking: checking,
what: what,
srcFs: srcFs,
dstFs: dstFs,
}
stats.AddTransfer(tr)
return tr
}
// Done ends the transfer.
// Must be called after transfer is finished to run proper cleanups.
func (tr *Transfer) Done(ctx context.Context, err error) {
if err != nil {
err = tr.stats.Error(err)
tr.mu.Lock()
tr.err = err
tr.mu.Unlock()
}
tr.mu.RLock()
acc := tr.acc
tr.mu.RUnlock()
ci := fs.GetConfig(ctx)
if acc != nil {
// Close the file if it is still open
if err := acc.Close(); err != nil {
fs.LogLevelPrintf(ci.StatsLogLevel, nil, "can't close account: %+v\n", err)
}
// Signal done with accounting
acc.Done()
// free the account since we may keep the transfer
acc = nil
}
tr.mu.Lock()
tr.completedAt = time.Now()
tr.mu.Unlock()
if tr.checking {
tr.stats.DoneChecking(tr.remote)
} else {
tr.stats.DoneTransferring(tr.remote, err == nil)
}
tr.stats.PruneTransfers()
}
// Reset allows to switch the Account to another transfer method.
func (tr *Transfer) Reset(ctx context.Context) {
tr.mu.RLock()
acc := tr.acc
tr.acc = nil
tr.mu.RUnlock()
ci := fs.GetConfig(ctx)
if acc != nil {
acc.Done()
if err := acc.Close(); err != nil {
fs.LogLevelPrintf(ci.StatsLogLevel, nil, "can't close account: %+v\n", err)
}
}
}
// Account returns reader that knows how to keep track of transfer progress.
func (tr *Transfer) Account(ctx context.Context, in io.ReadCloser) *Account {
tr.mu.Lock()
if tr.acc == nil {
tr.acc = newAccountSizeName(ctx, tr.stats, in, tr.size, tr.remote)
} else {
tr.acc.UpdateReader(ctx, in)
}
tr.acc.checking = tr.checking
tr.mu.Unlock()
return tr.acc
}
// TimeRange returns the time transfer started and ended at. If not completed
// it will return zero time for end time.
func (tr *Transfer) TimeRange() (time.Time, time.Time) {
tr.mu.RLock()
defer tr.mu.RUnlock()
return tr.startedAt, tr.completedAt
}
// IsDone returns true if transfer is completed.
func (tr *Transfer) IsDone() bool {
tr.mu.RLock()
defer tr.mu.RUnlock()
return !tr.completedAt.IsZero()
}
// Snapshot produces stats for this account at point in time.
func (tr *Transfer) Snapshot() TransferSnapshot {
tr.mu.RLock()
defer tr.mu.RUnlock()
var s, b int64 = tr.size, 0
if tr.acc != nil {
b, s = tr.acc.progress()
}
snapshot := TransferSnapshot{
Name: tr.remote,
Checked: tr.checking,
Size: s,
Bytes: b,
StartedAt: tr.startedAt,
CompletedAt: tr.completedAt,
Error: tr.err,
Group: tr.stats.group,
}
if tr.srcFs != nil {
snapshot.SrcFs = fs.ConfigString(tr.srcFs)
}
if tr.dstFs != nil {
snapshot.DstFs = fs.ConfigString(tr.dstFs)
}
return snapshot
}
// rcStats returns stats for the transfer suitable for the rc
func (tr *Transfer) rcStats() rc.Params {
out := rc.Params{
"name": tr.remote, // no locking needed to access this
"size": tr.size,
}
if tr.srcFs != nil {
out["srcFs"] = fs.ConfigString(tr.srcFs)
}
if tr.dstFs != nil {
out["dstFs"] = fs.ConfigString(tr.dstFs)
}
return out
}