fs/accounting: factor into separate files without changing functionality

This commit is contained in:
Nick Craig-Wood 2018-02-01 13:13:24 +00:00
parent 8722403b0d
commit bea02fcf52
5 changed files with 376 additions and 337 deletions

View file

@ -2,342 +2,16 @@
package accounting
import (
"bytes"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"github.com/VividCortex/ewma"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/asyncreader"
"golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
"golang.org/x/time/rate"
)
// Globals
var (
Stats = NewStats()
tokenBucketMu sync.Mutex // protects the token bucket variables
tokenBucket *rate.Limiter
prevTokenBucket = tokenBucket
bwLimitToggledOff = false
currLimitMu sync.Mutex // protects changes to the timeslot
currLimit fs.BwTimeSlot
)
func init() {
// Set the function pointer up in fs
fs.CountError = Stats.Error
}
const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request
// make a new empty token bucket with the bandwidth given
func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
// empty the bucket
err := newTokenBucket.WaitN(context.Background(), maxBurstSize)
if err != nil {
fs.Errorf(nil, "Failed to empty token bucket: %v", err)
}
return newTokenBucket
}
// StartTokenBucket starts the token bucket if necessary
func StartTokenBucket() {
currLimitMu.Lock()
currLimit := fs.Config.BwLimit.LimitAt(time.Now())
currLimitMu.Unlock()
if currLimit.Bandwidth > 0 {
tokenBucket = newTokenBucket(currLimit.Bandwidth)
fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth)
// Start the SIGUSR2 signal handler to toggle bandwidth.
// This function does nothing in windows systems.
startSignalHandler()
}
}
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
func StartTokenTicker() {
// If the timetable has a single entry or was not specified, we don't need
// a ticker to update the bandwidth.
if len(fs.Config.BwLimit) <= 1 {
return
}
ticker := time.NewTicker(time.Minute)
go func() {
for range ticker.C {
limitNow := fs.Config.BwLimit.LimitAt(time.Now())
currLimitMu.Lock()
if currLimit.Bandwidth != limitNow.Bandwidth {
tokenBucketMu.Lock()
// If bwlimit is toggled off, the change should only
// become active on the next toggle, which causes
// an exchange of tokenBucket <-> prevTokenBucket
var targetBucket **rate.Limiter
if bwLimitToggledOff {
targetBucket = &prevTokenBucket
} else {
targetBucket = &tokenBucket
}
// Set new bandwidth. If unlimited, set tokenbucket to nil.
if limitNow.Bandwidth > 0 {
*targetBucket = newTokenBucket(limitNow.Bandwidth)
if bwLimitToggledOff {
fs.Logf(nil, "Scheduled bandwidth change. "+
"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
} else {
fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
}
} else {
*targetBucket = nil
fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
}
currLimit = limitNow
tokenBucketMu.Unlock()
}
currLimitMu.Unlock()
}
}()
}
// stringSet holds a set of strings
type stringSet map[string]struct{}
// inProgress holds a synchronizes map of in progress transfers
type inProgress struct {
mu sync.Mutex
m map[string]*Account
}
// newInProgress makes a new inProgress object
func newInProgress() *inProgress {
return &inProgress{
m: make(map[string]*Account, fs.Config.Transfers),
}
}
// set marks the name as in progress
func (ip *inProgress) set(name string, acc *Account) {
ip.mu.Lock()
defer ip.mu.Unlock()
ip.m[name] = acc
}
// clear marks the name as no longer in progress
func (ip *inProgress) clear(name string) {
ip.mu.Lock()
defer ip.mu.Unlock()
delete(ip.m, name)
}
// get gets the account for name, of nil if not found
func (ip *inProgress) get(name string) *Account {
ip.mu.Lock()
defer ip.mu.Unlock()
return ip.m[name]
}
// Strings returns all the strings in the stringSet
func (ss stringSet) Strings() []string {
strings := make([]string, 0, len(ss))
for name := range ss {
var out string
if acc := Stats.inProgress.get(name); acc != nil {
out = acc.String()
} else {
out = name
}
strings = append(strings, " * "+out)
}
sorted := sort.StringSlice(strings)
sorted.Sort()
return sorted
}
// String returns all the file names in the stringSet joined by newline
func (ss stringSet) String() string {
return strings.Join(ss.Strings(), "\n")
}
// StatsInfo limits and accounts all transfers
type StatsInfo struct {
lock sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking stringSet
transfers int64
transferring stringSet
start time.Time
inProgress *inProgress
}
// NewStats cretates an initialised StatsInfo
func NewStats() *StatsInfo {
return &StatsInfo{
checking: make(stringSet, fs.Config.Checkers),
transferring: make(stringSet, fs.Config.Transfers),
start: time.Now(),
inProgress: newInProgress(),
}
}
// String convert the StatsInfo to a string for printing
func (s *StatsInfo) String() string {
s.lock.RLock()
defer s.lock.RUnlock()
dt := time.Now().Sub(s.start)
dtSeconds := dt.Seconds()
speed := 0.0
if dt > 0 {
speed = float64(s.bytes) / dtSeconds
}
dtRounded := dt - (dt % (time.Second / 10))
buf := &bytes.Buffer{}
if fs.Config.DataRateUnit == "bits" {
speed = speed * 8
}
fmt.Fprintf(buf, `
Transferred: %10s (%s)
Errors: %10d
Checks: %10d
Transferred: %10d
Elapsed time: %10v
`,
fs.SizeSuffix(s.bytes).Unit("Bytes"), fs.SizeSuffix(speed).Unit(strings.Title(fs.Config.DataRateUnit)+"/s"),
s.errors,
s.checks,
s.transfers,
dtRounded)
if len(s.checking) > 0 {
fmt.Fprintf(buf, "Checking:\n%s\n", s.checking)
}
if len(s.transferring) > 0 {
fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring)
}
return buf.String()
}
// Log outputs the StatsInfo to the log
func (s *StatsInfo) Log() {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s)
}
// Bytes updates the stats for bytes bytes
func (s *StatsInfo) Bytes(bytes int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.bytes += bytes
}
// Errors updates the stats for errors
func (s *StatsInfo) Errors(errors int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.errors += errors
}
// GetErrors reads the number of errors
func (s *StatsInfo) GetErrors() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.errors
}
// GetLastError returns the lastError
func (s *StatsInfo) GetLastError() error {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastError
}
// ResetCounters sets the counters (bytes, checks, errors, transfers) to 0
func (s *StatsInfo) ResetCounters() {
s.lock.RLock()
defer s.lock.RUnlock()
s.bytes = 0
s.errors = 0
s.checks = 0
s.transfers = 0
}
// ResetErrors sets the errors count to 0
func (s *StatsInfo) ResetErrors() {
s.lock.RLock()
defer s.lock.RUnlock()
s.errors = 0
}
// Errored returns whether there have been any errors
func (s *StatsInfo) Errored() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.errors != 0
}
// Error adds a single error into the stats and assigns lastError
func (s *StatsInfo) Error(err error) {
s.lock.Lock()
defer s.lock.Unlock()
s.errors++
s.lastError = err
}
// Checking adds a check into the stats
func (s *StatsInfo) Checking(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
s.checking[remote] = struct{}{}
}
// DoneChecking removes a check from the stats
func (s *StatsInfo) DoneChecking(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.checking, remote)
s.checks++
}
// GetTransfers reads the number of transfers
func (s *StatsInfo) GetTransfers() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.transfers
}
// Transferring adds a transfer into the stats
func (s *StatsInfo) Transferring(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
s.transferring[remote] = struct{}{}
}
// DoneTransferring removes a transfer from the stats
//
// if ok is true then it increments the transfers count
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.transferring, remote)
if ok {
s.transfers++
}
}
// Account limits and accounts for one transfer
type Account struct {
// The mutex is to make sure Read() and Close() aren't called
@ -483,17 +157,7 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
Stats.Bytes(int64(n))
// Get the token bucket in use
tokenBucketMu.Lock()
// Limit the transfer speed if required
if tokenBucket != nil {
tbErr := tokenBucket.WaitN(context.Background(), n)
if tbErr != nil {
fs.Errorf(nil, "Token bucket error: %v", err)
}
}
tokenBucketMu.Unlock()
limitBandwidth(n)
return
}

