de185de215
Before this change we showed both server side moves and server side copies as bytes transferred. This made a nice easy to use stats display, but also caused confusion for users who saw unrealistic transfer times. It also caused a problem with --max-transfer and chunker which renames each chunk after uploading which was counted as a transfer byte. This patch instead accounts the server side move and copy statistics as a seperate lines in the stats display which will only appear if there are any server side moves / copies. This is also output in the rc. This gives users something to look at when transfers are running which was the point of the original change but it now means that transfer bytes represents data transfers through this rclone instance only. Fixes #7183
892 lines
23 KiB
Go
892 lines
23 KiB
Go
package accounting
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/fs/rc"
|
|
"github.com/rclone/rclone/lib/terminal"
|
|
)
|
|
|
|
const (
|
|
averagePeriodLength = time.Second
|
|
averageStopAfter = time.Minute
|
|
)
|
|
|
|
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
|
|
var MaxCompletedTransfers = 100
|
|
|
|
// StatsInfo accounts all transfers
|
|
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
|
|
// to correctly count the updated fields
|
|
type StatsInfo struct {
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
ci *fs.ConfigInfo
|
|
bytes int64
|
|
errors int64
|
|
lastError error
|
|
fatalError bool
|
|
retryError bool
|
|
retryAfter time.Time
|
|
checks int64
|
|
checking *transferMap
|
|
checkQueue int
|
|
checkQueueSize int64
|
|
transfers int64
|
|
transferring *transferMap
|
|
transferQueue int
|
|
transferQueueSize int64
|
|
renames int64
|
|
renameQueue int
|
|
renameQueueSize int64
|
|
deletes int64
|
|
deletesSize int64
|
|
deletedDirs int64
|
|
inProgress *inProgress
|
|
startedTransfers []*Transfer // currently active transfers
|
|
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
|
oldDuration time.Duration // duration of transfers we have culled
|
|
group string
|
|
startTime time.Time // the moment these stats were initialized or reset
|
|
average averageValues
|
|
serverSideCopies int64
|
|
serverSideCopyBytes int64
|
|
serverSideMoves int64
|
|
serverSideMoveBytes int64
|
|
}
|
|
|
|
type averageValues struct {
|
|
mu sync.Mutex
|
|
lpBytes int64
|
|
lpTime time.Time
|
|
speed float64
|
|
stop chan bool
|
|
stopped sync.WaitGroup
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
// NewStats creates an initialised StatsInfo
|
|
func NewStats(ctx context.Context) *StatsInfo {
|
|
ci := fs.GetConfig(ctx)
|
|
return &StatsInfo{
|
|
ctx: ctx,
|
|
ci: ci,
|
|
checking: newTransferMap(ci.Checkers, "checking"),
|
|
transferring: newTransferMap(ci.Transfers, "transferring"),
|
|
inProgress: newInProgress(ctx),
|
|
startTime: time.Now(),
|
|
average: averageValues{stop: make(chan bool)},
|
|
}
|
|
}
|
|
|
|
// RemoteStats returns stats for rc
|
|
func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
|
|
// NB if adding values here - make sure you update the docs in
|
|
// stats_groups.go
|
|
|
|
out = make(rc.Params)
|
|
|
|
ts := s.calculateTransferStats()
|
|
out["totalChecks"] = ts.totalChecks
|
|
out["totalTransfers"] = ts.totalTransfers
|
|
out["totalBytes"] = ts.totalBytes
|
|
out["transferTime"] = ts.transferTime
|
|
out["speed"] = ts.speed
|
|
|
|
s.mu.RLock()
|
|
out["bytes"] = s.bytes
|
|
out["errors"] = s.errors
|
|
out["fatalError"] = s.fatalError
|
|
out["retryError"] = s.retryError
|
|
out["checks"] = s.checks
|
|
out["transfers"] = s.transfers
|
|
out["deletes"] = s.deletes
|
|
out["deletedDirs"] = s.deletedDirs
|
|
out["renames"] = s.renames
|
|
out["elapsedTime"] = time.Since(s.startTime).Seconds()
|
|
out["serverSideCopies"] = s.serverSideCopies
|
|
out["serverSideCopyBytes"] = s.serverSideCopyBytes
|
|
out["serverSideMoves"] = s.serverSideMoves
|
|
out["serverSideMoveBytes"] = s.serverSideMoveBytes
|
|
eta, etaOK := eta(s.bytes, ts.totalBytes, ts.speed)
|
|
if etaOK {
|
|
out["eta"] = eta.Seconds()
|
|
} else {
|
|
out["eta"] = nil
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
if !s.checking.empty() {
|
|
out["checking"] = s.checking.remotes()
|
|
}
|
|
if !s.transferring.empty() {
|
|
out["transferring"] = s.transferring.rcStats(s.inProgress)
|
|
}
|
|
if s.errors > 0 {
|
|
out["lastError"] = s.lastError.Error()
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// speed returns the average speed of the transfer in bytes/second
|
|
//
|
|
// Call with lock held
|
|
func (s *StatsInfo) speed() float64 {
|
|
return s.average.speed
|
|
}
|
|
|
|
// timeRange is a start and end time of a transfer
|
|
type timeRange struct {
|
|
start time.Time
|
|
end time.Time
|
|
}
|
|
|
|
// timeRanges is a list of non-overlapping start and end times for
|
|
// transfers
|
|
type timeRanges []timeRange
|
|
|
|
// merge all the overlapping time ranges
|
|
func (trs *timeRanges) merge() {
|
|
Trs := *trs
|
|
|
|
// Sort by the starting time.
|
|
sort.Slice(Trs, func(i, j int) bool {
|
|
return Trs[i].start.Before(Trs[j].start)
|
|
})
|
|
|
|
// Merge overlaps and add distinctive ranges together
|
|
var (
|
|
newTrs = Trs[:0]
|
|
i, j = 0, 1
|
|
)
|
|
for i < len(Trs) {
|
|
if j < len(Trs) {
|
|
if !Trs[i].end.Before(Trs[j].start) {
|
|
if Trs[i].end.Before(Trs[j].end) {
|
|
Trs[i].end = Trs[j].end
|
|
}
|
|
j++
|
|
continue
|
|
}
|
|
}
|
|
newTrs = append(newTrs, Trs[i])
|
|
i = j
|
|
j++
|
|
}
|
|
|
|
*trs = newTrs
|
|
}
|
|
|
|
// cull remove any ranges whose start and end are before cutoff
|
|
// returning their duration sum
|
|
func (trs *timeRanges) cull(cutoff time.Time) (d time.Duration) {
|
|
var newTrs = (*trs)[:0]
|
|
for _, tr := range *trs {
|
|
if cutoff.Before(tr.start) || cutoff.Before(tr.end) {
|
|
newTrs = append(newTrs, tr)
|
|
} else {
|
|
d += tr.end.Sub(tr.start)
|
|
}
|
|
}
|
|
*trs = newTrs
|
|
return d
|
|
}
|
|
|
|
// total the time out of the time ranges
|
|
func (trs timeRanges) total() (total time.Duration) {
|
|
for _, tr := range trs {
|
|
total += tr.end.Sub(tr.start)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// Total duration is union of durations of all transfers belonging to this
|
|
// object.
|
|
// Needs to be protected by mutex.
|
|
func (s *StatsInfo) totalDuration() time.Duration {
|
|
// copy of s.oldTimeRanges with extra room for the current transfers
|
|
timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers))
|
|
copy(timeRanges, s.oldTimeRanges)
|
|
|
|
// Extract time ranges of all transfers.
|
|
now := time.Now()
|
|
for i := range s.startedTransfers {
|
|
start, end := s.startedTransfers[i].TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
timeRanges = append(timeRanges, timeRange{start, end})
|
|
}
|
|
|
|
timeRanges.merge()
|
|
return s.oldDuration + timeRanges.total()
|
|
}
|
|
|
|
const (
|
|
etaMaxSeconds = (1<<63 - 1) / int64(time.Second) // Largest possible ETA as number of seconds
|
|
etaMax = time.Duration(etaMaxSeconds) * time.Second // Largest possible ETA, which is in second precision, representing "292y24w3d23h47m16s"
|
|
)
|
|
|
|
// eta returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined 'ok' returns false.
|
|
func eta(size, total int64, rate float64) (eta time.Duration, ok bool) {
|
|
if total <= 0 || size < 0 || rate <= 0 {
|
|
return 0, false
|
|
}
|
|
remaining := total - size
|
|
if remaining < 0 {
|
|
return 0, false
|
|
}
|
|
seconds := int64(float64(remaining) / rate)
|
|
if seconds < 0 {
|
|
// Got Int64 overflow
|
|
eta = etaMax
|
|
} else if seconds >= etaMaxSeconds {
|
|
// Would get Int64 overflow if converting from seconds to Duration (nanoseconds)
|
|
eta = etaMax
|
|
} else {
|
|
eta = time.Duration(seconds) * time.Second
|
|
}
|
|
return eta, true
|
|
}
|
|
|
|
// etaString returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined it returns "-"
|
|
func etaString(done, total int64, rate float64) string {
|
|
d, ok := eta(done, total, rate)
|
|
if !ok {
|
|
return "-"
|
|
}
|
|
if d == etaMax {
|
|
return "-"
|
|
}
|
|
return fs.Duration(d).ShortReadableString()
|
|
}
|
|
|
|
// percent returns a/b as a percentage rounded to the nearest integer
|
|
// as a string
|
|
//
|
|
// if the percentage is invalid it returns "-"
|
|
func percent(a int64, b int64) string {
|
|
if a < 0 || b <= 0 {
|
|
return "-"
|
|
}
|
|
return fmt.Sprintf("%d%%", int(float64(a)*100/float64(b)+0.5))
|
|
}
|
|
|
|
// returned from calculateTransferStats
|
|
type transferStats struct {
|
|
totalChecks int64
|
|
totalTransfers int64
|
|
totalBytes int64
|
|
transferTime float64
|
|
speed float64
|
|
}
|
|
|
|
// calculateTransferStats calculates some additional transfer stats not
|
|
// stored directly in StatsInfo
|
|
func (s *StatsInfo) calculateTransferStats() (ts transferStats) {
|
|
// checking and transferring have their own locking so read
|
|
// here before lock to prevent deadlock on GetBytes
|
|
transferring, checking := s.transferring.count(), s.checking.count()
|
|
transferringBytesDone, transferringBytesTotal := s.transferring.progress(s)
|
|
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
ts.totalChecks = int64(s.checkQueue) + s.checks + int64(checking)
|
|
ts.totalTransfers = int64(s.transferQueue) + s.transfers + int64(transferring)
|
|
// note that s.bytes already includes transferringBytesDone so
|
|
// we take it off here to avoid double counting
|
|
ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone
|
|
ts.speed = s.average.speed
|
|
|
|
return ts
|
|
}
|
|
|
|
func (s *StatsInfo) averageLoop() {
|
|
var period float64
|
|
|
|
ticker := time.NewTicker(averagePeriodLength)
|
|
defer ticker.Stop()
|
|
|
|
startTime := time.Now()
|
|
a := &s.average
|
|
defer a.stopped.Done()
|
|
for {
|
|
select {
|
|
case now := <-ticker.C:
|
|
a.mu.Lock()
|
|
var elapsed float64
|
|
if a.lpTime.IsZero() {
|
|
elapsed = now.Sub(startTime).Seconds()
|
|
} else {
|
|
elapsed = now.Sub(a.lpTime).Seconds()
|
|
}
|
|
avg := 0.0
|
|
if elapsed > 0 {
|
|
avg = float64(a.lpBytes) / elapsed
|
|
}
|
|
if period < averagePeriod {
|
|
period++
|
|
}
|
|
a.speed = (avg + a.speed*(period-1)) / period
|
|
a.lpBytes = 0
|
|
a.lpTime = now
|
|
a.mu.Unlock()
|
|
case <-a.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StatsInfo) startAverageLoop() {
|
|
s.average.startOnce.Do(func() {
|
|
s.average.stopped.Add(1)
|
|
go s.averageLoop()
|
|
})
|
|
}
|
|
|
|
func (s *StatsInfo) stopAverageLoop() {
|
|
s.average.stopOnce.Do(func() {
|
|
close(s.average.stop)
|
|
s.average.stopped.Wait()
|
|
})
|
|
}
|
|
|
|
// String convert the StatsInfo to a string for printing
|
|
func (s *StatsInfo) String() string {
|
|
// NB if adding more stats in here, remember to add them into
|
|
// RemoteStats() too.
|
|
|
|
ts := s.calculateTransferStats()
|
|
|
|
s.mu.RLock()
|
|
|
|
var (
|
|
buf = &bytes.Buffer{}
|
|
xfrchkString = ""
|
|
dateString = ""
|
|
elapsedTime = time.Since(s.startTime)
|
|
elapsedTimeSecondsOnly = elapsedTime.Truncate(time.Second/10) % time.Minute
|
|
displaySpeedString string
|
|
)
|
|
|
|
if s.ci.DataRateUnit == "bits" {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed * 8).BitRateUnit()
|
|
} else {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed).ByteRateUnit()
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = fmt.Fprintf(buf, "\nTransferred: ")
|
|
} else {
|
|
xfrchk := []string{}
|
|
if ts.totalTransfers > 0 && s.transferQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("xfr#%d/%d", s.transfers, ts.totalTransfers))
|
|
}
|
|
if ts.totalChecks > 0 && s.checkQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("chk#%d/%d", s.checks, ts.totalChecks))
|
|
}
|
|
if len(xfrchk) > 0 {
|
|
xfrchkString = fmt.Sprintf(" (%s)", strings.Join(xfrchk, ", "))
|
|
}
|
|
if s.ci.StatsOneLineDate {
|
|
t := time.Now()
|
|
dateString = t.Format(s.ci.StatsOneLineDateFormat) // Including the separator so people can customize it
|
|
}
|
|
}
|
|
|
|
_, _ = fmt.Fprintf(buf, "%s%13s / %s, %s, %s, ETA %s%s",
|
|
dateString,
|
|
fs.SizeSuffix(s.bytes).ByteUnit(),
|
|
fs.SizeSuffix(ts.totalBytes).ByteUnit(),
|
|
percent(s.bytes, ts.totalBytes),
|
|
displaySpeedString,
|
|
etaString(s.bytes, ts.totalBytes, ts.speed),
|
|
xfrchkString,
|
|
)
|
|
|
|
if s.ci.ProgressTerminalTitle {
|
|
// Writes ETA to the terminal title
|
|
terminal.WriteTerminalTitle("ETA: " + etaString(s.bytes, ts.totalBytes, ts.speed))
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = buf.WriteRune('\n')
|
|
errorDetails := ""
|
|
switch {
|
|
case s.fatalError:
|
|
errorDetails = " (fatal error encountered)"
|
|
case s.retryError:
|
|
errorDetails = " (retrying may help)"
|
|
case s.errors != 0:
|
|
errorDetails = " (no need to retry)"
|
|
|
|
}
|
|
|
|
// Add only non zero stats
|
|
if s.errors != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Errors: %10d%s\n",
|
|
s.errors, errorDetails)
|
|
}
|
|
if s.checks != 0 || ts.totalChecks != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Checks: %10d / %d, %s\n",
|
|
s.checks, ts.totalChecks, percent(s.checks, ts.totalChecks))
|
|
}
|
|
if s.deletes != 0 || s.deletedDirs != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Deleted: %10d (files), %d (dirs)\n", s.deletes, s.deletedDirs)
|
|
}
|
|
if s.renames != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Renamed: %10d\n", s.renames)
|
|
}
|
|
if s.transfers != 0 || ts.totalTransfers != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s\n",
|
|
s.transfers, ts.totalTransfers, percent(s.transfers, ts.totalTransfers))
|
|
}
|
|
if s.serverSideCopies != 0 || s.serverSideCopyBytes != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Server Side Copies:%6d @ %s\n",
|
|
s.serverSideCopies, fs.SizeSuffix(s.serverSideCopyBytes).ByteUnit(),
|
|
)
|
|
}
|
|
if s.serverSideMoves != 0 || s.serverSideMoveBytes != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Server Side Moves:%7d @ %s\n",
|
|
s.serverSideMoves, fs.SizeSuffix(s.serverSideMoveBytes).ByteUnit(),
|
|
)
|
|
}
|
|
_, _ = fmt.Fprintf(buf, "Elapsed time: %10ss\n", strings.TrimRight(fs.Duration(elapsedTime.Truncate(time.Minute)).ReadableString(), "0s")+fmt.Sprintf("%.1f", elapsedTimeSecondsOnly.Seconds()))
|
|
}
|
|
|
|
// checking and transferring have their own locking so unlock
|
|
// here to prevent deadlock on GetBytes
|
|
s.mu.RUnlock()
|
|
|
|
// Add per transfer stats if required
|
|
if !s.ci.StatsOneLine {
|
|
if !s.checking.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.ctx, s.inProgress, s.transferring))
|
|
}
|
|
if !s.transferring.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.ctx, s.inProgress, nil))
|
|
}
|
|
}
|
|
|
|
return buf.String()
|
|
}
|
|
|
|
// Transferred returns list of all completed transfers including checked and
|
|
// failed ones.
|
|
func (s *StatsInfo) Transferred() []TransferSnapshot {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
ts := make([]TransferSnapshot, 0, len(s.startedTransfers))
|
|
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
ts = append(ts, tr.Snapshot())
|
|
}
|
|
}
|
|
|
|
return ts
|
|
}
|
|
|
|
// Log outputs the StatsInfo to the log
|
|
func (s *StatsInfo) Log() {
|
|
if s.ci.UseJSONLog {
|
|
out, _ := s.RemoteStats()
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v%v\n", s, fs.LogValueHide("stats", out))
|
|
} else {
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v\n", s)
|
|
}
|
|
|
|
}
|
|
|
|
// Bytes updates the stats for bytes bytes
|
|
func (s *StatsInfo) Bytes(bytes int64) {
|
|
s.average.mu.Lock()
|
|
s.average.lpBytes += bytes
|
|
s.average.mu.Unlock()
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes += bytes
|
|
}
|
|
|
|
// GetBytes returns the number of bytes transferred so far
|
|
func (s *StatsInfo) GetBytes() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.bytes
|
|
}
|
|
|
|
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
|
|
func (s *StatsInfo) GetBytesWithPending() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
pending := int64(0)
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.acc != nil {
|
|
bytes, size := tr.acc.progress()
|
|
if bytes < size {
|
|
pending += size - bytes
|
|
}
|
|
}
|
|
}
|
|
return s.bytes + pending
|
|
}
|
|
|
|
// Errors updates the stats for errors
|
|
func (s *StatsInfo) Errors(errors int64) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors += errors
|
|
}
|
|
|
|
// GetErrors reads the number of errors
|
|
func (s *StatsInfo) GetErrors() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors
|
|
}
|
|
|
|
// GetLastError returns the lastError
|
|
func (s *StatsInfo) GetLastError() error {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.lastError
|
|
}
|
|
|
|
// GetChecks returns the number of checks
|
|
func (s *StatsInfo) GetChecks() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.checks
|
|
}
|
|
|
|
// FatalError sets the fatalError flag
|
|
func (s *StatsInfo) FatalError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.fatalError = true
|
|
}
|
|
|
|
// HadFatalError returns whether there has been at least one FatalError
|
|
func (s *StatsInfo) HadFatalError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.fatalError
|
|
}
|
|
|
|
// RetryError sets the retryError flag
|
|
func (s *StatsInfo) RetryError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.retryError = true
|
|
}
|
|
|
|
// HadRetryError returns whether there has been at least one non-NoRetryError
|
|
func (s *StatsInfo) HadRetryError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryError
|
|
}
|
|
|
|
var (
|
|
errMaxDelete = fserrors.FatalError(errors.New("--max-delete threshold reached"))
|
|
errMaxDeleteSize = fserrors.FatalError(errors.New("--max-delete-size threshold reached"))
|
|
)
|
|
|
|
// DeleteFile updates the stats for deleting a file
|
|
//
|
|
// It may return fatal errors if the threshold for --max-delete or
|
|
// --max-delete-size have been reached.
|
|
func (s *StatsInfo) DeleteFile(ctx context.Context, size int64) error {
|
|
ci := fs.GetConfig(ctx)
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if size < 0 {
|
|
size = 0
|
|
}
|
|
if ci.MaxDelete >= 0 && s.deletes+1 > ci.MaxDelete {
|
|
return errMaxDelete
|
|
}
|
|
if ci.MaxDeleteSize >= 0 && s.deletesSize+size > int64(ci.MaxDeleteSize) {
|
|
return errMaxDeleteSize
|
|
}
|
|
s.deletes++
|
|
s.deletesSize += size
|
|
return nil
|
|
}
|
|
|
|
// GetDeletes returns the number of deletes
|
|
func (s *StatsInfo) GetDeletes() int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.deletes
|
|
}
|
|
|
|
// DeletedDirs updates the stats for deletedDirs
|
|
func (s *StatsInfo) DeletedDirs(deletedDirs int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.deletedDirs += deletedDirs
|
|
return s.deletedDirs
|
|
}
|
|
|
|
// Renames updates the stats for renames
|
|
func (s *StatsInfo) Renames(renames int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.renames += renames
|
|
return s.renames
|
|
}
|
|
|
|
// ResetCounters sets the counters (bytes, checks, errors, transfers, deletes, renames) to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetCounters() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes = 0
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
s.checks = 0
|
|
s.transfers = 0
|
|
s.deletes = 0
|
|
s.deletesSize = 0
|
|
s.deletedDirs = 0
|
|
s.renames = 0
|
|
s.startedTransfers = nil
|
|
s.oldDuration = 0
|
|
|
|
s.stopAverageLoop()
|
|
s.average = averageValues{stop: make(chan bool)}
|
|
}
|
|
|
|
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetErrors() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
}
|
|
|
|
// Errored returns whether there have been any errors
|
|
func (s *StatsInfo) Errored() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors != 0
|
|
}
|
|
|
|
// Error adds a single error into the stats, assigns lastError and eventually sets fatalError or retryError
|
|
func (s *StatsInfo) Error(err error) error {
|
|
if err == nil || fserrors.IsCounted(err) {
|
|
return err
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors++
|
|
s.lastError = err
|
|
err = fserrors.FsError(err)
|
|
fserrors.Count(err)
|
|
switch {
|
|
case fserrors.IsFatalError(err):
|
|
s.fatalError = true
|
|
case fserrors.IsRetryAfterError(err):
|
|
retryAfter := fserrors.RetryAfterErrorTime(err)
|
|
if s.retryAfter.IsZero() || retryAfter.Sub(s.retryAfter) > 0 {
|
|
s.retryAfter = retryAfter
|
|
}
|
|
s.retryError = true
|
|
case !fserrors.IsNoRetryError(err):
|
|
s.retryError = true
|
|
}
|
|
return err
|
|
}
|
|
|
|
// RetryAfter returns the time to retry after if it is set. It will
|
|
// be Zero if it isn't set.
|
|
func (s *StatsInfo) RetryAfter() time.Time {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.retryAfter
|
|
}
|
|
|
|
// NewCheckingTransfer adds a checking transfer to the stats, from the object.
|
|
func (s *StatsInfo) NewCheckingTransfer(obj fs.DirEntry, what string) *Transfer {
|
|
tr := newCheckingTransfer(s, obj, what)
|
|
s.checking.add(tr)
|
|
return tr
|
|
}
|
|
|
|
// DoneChecking removes a check from the stats
|
|
func (s *StatsInfo) DoneChecking(remote string) {
|
|
s.checking.del(remote)
|
|
s.mu.Lock()
|
|
s.checks++
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// GetTransfers reads the number of transfers
|
|
func (s *StatsInfo) GetTransfers() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.transfers
|
|
}
|
|
|
|
// NewTransfer adds a transfer to the stats from the object.
|
|
func (s *StatsInfo) NewTransfer(obj fs.DirEntry) *Transfer {
|
|
tr := newTransfer(s, obj)
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
|
|
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
|
|
tr := newTransferRemoteSize(s, remote, size, false, "")
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// DoneTransferring removes a transfer from the stats
|
|
//
|
|
// if ok is true and it was in the transfermap (to avoid incrementing in case of nested calls, #6213) then it increments the transfers count
|
|
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
|
|
existed := s.transferring.del(remote)
|
|
if ok && existed {
|
|
s.mu.Lock()
|
|
s.transfers++
|
|
s.mu.Unlock()
|
|
}
|
|
if s.transferring.empty() {
|
|
time.AfterFunc(averageStopAfter, s.stopAverageLoop)
|
|
}
|
|
}
|
|
|
|
// SetCheckQueue sets the number of queued checks
|
|
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.checkQueue = n
|
|
s.checkQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetTransferQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.transferQueue = n
|
|
s.transferQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetRenameQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.renameQueue = n
|
|
s.renameQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// AddTransfer adds reference to the started transfer.
|
|
func (s *StatsInfo) AddTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
s.startedTransfers = append(s.startedTransfers, transfer)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// removeTransfer removes a reference to the started transfer in
|
|
// position i.
|
|
//
|
|
// Must be called with the lock held
|
|
func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) {
|
|
now := time.Now()
|
|
|
|
// add finished transfer onto old time ranges
|
|
start, end := transfer.TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
s.oldTimeRanges = append(s.oldTimeRanges, timeRange{start, end})
|
|
s.oldTimeRanges.merge()
|
|
|
|
// remove the found entry
|
|
s.startedTransfers = append(s.startedTransfers[:i], s.startedTransfers[i+1:]...)
|
|
|
|
// Find youngest active transfer
|
|
oldestStart := now
|
|
for i := range s.startedTransfers {
|
|
start, _ := s.startedTransfers[i].TimeRange()
|
|
if start.Before(oldestStart) {
|
|
oldestStart = start
|
|
}
|
|
}
|
|
|
|
// remove old entries older than that
|
|
s.oldDuration += s.oldTimeRanges.cull(oldestStart)
|
|
}
|
|
|
|
// RemoveTransfer removes a reference to the started transfer.
|
|
func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
for i, tr := range s.startedTransfers {
|
|
if tr == transfer {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// PruneTransfers makes sure there aren't too many old transfers by removing
|
|
// single finished transfer.
|
|
func (s *StatsInfo) PruneTransfers() {
|
|
if MaxCompletedTransfers < 0 {
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
// remove a transfer from the start if we are over quota
|
|
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
|
|
for i, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// AddServerSideMove counts a server side move
|
|
func (s *StatsInfo) AddServerSideMove(n int64) {
|
|
s.mu.Lock()
|
|
s.serverSideMoves += 1
|
|
s.serverSideMoveBytes += n
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// AddServerSideCopy counts a server side copy
|
|
func (s *StatsInfo) AddServerSideCopy(n int64) {
|
|
s.mu.Lock()
|
|
s.serverSideCopies += 1
|
|
s.serverSideCopyBytes += n
|
|
s.mu.Unlock()
|
|
}
|