517e7d9271
This was caused by nested calls to NewTransfer/Done. This fixes the problem by only incrementing transfers if the remote is present in the transferMap which means we only increment it once.
821 lines
20 KiB
Go
821 lines
20 KiB
Go
package accounting
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/fs/rc"
|
|
"github.com/rclone/rclone/lib/terminal"
|
|
)
|
|
|
|
const (
|
|
averagePeriodLength = time.Second
|
|
averageStopAfter = time.Minute
|
|
)
|
|
|
|
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
|
|
var MaxCompletedTransfers = 100
|
|
|
|
// StatsInfo accounts all transfers
|
|
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
|
|
// to correctly count the updated fields
|
|
type StatsInfo struct {
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
ci *fs.ConfigInfo
|
|
bytes int64
|
|
errors int64
|
|
lastError error
|
|
fatalError bool
|
|
retryError bool
|
|
retryAfter time.Time
|
|
checks int64
|
|
checking *transferMap
|
|
checkQueue int
|
|
checkQueueSize int64
|
|
transfers int64
|
|
transferring *transferMap
|
|
transferQueue int
|
|
transferQueueSize int64
|
|
renames int64
|
|
renameQueue int
|
|
renameQueueSize int64
|
|
deletes int64
|
|
deletedDirs int64
|
|
inProgress *inProgress
|
|
startedTransfers []*Transfer // currently active transfers
|
|
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
|
oldDuration time.Duration // duration of transfers we have culled
|
|
group string
|
|
startTime time.Time // the moment these stats were initialized or reset
|
|
average averageValues
|
|
}
|
|
|
|
type averageValues struct {
|
|
mu sync.Mutex
|
|
lpBytes int64
|
|
lpTime time.Time
|
|
speed float64
|
|
stop chan bool
|
|
stopped sync.WaitGroup
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
}
|
|
|
|
// NewStats creates an initialised StatsInfo
|
|
func NewStats(ctx context.Context) *StatsInfo {
|
|
ci := fs.GetConfig(ctx)
|
|
return &StatsInfo{
|
|
ctx: ctx,
|
|
ci: ci,
|
|
checking: newTransferMap(ci.Checkers, "checking"),
|
|
transferring: newTransferMap(ci.Transfers, "transferring"),
|
|
inProgress: newInProgress(ctx),
|
|
startTime: time.Now(),
|
|
average: averageValues{stop: make(chan bool)},
|
|
}
|
|
}
|
|
|
|
// RemoteStats returns stats for rc
|
|
func (s *StatsInfo) RemoteStats() (out rc.Params, err error) {
|
|
// NB if adding values here - make sure you update the docs in
|
|
// stats_groups.go
|
|
|
|
out = make(rc.Params)
|
|
|
|
ts := s.calculateTransferStats()
|
|
out["totalChecks"] = ts.totalChecks
|
|
out["totalTransfers"] = ts.totalTransfers
|
|
out["totalBytes"] = ts.totalBytes
|
|
out["transferTime"] = ts.transferTime
|
|
out["speed"] = ts.speed
|
|
|
|
s.mu.RLock()
|
|
out["bytes"] = s.bytes
|
|
out["errors"] = s.errors
|
|
out["fatalError"] = s.fatalError
|
|
out["retryError"] = s.retryError
|
|
out["checks"] = s.checks
|
|
out["transfers"] = s.transfers
|
|
out["deletes"] = s.deletes
|
|
out["deletedDirs"] = s.deletedDirs
|
|
out["renames"] = s.renames
|
|
out["elapsedTime"] = time.Since(s.startTime).Seconds()
|
|
eta, etaOK := eta(s.bytes, ts.totalBytes, ts.speed)
|
|
if etaOK {
|
|
out["eta"] = eta.Seconds()
|
|
} else {
|
|
out["eta"] = nil
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
if !s.checking.empty() {
|
|
out["checking"] = s.checking.remotes()
|
|
}
|
|
if !s.transferring.empty() {
|
|
out["transferring"] = s.transferring.rcStats(s.inProgress)
|
|
}
|
|
if s.errors > 0 {
|
|
out["lastError"] = s.lastError.Error()
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// speed returns the average speed of the transfer in bytes/second
|
|
//
|
|
// Call with lock held
|
|
func (s *StatsInfo) speed() float64 {
|
|
dt := s.totalDuration()
|
|
dtSeconds := dt.Seconds()
|
|
speed := 0.0
|
|
if dt > 0 {
|
|
speed = float64(s.bytes) / dtSeconds
|
|
}
|
|
return speed
|
|
}
|
|
|
|
// timeRange is a start and end time of a transfer
|
|
type timeRange struct {
|
|
start time.Time
|
|
end time.Time
|
|
}
|
|
|
|
// timeRanges is a list of non-overlapping start and end times for
|
|
// transfers
|
|
type timeRanges []timeRange
|
|
|
|
// merge all the overlapping time ranges
|
|
func (trs *timeRanges) merge() {
|
|
Trs := *trs
|
|
|
|
// Sort by the starting time.
|
|
sort.Slice(Trs, func(i, j int) bool {
|
|
return Trs[i].start.Before(Trs[j].start)
|
|
})
|
|
|
|
// Merge overlaps and add distinctive ranges together
|
|
var (
|
|
newTrs = Trs[:0]
|
|
i, j = 0, 1
|
|
)
|
|
for i < len(Trs) {
|
|
if j < len(Trs) {
|
|
if !Trs[i].end.Before(Trs[j].start) {
|
|
if Trs[i].end.Before(Trs[j].end) {
|
|
Trs[i].end = Trs[j].end
|
|
}
|
|
j++
|
|
continue
|
|
}
|
|
}
|
|
newTrs = append(newTrs, Trs[i])
|
|
i = j
|
|
j++
|
|
}
|
|
|
|
*trs = newTrs
|
|
}
|
|
|
|
// cull remove any ranges whose start and end are before cutoff
|
|
// returning their duration sum
|
|
func (trs *timeRanges) cull(cutoff time.Time) (d time.Duration) {
|
|
var newTrs = (*trs)[:0]
|
|
for _, tr := range *trs {
|
|
if cutoff.Before(tr.start) || cutoff.Before(tr.end) {
|
|
newTrs = append(newTrs, tr)
|
|
} else {
|
|
d += tr.end.Sub(tr.start)
|
|
}
|
|
}
|
|
*trs = newTrs
|
|
return d
|
|
}
|
|
|
|
// total the time out of the time ranges
|
|
func (trs timeRanges) total() (total time.Duration) {
|
|
for _, tr := range trs {
|
|
total += tr.end.Sub(tr.start)
|
|
}
|
|
return total
|
|
}
|
|
|
|
// Total duration is union of durations of all transfers belonging to this
|
|
// object.
|
|
// Needs to be protected by mutex.
|
|
func (s *StatsInfo) totalDuration() time.Duration {
|
|
// copy of s.oldTimeRanges with extra room for the current transfers
|
|
timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers))
|
|
copy(timeRanges, s.oldTimeRanges)
|
|
|
|
// Extract time ranges of all transfers.
|
|
now := time.Now()
|
|
for i := range s.startedTransfers {
|
|
start, end := s.startedTransfers[i].TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
timeRanges = append(timeRanges, timeRange{start, end})
|
|
}
|
|
|
|
timeRanges.merge()
|
|
return s.oldDuration + timeRanges.total()
|
|
}
|
|
|
|
// eta returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined 'ok' returns false.
|
|
func eta(size, total int64, rate float64) (eta time.Duration, ok bool) {
|
|
if total <= 0 || size < 0 || rate <= 0 {
|
|
return 0, false
|
|
}
|
|
remaining := total - size
|
|
if remaining < 0 {
|
|
return 0, false
|
|
}
|
|
seconds := float64(remaining) / rate
|
|
if seconds < 0 {
|
|
seconds = 0
|
|
}
|
|
return time.Second * time.Duration(seconds), true
|
|
}
|
|
|
|
// etaString returns the ETA of the current operation,
|
|
// rounded to full seconds.
|
|
// If the ETA cannot be determined it returns "-"
|
|
func etaString(done, total int64, rate float64) string {
|
|
d, ok := eta(done, total, rate)
|
|
if !ok {
|
|
return "-"
|
|
}
|
|
return fs.Duration(d).ReadableString()
|
|
}
|
|
|
|
// percent returns a/b as a percentage rounded to the nearest integer
|
|
// as a string
|
|
//
|
|
// if the percentage is invalid it returns "-"
|
|
func percent(a int64, b int64) string {
|
|
if a < 0 || b <= 0 {
|
|
return "-"
|
|
}
|
|
return fmt.Sprintf("%d%%", int(float64(a)*100/float64(b)+0.5))
|
|
}
|
|
|
|
// returned from calculateTransferStats
|
|
type transferStats struct {
|
|
totalChecks int64
|
|
totalTransfers int64
|
|
totalBytes int64
|
|
transferTime float64
|
|
speed float64
|
|
}
|
|
|
|
// calculateTransferStats calculates some addtional transfer stats not
|
|
// stored directly in StatsInfo
|
|
func (s *StatsInfo) calculateTransferStats() (ts transferStats) {
|
|
// checking and transferring have their own locking so read
|
|
// here before lock to prevent deadlock on GetBytes
|
|
transferring, checking := s.transferring.count(), s.checking.count()
|
|
transferringBytesDone, transferringBytesTotal := s.transferring.progress(s)
|
|
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
ts.totalChecks = int64(s.checkQueue) + s.checks + int64(checking)
|
|
ts.totalTransfers = int64(s.transferQueue) + s.transfers + int64(transferring)
|
|
// note that s.bytes already includes transferringBytesDone so
|
|
// we take it off here to avoid double counting
|
|
ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone
|
|
ts.speed = s.average.speed
|
|
|
|
return ts
|
|
}
|
|
|
|
func (s *StatsInfo) averageLoop() {
|
|
var period float64
|
|
|
|
ticker := time.NewTicker(averagePeriodLength)
|
|
defer ticker.Stop()
|
|
|
|
startTime := time.Now()
|
|
a := &s.average
|
|
defer a.stopped.Done()
|
|
for {
|
|
select {
|
|
case now := <-ticker.C:
|
|
a.mu.Lock()
|
|
var elapsed float64
|
|
if a.lpTime.IsZero() {
|
|
elapsed = now.Sub(startTime).Seconds()
|
|
} else {
|
|
elapsed = now.Sub(a.lpTime).Seconds()
|
|
}
|
|
avg := 0.0
|
|
if elapsed > 0 {
|
|
avg = float64(a.lpBytes) / elapsed
|
|
}
|
|
if period < averagePeriod {
|
|
period++
|
|
}
|
|
a.speed = (avg + a.speed*(period-1)) / period
|
|
a.lpBytes = 0
|
|
a.lpTime = now
|
|
a.mu.Unlock()
|
|
case <-a.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StatsInfo) startAverageLoop() {
|
|
s.average.startOnce.Do(func() {
|
|
s.average.stopped.Add(1)
|
|
go s.averageLoop()
|
|
})
|
|
}
|
|
|
|
func (s *StatsInfo) stopAverageLoop() {
|
|
s.average.stopOnce.Do(func() {
|
|
close(s.average.stop)
|
|
s.average.stopped.Wait()
|
|
})
|
|
}
|
|
|
|
// String convert the StatsInfo to a string for printing
|
|
func (s *StatsInfo) String() string {
|
|
// NB if adding more stats in here, remember to add them into
|
|
// RemoteStats() too.
|
|
|
|
ts := s.calculateTransferStats()
|
|
|
|
s.mu.RLock()
|
|
|
|
var (
|
|
buf = &bytes.Buffer{}
|
|
xfrchkString = ""
|
|
dateString = ""
|
|
elapsedTime = time.Since(s.startTime)
|
|
elapsedTimeSecondsOnly = elapsedTime.Truncate(time.Second/10) % time.Minute
|
|
displaySpeedString string
|
|
)
|
|
|
|
if s.ci.DataRateUnit == "bits" {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed * 8).BitRateUnit()
|
|
} else {
|
|
displaySpeedString = fs.SizeSuffix(ts.speed).ByteRateUnit()
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = fmt.Fprintf(buf, "\nTransferred: ")
|
|
} else {
|
|
xfrchk := []string{}
|
|
if ts.totalTransfers > 0 && s.transferQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("xfr#%d/%d", s.transfers, ts.totalTransfers))
|
|
}
|
|
if ts.totalChecks > 0 && s.checkQueue > 0 {
|
|
xfrchk = append(xfrchk, fmt.Sprintf("chk#%d/%d", s.checks, ts.totalChecks))
|
|
}
|
|
if len(xfrchk) > 0 {
|
|
xfrchkString = fmt.Sprintf(" (%s)", strings.Join(xfrchk, ", "))
|
|
}
|
|
if s.ci.StatsOneLineDate {
|
|
t := time.Now()
|
|
dateString = t.Format(s.ci.StatsOneLineDateFormat) // Including the separator so people can customize it
|
|
}
|
|
}
|
|
|
|
_, _ = fmt.Fprintf(buf, "%s%13s / %s, %s, %s, ETA %s%s",
|
|
dateString,
|
|
fs.SizeSuffix(s.bytes).ByteUnit(),
|
|
fs.SizeSuffix(ts.totalBytes).ByteUnit(),
|
|
percent(s.bytes, ts.totalBytes),
|
|
displaySpeedString,
|
|
etaString(s.bytes, ts.totalBytes, ts.speed),
|
|
xfrchkString,
|
|
)
|
|
|
|
if s.ci.ProgressTerminalTitle {
|
|
// Writes ETA to the terminal title
|
|
terminal.WriteTerminalTitle("ETA: " + etaString(s.bytes, ts.totalBytes, ts.speed))
|
|
}
|
|
|
|
if !s.ci.StatsOneLine {
|
|
_, _ = buf.WriteRune('\n')
|
|
errorDetails := ""
|
|
switch {
|
|
case s.fatalError:
|
|
errorDetails = " (fatal error encountered)"
|
|
case s.retryError:
|
|
errorDetails = " (retrying may help)"
|
|
case s.errors != 0:
|
|
errorDetails = " (no need to retry)"
|
|
|
|
}
|
|
|
|
// Add only non zero stats
|
|
if s.errors != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Errors: %10d%s\n",
|
|
s.errors, errorDetails)
|
|
}
|
|
if s.checks != 0 || ts.totalChecks != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Checks: %10d / %d, %s\n",
|
|
s.checks, ts.totalChecks, percent(s.checks, ts.totalChecks))
|
|
}
|
|
if s.deletes != 0 || s.deletedDirs != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Deleted: %10d (files), %d (dirs)\n", s.deletes, s.deletedDirs)
|
|
}
|
|
if s.renames != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Renamed: %10d\n", s.renames)
|
|
}
|
|
if s.transfers != 0 || ts.totalTransfers != 0 {
|
|
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s\n",
|
|
s.transfers, ts.totalTransfers, percent(s.transfers, ts.totalTransfers))
|
|
}
|
|
_, _ = fmt.Fprintf(buf, "Elapsed time: %10ss\n", strings.TrimRight(elapsedTime.Truncate(time.Minute).String(), "0s")+fmt.Sprintf("%.1f", elapsedTimeSecondsOnly.Seconds()))
|
|
}
|
|
|
|
// checking and transferring have their own locking so unlock
|
|
// here to prevent deadlock on GetBytes
|
|
s.mu.RUnlock()
|
|
|
|
// Add per transfer stats if required
|
|
if !s.ci.StatsOneLine {
|
|
if !s.checking.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.ctx, s.inProgress, s.transferring))
|
|
}
|
|
if !s.transferring.empty() {
|
|
_, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.ctx, s.inProgress, nil))
|
|
}
|
|
}
|
|
|
|
return buf.String()
|
|
}
|
|
|
|
// Transferred returns list of all completed transfers including checked and
|
|
// failed ones.
|
|
func (s *StatsInfo) Transferred() []TransferSnapshot {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
ts := make([]TransferSnapshot, 0, len(s.startedTransfers))
|
|
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
ts = append(ts, tr.Snapshot())
|
|
}
|
|
}
|
|
|
|
return ts
|
|
}
|
|
|
|
// Log outputs the StatsInfo to the log
|
|
func (s *StatsInfo) Log() {
|
|
if s.ci.UseJSONLog {
|
|
out, _ := s.RemoteStats()
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v%v\n", s, fs.LogValueHide("stats", out))
|
|
} else {
|
|
fs.LogLevelPrintf(s.ci.StatsLogLevel, nil, "%v\n", s)
|
|
}
|
|
|
|
}
|
|
|
|
// Bytes updates the stats for bytes bytes
|
|
func (s *StatsInfo) Bytes(bytes int64) {
|
|
s.average.mu.Lock()
|
|
s.average.lpBytes += bytes
|
|
s.average.mu.Unlock()
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes += bytes
|
|
}
|
|
|
|
// GetBytes returns the number of bytes transferred so far
|
|
func (s *StatsInfo) GetBytes() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.bytes
|
|
}
|
|
|
|
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
|
|
func (s *StatsInfo) GetBytesWithPending() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
pending := int64(0)
|
|
for _, tr := range s.startedTransfers {
|
|
if tr.acc != nil {
|
|
bytes, size := tr.acc.progress()
|
|
if bytes < size {
|
|
pending += size - bytes
|
|
}
|
|
}
|
|
}
|
|
return s.bytes + pending
|
|
}
|
|
|
|
// Errors updates the stats for errors
|
|
func (s *StatsInfo) Errors(errors int64) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors += errors
|
|
}
|
|
|
|
// GetErrors reads the number of errors
|
|
func (s *StatsInfo) GetErrors() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors
|
|
}
|
|
|
|
// GetLastError returns the lastError
|
|
func (s *StatsInfo) GetLastError() error {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.lastError
|
|
}
|
|
|
|
// GetChecks returns the number of checks
|
|
func (s *StatsInfo) GetChecks() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.checks
|
|
}
|
|
|
|
// FatalError sets the fatalError flag
|
|
func (s *StatsInfo) FatalError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.fatalError = true
|
|
}
|
|
|
|
// HadFatalError returns whether there has been at least one FatalError
|
|
func (s *StatsInfo) HadFatalError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.fatalError
|
|
}
|
|
|
|
// RetryError sets the retryError flag
|
|
func (s *StatsInfo) RetryError() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.retryError = true
|
|
}
|
|
|
|
// HadRetryError returns whether there has been at least one non-NoRetryError
|
|
func (s *StatsInfo) HadRetryError() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryError
|
|
}
|
|
|
|
// Deletes updates the stats for deletes
|
|
func (s *StatsInfo) Deletes(deletes int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.deletes += deletes
|
|
return s.deletes
|
|
}
|
|
|
|
// DeletedDirs updates the stats for deletedDirs
|
|
func (s *StatsInfo) DeletedDirs(deletedDirs int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.deletedDirs += deletedDirs
|
|
return s.deletedDirs
|
|
}
|
|
|
|
// Renames updates the stats for renames
|
|
func (s *StatsInfo) Renames(renames int64) int64 {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.renames += renames
|
|
return s.renames
|
|
}
|
|
|
|
// ResetCounters sets the counters (bytes, checks, errors, transfers, deletes, renames) to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetCounters() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.bytes = 0
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
s.checks = 0
|
|
s.transfers = 0
|
|
s.deletes = 0
|
|
s.deletedDirs = 0
|
|
s.renames = 0
|
|
s.startedTransfers = nil
|
|
s.oldDuration = 0
|
|
|
|
s.stopAverageLoop()
|
|
s.average = averageValues{stop: make(chan bool)}
|
|
}
|
|
|
|
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
|
|
func (s *StatsInfo) ResetErrors() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors = 0
|
|
s.lastError = nil
|
|
s.fatalError = false
|
|
s.retryError = false
|
|
s.retryAfter = time.Time{}
|
|
}
|
|
|
|
// Errored returns whether there have been any errors
|
|
func (s *StatsInfo) Errored() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.errors != 0
|
|
}
|
|
|
|
// Error adds a single error into the stats, assigns lastError and eventually sets fatalError or retryError
|
|
func (s *StatsInfo) Error(err error) error {
|
|
if err == nil || fserrors.IsCounted(err) {
|
|
return err
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.errors++
|
|
s.lastError = err
|
|
err = fserrors.FsError(err)
|
|
fserrors.Count(err)
|
|
switch {
|
|
case fserrors.IsFatalError(err):
|
|
s.fatalError = true
|
|
case fserrors.IsRetryAfterError(err):
|
|
retryAfter := fserrors.RetryAfterErrorTime(err)
|
|
if s.retryAfter.IsZero() || retryAfter.Sub(s.retryAfter) > 0 {
|
|
s.retryAfter = retryAfter
|
|
}
|
|
s.retryError = true
|
|
case !fserrors.IsNoRetryError(err):
|
|
s.retryError = true
|
|
}
|
|
return err
|
|
}
|
|
|
|
// RetryAfter returns the time to retry after if it is set. It will
|
|
// be Zero if it isn't set.
|
|
func (s *StatsInfo) RetryAfter() time.Time {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.retryAfter
|
|
}
|
|
|
|
// NewCheckingTransfer adds a checking transfer to the stats, from the object.
|
|
func (s *StatsInfo) NewCheckingTransfer(obj fs.Object) *Transfer {
|
|
tr := newCheckingTransfer(s, obj)
|
|
s.checking.add(tr)
|
|
return tr
|
|
}
|
|
|
|
// DoneChecking removes a check from the stats
|
|
func (s *StatsInfo) DoneChecking(remote string) {
|
|
s.checking.del(remote)
|
|
s.mu.Lock()
|
|
s.checks++
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// GetTransfers reads the number of transfers
|
|
func (s *StatsInfo) GetTransfers() int64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.transfers
|
|
}
|
|
|
|
// NewTransfer adds a transfer to the stats from the object.
|
|
func (s *StatsInfo) NewTransfer(obj fs.Object) *Transfer {
|
|
tr := newTransfer(s, obj)
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
|
|
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64) *Transfer {
|
|
tr := newTransferRemoteSize(s, remote, size, false)
|
|
s.transferring.add(tr)
|
|
s.startAverageLoop()
|
|
return tr
|
|
}
|
|
|
|
// DoneTransferring removes a transfer from the stats
|
|
//
|
|
// if ok is true and it was in the transfermap (to avoid incrementing in case of nested calls, #6213) then it increments the transfers count
|
|
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
|
|
existed := s.transferring.del(remote)
|
|
if ok && existed {
|
|
s.mu.Lock()
|
|
s.transfers++
|
|
s.mu.Unlock()
|
|
}
|
|
if s.transferring.empty() {
|
|
time.AfterFunc(averageStopAfter, s.stopAverageLoop)
|
|
}
|
|
}
|
|
|
|
// SetCheckQueue sets the number of queued checks
|
|
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.checkQueue = n
|
|
s.checkQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetTransferQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.transferQueue = n
|
|
s.transferQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetRenameQueue sets the number of queued transfers
|
|
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
|
|
s.mu.Lock()
|
|
s.renameQueue = n
|
|
s.renameQueueSize = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// AddTransfer adds reference to the started transfer.
|
|
func (s *StatsInfo) AddTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
s.startedTransfers = append(s.startedTransfers, transfer)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// removeTransfer removes a reference to the started transfer in
|
|
// position i.
|
|
//
|
|
// Must be called with the lock held
|
|
func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) {
|
|
now := time.Now()
|
|
|
|
// add finished transfer onto old time ranges
|
|
start, end := transfer.TimeRange()
|
|
if end.IsZero() {
|
|
end = now
|
|
}
|
|
s.oldTimeRanges = append(s.oldTimeRanges, timeRange{start, end})
|
|
s.oldTimeRanges.merge()
|
|
|
|
// remove the found entry
|
|
s.startedTransfers = append(s.startedTransfers[:i], s.startedTransfers[i+1:]...)
|
|
|
|
// Find youngest active transfer
|
|
oldestStart := now
|
|
for i := range s.startedTransfers {
|
|
start, _ := s.startedTransfers[i].TimeRange()
|
|
if start.Before(oldestStart) {
|
|
oldestStart = start
|
|
}
|
|
}
|
|
|
|
// remove old entries older than that
|
|
s.oldDuration += s.oldTimeRanges.cull(oldestStart)
|
|
}
|
|
|
|
// RemoveTransfer removes a reference to the started transfer.
|
|
func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
|
|
s.mu.Lock()
|
|
for i, tr := range s.startedTransfers {
|
|
if tr == transfer {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// PruneTransfers makes sure there aren't too many old transfers by removing
|
|
// single finished transfer.
|
|
func (s *StatsInfo) PruneTransfers() {
|
|
if MaxCompletedTransfers < 0 {
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
// remove a transfer from the start if we are over quota
|
|
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
|
|
for i, tr := range s.startedTransfers {
|
|
if tr.IsDone() {
|
|
s.removeTransfer(tr, i)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|