accounting: refactor bwlimit code to allow for multiple slots
This commit is contained in:
parent
31de631b22
commit
3b6df71838
7 changed files with 127 additions and 90 deletions
|
@ -10,7 +10,6 @@ import (
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs/rc"
|
"github.com/rclone/rclone/fs/rc"
|
||||||
"golang.org/x/time/rate"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
@ -50,7 +49,7 @@ type Account struct {
|
||||||
exit chan struct{} // channel that will be closed when transfer is finished
|
exit chan struct{} // channel that will be closed when transfer is finished
|
||||||
withBuf bool // is using a buffered in
|
withBuf bool // is using a buffered in
|
||||||
|
|
||||||
tokenBucket *rate.Limiter // per file bandwidth limiter (may be nil)
|
tokenBucket buckets // per file bandwidth limiter (may be nil)
|
||||||
|
|
||||||
values accountValues
|
values accountValues
|
||||||
}
|
}
|
||||||
|
@ -289,7 +288,7 @@ func (acc *Account) DryRun(n int64) {
|
||||||
// Account for n bytes from the current file bandwidth limit (if any)
|
// Account for n bytes from the current file bandwidth limit (if any)
|
||||||
func (acc *Account) limitPerFileBandwidth(n int) {
|
func (acc *Account) limitPerFileBandwidth(n int) {
|
||||||
acc.values.mu.Lock()
|
acc.values.mu.Lock()
|
||||||
tokenBucket := acc.tokenBucket
|
tokenBucket := acc.tokenBucket[TokenBucketSlotAccounting]
|
||||||
acc.values.mu.Unlock()
|
acc.values.mu.Unlock()
|
||||||
|
|
||||||
if tokenBucket != nil {
|
if tokenBucket != nil {
|
||||||
|
@ -310,7 +309,7 @@ func (acc *Account) accountRead(n int) {
|
||||||
|
|
||||||
acc.stats.Bytes(int64(n))
|
acc.stats.Bytes(int64(n))
|
||||||
|
|
||||||
limitBandwidth(n)
|
TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n)
|
||||||
acc.limitPerFileBandwidth(n)
|
acc.limitPerFileBandwidth(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,4 +7,4 @@ package accounting
|
||||||
|
|
||||||
// startSignalHandler() is Unix specific and does nothing under non-Unix
|
// startSignalHandler() is Unix specific and does nothing under non-Unix
|
||||||
// platforms.
|
// platforms.
|
||||||
func startSignalHandler() {}
|
func (tb *tokenBucket) startSignalHandler() {}
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// startSignalHandler() sets a signal handler to catch SIGUSR2 and toggle throttling.
|
// startSignalHandler() sets a signal handler to catch SIGUSR2 and toggle throttling.
|
||||||
func startSignalHandler() {
|
func (tb *tokenBucket) startSignalHandler() {
|
||||||
signals := make(chan os.Signal, 1)
|
signals := make(chan os.Signal, 1)
|
||||||
signal.Notify(signals, syscall.SIGUSR2)
|
signal.Notify(signals, syscall.SIGUSR2)
|
||||||
|
|
||||||
|
@ -22,14 +22,14 @@ func startSignalHandler() {
|
||||||
// This runs forever, but blocks until the signal is received.
|
// This runs forever, but blocks until the signal is received.
|
||||||
for {
|
for {
|
||||||
<-signals
|
<-signals
|
||||||
tokenBucketMu.Lock()
|
tb.mu.Lock()
|
||||||
bwLimitToggledOff = !bwLimitToggledOff
|
tb.toggledOff = !tb.toggledOff
|
||||||
tokenBucket, prevTokenBucket = prevTokenBucket, tokenBucket
|
tb.curr, tb.prev = tb.prev, tb.curr
|
||||||
s := "disabled"
|
s := "disabled"
|
||||||
if tokenBucket != nil {
|
if !tb.curr._isOff() {
|
||||||
s = "enabled"
|
s = "enabled"
|
||||||
}
|
}
|
||||||
tokenBucketMu.Unlock()
|
tb.mu.Unlock()
|
||||||
fs.Logf(nil, "Bandwidth limit %s by user", s)
|
fs.Logf(nil, "Bandwidth limit %s by user", s)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -11,48 +11,79 @@ import (
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// TokenBucket holds the global token bucket limiter
|
||||||
var (
|
var TokenBucket tokenBucket
|
||||||
tokenBucketMu sync.Mutex // protects the token bucket variables
|
|
||||||
tokenBucket *rate.Limiter
|
// TokenBucketSlot is the type to select which token bucket to use
|
||||||
prevTokenBucket = tokenBucket
|
type TokenBucketSlot int
|
||||||
bwLimitToggledOff = false
|
|
||||||
currLimitMu sync.Mutex // protects changes to the timeslot
|
// Slots for the token bucket
|
||||||
currLimit fs.BwTimeSlot
|
const (
|
||||||
|
TokenBucketSlotAccounting TokenBucketSlot = iota
|
||||||
|
TokenBucketSlots
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type buckets [TokenBucketSlots]*rate.Limiter
|
||||||
|
|
||||||
|
// tokenBucket holds info about the rate limiters in use
|
||||||
|
type tokenBucket struct {
|
||||||
|
mu sync.RWMutex // protects the token bucket variables
|
||||||
|
curr buckets
|
||||||
|
prev buckets
|
||||||
|
toggledOff bool
|
||||||
|
currLimitMu sync.Mutex // protects changes to the timeslot
|
||||||
|
currLimit fs.BwTimeSlot
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return true if limit is disabled
|
||||||
|
//
|
||||||
|
// Call with lock held
|
||||||
|
func (bs *buckets) _isOff() bool {
|
||||||
|
return bs[0] == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disable the limits
|
||||||
|
//
|
||||||
|
// Call with lock held
|
||||||
|
func (bs *buckets) _setOff() {
|
||||||
|
for i := range bs {
|
||||||
|
bs[i] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request
|
const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request
|
||||||
|
|
||||||
// make a new empty token bucket with the bandwidth given
|
// make a new empty token bucket with the bandwidth given
|
||||||
func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter {
|
func newTokenBucket(bandwidth fs.SizeSuffix) (newTokenBucket buckets) {
|
||||||
newTokenBucket := rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
|
for i := range newTokenBucket {
|
||||||
// empty the bucket
|
newTokenBucket[i] = rate.NewLimiter(rate.Limit(bandwidth), maxBurstSize)
|
||||||
err := newTokenBucket.WaitN(context.Background(), maxBurstSize)
|
// empty the bucket
|
||||||
if err != nil {
|
err := newTokenBucket[i].WaitN(context.Background(), maxBurstSize)
|
||||||
fs.Errorf(nil, "Failed to empty token bucket: %v", err)
|
if err != nil {
|
||||||
|
fs.Errorf(nil, "Failed to empty token bucket: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return newTokenBucket
|
return newTokenBucket
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartTokenBucket starts the token bucket if necessary
|
// StartTokenBucket starts the token bucket if necessary
|
||||||
func StartTokenBucket(ctx context.Context) {
|
func (tb *tokenBucket) StartTokenBucket(ctx context.Context) {
|
||||||
|
tb.mu.Lock()
|
||||||
|
defer tb.mu.Unlock()
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
currLimitMu.Lock()
|
tb.currLimit = ci.BwLimit.LimitAt(time.Now())
|
||||||
currLimit := ci.BwLimit.LimitAt(time.Now())
|
if tb.currLimit.Bandwidth > 0 {
|
||||||
currLimitMu.Unlock()
|
tb.curr = newTokenBucket(tb.currLimit.Bandwidth)
|
||||||
|
fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &tb.currLimit.Bandwidth)
|
||||||
if currLimit.Bandwidth > 0 {
|
|
||||||
tokenBucket = newTokenBucket(currLimit.Bandwidth)
|
|
||||||
fs.Infof(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.Bandwidth)
|
|
||||||
|
|
||||||
// Start the SIGUSR2 signal handler to toggle bandwidth.
|
// Start the SIGUSR2 signal handler to toggle bandwidth.
|
||||||
// This function does nothing in windows systems.
|
// This function does nothing in windows systems.
|
||||||
startSignalHandler()
|
tb.startSignalHandler()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
|
// StartTokenTicker creates a ticker to update the bandwidth limiter every minute.
|
||||||
func StartTokenTicker(ctx context.Context) {
|
func (tb *tokenBucket) StartTokenTicker(ctx context.Context) {
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
// If the timetable has a single entry or was not specified, we don't need
|
// If the timetable has a single entry or was not specified, we don't need
|
||||||
// a ticker to update the bandwidth.
|
// a ticker to update the bandwidth.
|
||||||
|
@ -64,102 +95,109 @@ func StartTokenTicker(ctx context.Context) {
|
||||||
go func() {
|
go func() {
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
limitNow := ci.BwLimit.LimitAt(time.Now())
|
limitNow := ci.BwLimit.LimitAt(time.Now())
|
||||||
currLimitMu.Lock()
|
tb.currLimitMu.Lock()
|
||||||
|
|
||||||
if currLimit.Bandwidth != limitNow.Bandwidth {
|
if tb.currLimit.Bandwidth != limitNow.Bandwidth {
|
||||||
tokenBucketMu.Lock()
|
tb.mu.Lock()
|
||||||
|
|
||||||
// If bwlimit is toggled off, the change should only
|
// If bwlimit is toggled off, the change should only
|
||||||
// become active on the next toggle, which causes
|
// become active on the next toggle, which causes
|
||||||
// an exchange of tokenBucket <-> prevTokenBucket
|
// an exchange of tb.curr <-> tb.prev
|
||||||
var targetBucket **rate.Limiter
|
var targetBucket *buckets
|
||||||
if bwLimitToggledOff {
|
if tb.toggledOff {
|
||||||
targetBucket = &prevTokenBucket
|
targetBucket = &tb.prev
|
||||||
} else {
|
} else {
|
||||||
targetBucket = &tokenBucket
|
targetBucket = &tb.curr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set new bandwidth. If unlimited, set tokenbucket to nil.
|
// Set new bandwidth. If unlimited, set tokenbucket to nil.
|
||||||
if limitNow.Bandwidth > 0 {
|
if limitNow.Bandwidth > 0 {
|
||||||
*targetBucket = newTokenBucket(limitNow.Bandwidth)
|
*targetBucket = newTokenBucket(limitNow.Bandwidth)
|
||||||
if bwLimitToggledOff {
|
if tb.toggledOff {
|
||||||
fs.Logf(nil, "Scheduled bandwidth change. "+
|
fs.Logf(nil, "Scheduled bandwidth change. "+
|
||||||
"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
|
"Limit will be set to %vBytes/s when toggled on again.", &limitNow.Bandwidth)
|
||||||
} else {
|
} else {
|
||||||
fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
|
fs.Logf(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.Bandwidth)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
*targetBucket = nil
|
targetBucket._setOff()
|
||||||
fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
|
fs.Logf(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
currLimit = limitNow
|
tb.currLimit = limitNow
|
||||||
tokenBucketMu.Unlock()
|
tb.mu.Unlock()
|
||||||
}
|
}
|
||||||
currLimitMu.Unlock()
|
tb.currLimitMu.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// limitBandwidth sleeps for the correct amount of time for the passage
|
// LimitBandwidth sleeps for the correct amount of time for the passage
|
||||||
// of n bytes according to the current bandwidth limit
|
// of n bytes according to the current bandwidth limit
|
||||||
func limitBandwidth(n int) {
|
func (tb *tokenBucket) LimitBandwidth(i TokenBucketSlot, n int) {
|
||||||
tokenBucketMu.Lock()
|
tb.mu.RLock()
|
||||||
|
|
||||||
// Limit the transfer speed if required
|
// Limit the transfer speed if required
|
||||||
if tokenBucket != nil {
|
if !tb.curr._isOff() {
|
||||||
err := tokenBucket.WaitN(context.Background(), n)
|
err := tb.curr[i].WaitN(context.Background(), n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Errorf(nil, "Token bucket error: %v", err)
|
fs.Errorf(nil, "Token bucket error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tokenBucketMu.Unlock()
|
tb.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetBwLimit sets the current bandwidth limit
|
// SetBwLimit sets the current bandwidth limit
|
||||||
func SetBwLimit(bandwidth fs.SizeSuffix) {
|
func (tb *tokenBucket) SetBwLimit(bandwidth fs.SizeSuffix) {
|
||||||
tokenBucketMu.Lock()
|
tb.mu.Lock()
|
||||||
defer tokenBucketMu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
if bandwidth > 0 {
|
if bandwidth > 0 {
|
||||||
tokenBucket = newTokenBucket(bandwidth)
|
tb.curr = newTokenBucket(bandwidth)
|
||||||
fs.Logf(nil, "Bandwidth limit set to %v", bandwidth)
|
fs.Logf(nil, "Bandwidth limit set to %v", bandwidth)
|
||||||
} else {
|
} else {
|
||||||
tokenBucket = nil
|
tb.curr._setOff()
|
||||||
fs.Logf(nil, "Bandwidth limit reset to unlimited")
|
fs.Logf(nil, "Bandwidth limit reset to unlimited")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// read and set the bandwidth limits
|
||||||
|
func (tb *tokenBucket) rcBwlimit(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||||
|
if in["rate"] != nil {
|
||||||
|
bwlimit, err := in.GetString("rate")
|
||||||
|
if err != nil {
|
||||||
|
return out, err
|
||||||
|
}
|
||||||
|
var bws fs.BwTimetable
|
||||||
|
err = bws.Set(bwlimit)
|
||||||
|
if err != nil {
|
||||||
|
return out, errors.Wrap(err, "bad bwlimit")
|
||||||
|
}
|
||||||
|
if len(bws) != 1 {
|
||||||
|
return out, errors.New("need exactly 1 bandwidth setting")
|
||||||
|
}
|
||||||
|
bw := bws[0]
|
||||||
|
tb.SetBwLimit(bw.Bandwidth)
|
||||||
|
}
|
||||||
|
tb.mu.RLock()
|
||||||
|
bytesPerSecond := int64(-1)
|
||||||
|
if !tb.curr._isOff() {
|
||||||
|
bytesPerSecond = int64(tb.curr[0].Limit())
|
||||||
|
}
|
||||||
|
tb.mu.RUnlock()
|
||||||
|
out = rc.Params{
|
||||||
|
"rate": fs.SizeSuffix(bytesPerSecond).String(),
|
||||||
|
"bytesPerSecond": bytesPerSecond,
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Remote control for the token bucket
|
// Remote control for the token bucket
|
||||||
func init() {
|
func init() {
|
||||||
rc.Add(rc.Call{
|
rc.Add(rc.Call{
|
||||||
Path: "core/bwlimit",
|
Path: "core/bwlimit",
|
||||||
Fn: func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
Fn: func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
|
||||||
if in["rate"] != nil {
|
return TokenBucket.rcBwlimit(ctx, in)
|
||||||
bwlimit, err := in.GetString("rate")
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
var bws fs.BwTimetable
|
|
||||||
err = bws.Set(bwlimit)
|
|
||||||
if err != nil {
|
|
||||||
return out, errors.Wrap(err, "bad bwlimit")
|
|
||||||
}
|
|
||||||
if len(bws) != 1 {
|
|
||||||
return out, errors.New("need exactly 1 bandwidth setting")
|
|
||||||
}
|
|
||||||
bw := bws[0]
|
|
||||||
SetBwLimit(bw.Bandwidth)
|
|
||||||
}
|
|
||||||
bytesPerSecond := int64(-1)
|
|
||||||
if tokenBucket != nil {
|
|
||||||
bytesPerSecond = int64(tokenBucket.Limit())
|
|
||||||
}
|
|
||||||
out = rc.Params{
|
|
||||||
"rate": fs.SizeSuffix(bytesPerSecond).String(),
|
|
||||||
"bytesPerSecond": bytesPerSecond,
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
},
|
},
|
||||||
Title: "Set the bandwidth limit.",
|
Title: "Set the bandwidth limit.",
|
||||||
Help: `
|
Help: `
|
||||||
|
|
|
@ -24,7 +24,7 @@ func TestRcBwLimit(t *testing.T) {
|
||||||
"bytesPerSecond": int64(1048576),
|
"bytesPerSecond": int64(1048576),
|
||||||
"rate": "1M",
|
"rate": "1M",
|
||||||
}, out)
|
}, out)
|
||||||
assert.Equal(t, rate.Limit(1048576), tokenBucket.Limit())
|
assert.Equal(t, rate.Limit(1048576), TokenBucket.curr[0].Limit())
|
||||||
|
|
||||||
// Query
|
// Query
|
||||||
in = rc.Params{}
|
in = rc.Params{}
|
||||||
|
@ -45,7 +45,7 @@ func TestRcBwLimit(t *testing.T) {
|
||||||
"bytesPerSecond": int64(-1),
|
"bytesPerSecond": int64(-1),
|
||||||
"rate": "off",
|
"rate": "off",
|
||||||
}, out)
|
}, out)
|
||||||
assert.Nil(t, tokenBucket)
|
assert.Nil(t, TokenBucket.curr[0])
|
||||||
|
|
||||||
// Query
|
// Query
|
||||||
in = rc.Params{}
|
in = rc.Params{}
|
||||||
|
|
|
@ -229,10 +229,10 @@ func LoadConfig(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the token bucket limiter
|
// Start the token bucket limiter
|
||||||
accounting.StartTokenBucket(ctx)
|
accounting.TokenBucket.StartTokenBucket(ctx)
|
||||||
|
|
||||||
// Start the bandwidth update ticker
|
// Start the bandwidth update ticker
|
||||||
accounting.StartTokenTicker(ctx)
|
accounting.TokenBucket.StartTokenTicker(ctx)
|
||||||
|
|
||||||
// Start the transactions per second limiter
|
// Start the transactions per second limiter
|
||||||
accounting.StartLimitTPS(ctx)
|
accounting.StartLimitTPS(ctx)
|
||||||
|
|
|
@ -1079,13 +1079,13 @@ func TestSyncWithMaxDuration(t *testing.T) {
|
||||||
maxDuration := 250 * time.Millisecond
|
maxDuration := 250 * time.Millisecond
|
||||||
ci.MaxDuration = maxDuration
|
ci.MaxDuration = maxDuration
|
||||||
bytesPerSecond := 300
|
bytesPerSecond := 300
|
||||||
accounting.SetBwLimit(fs.SizeSuffix(bytesPerSecond))
|
accounting.TokenBucket.SetBwLimit(fs.SizeSuffix(bytesPerSecond))
|
||||||
oldTransfers := ci.Transfers
|
oldTransfers := ci.Transfers
|
||||||
ci.Transfers = 1
|
ci.Transfers = 1
|
||||||
defer func() {
|
defer func() {
|
||||||
ci.MaxDuration = 0 // reset back to default
|
ci.MaxDuration = 0 // reset back to default
|
||||||
ci.Transfers = oldTransfers
|
ci.Transfers = oldTransfers
|
||||||
accounting.SetBwLimit(fs.SizeSuffix(0))
|
accounting.TokenBucket.SetBwLimit(fs.SizeSuffix(0))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 5 files of 60 bytes at 60 bytes/s 5 seconds
|
// 5 files of 60 bytes at 60 bytes/s 5 seconds
|
||||||
|
|
Loading…
Add table
Reference in a new issue