View file

@ -0,0 +1,41 @@
package accounting
import (
"sync"
"github.com/ncw/rclone/fs"
)
// inProgress holds a synchronized map of in progress transfers
type inProgress struct {
mu sync.Mutex
m map[string]*Account
}
// newInProgress makes a new inProgress object
func newInProgress() *inProgress {
return &inProgress{
m: make(map[string]*Account, fs.Config.Transfers),
}
}
// set marks the name as in progress
func (ip *inProgress) set(name string, acc *Account) {
ip.mu.Lock()
defer ip.mu.Unlock()
ip.m[name] = acc
}
// clear marks the name as no longer in progress
func (ip *inProgress) clear(name string) {
ip.mu.Lock()
defer ip.mu.Unlock()
delete(ip.m, name)
}
// get gets the account for name, of nil if not found
func (ip *inProgress) get(name string) *Account {
ip.mu.Lock()
defer ip.mu.Unlock()
return ip.m[name]
}

189
fs/accounting/stats.go Normal file
View file

@ -0,0 +1,189 @@
package accounting
import (
"bytes"
"fmt"
"strings"
"sync"
"time"
"github.com/ncw/rclone/fs"
)
var (
// Stats is global statistics counter
Stats = NewStats()
)
func init() {
// Set the function pointer up in fs
fs.CountError = Stats.Error
}
// StatsInfo accounts all transfers
type StatsInfo struct {
lock sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking stringSet
transfers int64
transferring stringSet
start time.Time
inProgress *inProgress
}
// NewStats cretates an initialised StatsInfo
func NewStats() *StatsInfo {
return &StatsInfo{
checking: make(stringSet, fs.Config.Checkers),
transferring: make(stringSet, fs.Config.Transfers),
start: time.Now(),
inProgress: newInProgress(),
}
}
// String convert the StatsInfo to a string for printing
func (s *StatsInfo) String() string {
s.lock.RLock()
defer s.lock.RUnlock()
dt := time.Now().Sub(s.start)
dtSeconds := dt.Seconds()
speed := 0.0
if dt > 0 {
speed = float64(s.bytes) / dtSeconds
}
dtRounded := dt - (dt % (time.Second / 10))
buf := &bytes.Buffer{}
if fs.Config.DataRateUnit == "bits" {
speed = speed * 8
}
fmt.Fprintf(buf, `
Transferred: %10s (%s)
Errors: %10d
Checks: %10d
Transferred: %10d
Elapsed time: %10v
`,
fs.SizeSuffix(s.bytes).Unit("Bytes"), fs.SizeSuffix(speed).Unit(strings.Title(fs.Config.DataRateUnit)+"/s"),
s.errors,
s.checks,
s.transfers,
dtRounded)
if len(s.checking) > 0 {
fmt.Fprintf(buf, "Checking:\n%s\n", s.checking)
}
if len(s.transferring) > 0 {
fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring)
}
return buf.String()
}
// Log outputs the StatsInfo to the log
func (s *StatsInfo) Log() {
fs.LogLevelPrintf(fs.Config.StatsLogLevel, nil, "%v\n", s)
}
// Bytes updates the stats for bytes bytes
func (s *StatsInfo) Bytes(bytes int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.bytes += bytes
}
// Errors updates the stats for errors
func (s *StatsInfo) Errors(errors int64) {
s.lock.Lock()
defer s.lock.Unlock()
s.errors += errors
}
// GetErrors reads the number of errors
func (s *StatsInfo) GetErrors() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.errors
}
// GetLastError returns the lastError
func (s *StatsInfo) GetLastError() error {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastError
}
// ResetCounters sets the counters (bytes, checks, errors, transfers) to 0
func (s *StatsInfo) ResetCounters() {
s.lock.RLock()
defer s.lock.RUnlock()
s.bytes = 0
s.errors = 0
s.checks = 0
s.transfers = 0
}
// ResetErrors sets the errors count to 0
func (s *StatsInfo) ResetErrors() {
s.lock.RLock()
defer s.lock.RUnlock()
s.errors = 0
}
// Errored returns whether there have been any errors
func (s *StatsInfo) Errored() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.errors != 0
}
// Error adds a single error into the stats and assigns lastError
func (s *StatsInfo) Error(err error) {
s.lock.Lock()
defer s.lock.Unlock()
s.errors++
s.lastError = err
}
// Checking adds a check into the stats
func (s *StatsInfo) Checking(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
s.checking[remote] = struct{}{}
}
// DoneChecking removes a check from the stats
func (s *StatsInfo) DoneChecking(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.checking, remote)
s.checks++
}
// GetTransfers reads the number of transfers
func (s *StatsInfo) GetTransfers() int64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.transfers
}
// Transferring adds a transfer into the stats
func (s *StatsInfo) Transferring(remote string) {
s.lock.Lock()
defer s.lock.Unlock()
s.transferring[remote] = struct{}{}
}
// DoneTransferring removes a transfer from the stats
//
// if ok is true then it increments the transfers count
func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.transferring, remote)
if ok {
s.transfers++
}
}

View file

@ -0,0 +1,31 @@
package accounting
import (
"sort"
"strings"
)
// stringSet holds a set of strings
type stringSet map[string]struct{}
// Strings returns all the strings in the stringSet
func (ss stringSet) Strings() []string {
strings := make([]string, 0, len(ss))
for name := range ss {
var out string
if acc := Stats.inProgress.get(name); acc != nil {
out = acc.String()
} else {
out = name
}
strings = append(strings, " * "+out)
}
sorted := sort.StringSlice(strings)
sorted.Sort()
return sorted
}
// String returns all the file names in the stringSet joined by newline
func (ss stringSet) String() string {
return strings.Join(ss.Strings(), "\n")
}

View file

@ -0,0 +1,114 @@
package accounting
import (
"sync"
"time"
"github.com/ncw/rclone/fs"
"golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
"golang.org/x/time/rate"
)
// Globals
var (
tokenBucketMu sync.Mutex // protects the token bucket variables
tokenBucket *rate.Limiter
prevTokenBucket = tokenBucket
bwLimitToggledOff = false
currLimitMu sync.Mutex // protects changes to the timeslot
currLimit fs.BwTimeSlot
)
const maxBurstSize = 1 * 1024 * 1024 // must be bigger than the biggest request
// make a new empty token bucket with the bandwidth given
func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
// empty the bucket
err := newTokenBucket.WaitN(context.Background(), maxBurstSize)
if err != nil {
fs.Errorf(nil, "Failed to empty token bucket: %v", err)
}
return newTokenBucket
}
// StartTokenBucket starts the token bucket if necessary
func StartTokenBucket() {
currLimitMu.Lock()
currLimit := fs.Config.BwLimit.LimitAt(time.Now())
currLimitMu.Unlock()
if currLimit.Bandwidth > 0 {
tokenBucket = newTokenBucket(currLimit.Bandwidth)
fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth)
// Start the SIGUSR2 signal handler to toggle bandwidth.
// This function does nothing in windows systems.
startSignalHandler()
}
}
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
func StartTokenTicker() {
// If the timetable has a single entry or was not specified, we don't need
// a ticker to update the bandwidth.
if len(fs.Config.BwLimit) <= 1 {
return
}
ticker := time.NewTicker(time.Minute)
go func() {
for range ticker.C {
limitNow := fs.Config.BwLimit.LimitAt(time.Now())
currLimitMu.Lock()
if currLimit.Bandwidth != limitNow.Bandwidth {
tokenBucketMu.Lock()
// If bwlimit is toggled off, the change should only
// become active on the next toggle, which causes
// an exchange of tokenBucket <-> prevTokenBucket
var targetBucket **rate.Limiter
if bwLimitToggledOff {
targetBucket = &prevTokenBucket
} else {
targetBucket = &tokenBucket
}
// Set new bandwidth. If unlimited, set tokenbucket to nil.
if limitNow.Bandwidth > 0 {
*targetBucket = newTokenBucket(limitNow.Bandwidth)
if bwLimitToggledOff {
fs.Logf(nil, "Scheduled bandwidth change. "+
"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
} else {
fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
}
} else {
*targetBucket = nil
fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
}
currLimit = limitNow
tokenBucketMu.Unlock()
}
currLimitMu.Unlock()
}
}()
}
// limitBandwith sleeps for the correct amount of time for the passage
// of n bytes according to the current bandwidth limit
func limitBandwidth(n int) {
tokenBucketMu.Lock()
// Limit the transfer speed if required
if tokenBucket != nil {
err := tokenBucket.WaitN(context.Background(), n)
if err != nil {
fs.Errorf(nil, "Token bucket error: %v", err)
}
}
tokenBucketMu.Unlock()
}