Add time-based bandwidth limits.

- Change the --bwlimit command line parameter to accept both a limit (as
  before) or a full timetable (formatted as "hh:mm,limit
  hh:mm,limit...")
- The timetable is checked once a minute by a ticker function. A new
  tokenBucket is created every time a bandwidth change is necessary.
- This change is compatible with the SIGUSR2 change to toggle bandwidth
  limits.

This resolves #221.
This commit is contained in:
Marco Paganini 2017-01-02 18:52:41 -08:00 committed by Nick Craig-Wood
parent aaeab58ce6
commit 3b0f944e23
4 changed files with 299 additions and 11 deletions

View file

@ -188,15 +188,34 @@ for bytes, `k` for kBytes, `M` for MBytes and `G` for GBytes may be
used. These are the binary units, eg 1, 2\*\*10, 2\*\*20, 2\*\*30 used. These are the binary units, eg 1, 2\*\*10, 2\*\*20, 2\*\*30
respectively. respectively.
### --bwlimit=SIZE ### ### --bwlimit=BANDWIDTH_SPEC ###
Bandwidth limit in kBytes/s, or use suffix b|k|M|G. The default is `0` This option controls the bandwidth limit. Limits can be specified
which means to not limit bandwidth. in two ways: As a single limit, or as a timetable.
Single limits last for the duration of the session. To use a single limit,
specify the desired bandwidth in kBytes/s, or use a suffix b|k|M|G. The
default is `0` which means to not limit bandwidth.
For example to limit bandwidth usage to 10 MBytes/s use `--bwlimit 10M` For example to limit bandwidth usage to 10 MBytes/s use `--bwlimit 10M`
This only limits the bandwidth of the data transfer, it doesn't limit It is also possible to specify a "timetable" of limits, which will cause
the bandwith of the directory listings etc. certain limits to be applied at certain times. To specify a timetable, format your
entries as "HH:MM,BANDWIDTH HH:MM,BANDWITH...".
An example of a typical timetable to avoid link saturation during daytime
working hours could be:
`--bwlimit "08:00,512 12:00,10M 13:00,512 18:00,30M 23:00,off"`
In this example, the transfer bandwidth will be set to 512kBytes/sec at 8am.
At noon, it will raise to 10Mbytes/s, and drop back to 512kBytes/sec at 1pm.
At 6pm, the bandwidth limit will be set to 30MBytes/s, and at 11pm it will be
completely disabled (full speed). Anything between 11pm and 8am will remain
unlimited.
Bandwidth limits only apply to the data transfer. The don't apply to the
bandwith of the directory listings etc.
Note that the units are Bytes/s not Bits/s. Typically connections are Note that the units are Bytes/s not Bits/s. Typically connections are
measured in Bits/s - to convert divide by 8. For example let's say measured in Bits/s - to convert divide by 8. For example let's say

View file

@ -21,13 +21,19 @@ var (
tokenBucketMu sync.Mutex // protects the token bucket variables tokenBucketMu sync.Mutex // protects the token bucket variables
tokenBucket *tb.Bucket tokenBucket *tb.Bucket
prevTokenBucket = tokenBucket prevTokenBucket = tokenBucket
currLimitMu sync.Mutex // protects changes to the timeslot
currLimit BwTimeSlot
) )
// Start the token bucket if necessary // Start the token bucket if necessary
func startTokenBucket() { func startTokenBucket() {
if bwLimit > 0 { currLimitMu.Lock()
tokenBucket = tb.NewBucket(int64(bwLimit), 100*time.Millisecond) currLimit := bwLimit.LimitAt(time.Now())
Log(nil, "Starting bandwidth limiter at %vBytes/s", &bwLimit) currLimitMu.Unlock()
if currLimit.bandwidth > 0 {
tokenBucket = tb.NewBucket(int64(currLimit.bandwidth), 100*time.Millisecond)
Log(nil, "Starting bandwidth limiter at %vBytes/s", &currLimit.bandwidth)
// Start the SIGUSR2 signal handler to toggle bandwidth. // Start the SIGUSR2 signal handler to toggle bandwidth.
// This function does nothing in windows systems. // This function does nothing in windows systems.
@ -35,6 +41,40 @@ func startTokenBucket() {
} }
} }
// startTokenTicker creates a ticker to update the bandwidth limiter every minute.
func startTokenTicker() {
ticker := time.NewTicker(time.Minute)
go func() {
for range ticker.C {
limitNow := bwLimit.LimitAt(time.Now())
currLimitMu.Lock()
if currLimit.bandwidth != limitNow.bandwidth {
tokenBucketMu.Lock()
if tokenBucket != nil {
err := tokenBucket.Close()
if err != nil {
Log(nil, "Error closing token bucket: %v", err)
}
}
// Set new bandwidth. If unlimited, set tokenbucket to nil.
if limitNow.bandwidth > 0 {
tokenBucket = tb.NewBucket(int64(limitNow.bandwidth), 100*time.Millisecond)
Log(nil, "Scheduled bandwidth change. Limit set to %vBytes/s", &limitNow.bandwidth)
} else {
tokenBucket = nil
Log(nil, "Scheduled bandwidth change. Bandwidth limits disabled")
}
currLimit = limitNow
tokenBucketMu.Unlock()
}
currLimitMu.Unlock()
}
}()
}
// stringSet holds a set of strings // stringSet holds a set of strings
type stringSet map[string]struct{} type stringSet map[string]struct{}
@ -394,12 +434,12 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
// Get the token bucket in use // Get the token bucket in use
tokenBucketMu.Lock() tokenBucketMu.Lock()
tb := tokenBucket tb := tokenBucket
tokenBucketMu.Unlock()
// Limit the transfer speed if required // Limit the transfer speed if required
if tb != nil { if tb != nil {
tb.Wait(int64(n)) tb.Wait(int64(n))
} }
tokenBucketMu.Unlock()
return return
} }

View file

@ -50,6 +50,15 @@ const (
// SizeSuffix is parsed by flag with k/M/G suffixes // SizeSuffix is parsed by flag with k/M/G suffixes
type SizeSuffix int64 type SizeSuffix int64
// BwTimeSlot represents a bandwidth configuration at a point in time.
type BwTimeSlot struct {
hhmm int
bandwidth SizeSuffix
}
// BwTimetable contains all configured time slots.
type BwTimetable []BwTimeSlot
// Global // Global
var ( var (
// ConfigFile is the config file data structure // ConfigFile is the config file data structure
@ -90,7 +99,7 @@ var (
ignoreSize = pflag.BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.") ignoreSize = pflag.BoolP("ignore-size", "", false, "Ignore size when skipping use mod-time or checksum.")
noTraverse = pflag.BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.") noTraverse = pflag.BoolP("no-traverse", "", false, "Don't traverse destination file system on copy.")
noUpdateModTime = pflag.BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.") noUpdateModTime = pflag.BoolP("no-update-modtime", "", false, "Don't update destination mod-time if files identical.")
bwLimit SizeSuffix bwLimit BwTimetable
// Key to use for password en/decryption. // Key to use for password en/decryption.
// When nil, no encryption will be used for saving. // When nil, no encryption will be used for saving.
@ -98,7 +107,7 @@ var (
) )
func init() { func init() {
pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix b|k|M|G") pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix b|k|M|G or a full timetable.")
} }
// Turn SizeSuffix into a string and a suffix // Turn SizeSuffix into a string and a suffix
@ -192,6 +201,122 @@ func (x *SizeSuffix) Type() string {
// Check it satisfies the interface // Check it satisfies the interface
var _ pflag.Value = (*SizeSuffix)(nil) var _ pflag.Value = (*SizeSuffix)(nil)
// String returns a printable representation of BwTimetable.
func (x BwTimetable) String() string {
ret := []string{}
for _, ts := range x {
ret = append(ret, fmt.Sprintf("%04.4d,%s", ts.hhmm, ts.bandwidth.String()))
}
return strings.Join(ret, " ")
}
// Set the bandwidth timetable.
func (x *BwTimetable) Set(s string) error {
// The timetable is formatted as:
// "hh:mm,bandwidth hh:mm,banwidth..." ex: "10:00,10G 11:30,1G 18:00,off"
// If only a single bandwidth identifier is provided, we assume constant bandwidth.
if len(s) == 0 {
return errors.New("empty string")
}
// Single value without time specification.
if !strings.Contains(s, " ") && !strings.Contains(s, ",") {
ts := BwTimeSlot{}
if err := ts.bandwidth.Set(s); err != nil {
return err
}
ts.hhmm = 0
*x = BwTimetable{ts}
return nil
}
for _, tok := range strings.Split(s, " ") {
tv := strings.Split(tok, ",")
// Format must be HH:MM,BW
if len(tv) != 2 {
return errors.Errorf("invalid time/bandwidth specification: %q", tok)
}
// Basic timespec sanity checking
hhmm := tv[0]
if len(hhmm) != 5 {
return errors.Errorf("invalid time specification (hh:mm): %q", hhmm)
}
hh, err := strconv.Atoi(hhmm[0:2])
if err != nil {
return errors.Errorf("invalid hour in time specification %q: %v", hhmm, err)
}
if hh < 0 || hh > 23 {
return errors.Errorf("invalid hour (must be between 00 and 23): %q", hh)
}
mm, err := strconv.Atoi(hhmm[3:])
if err != nil {
return errors.Errorf("invalid minute in time specification: %q: %v", hhmm, err)
}
if mm < 0 || mm > 59 {
return errors.Errorf("invalid minute (must be between 00 and 59): %q", hh)
}
ts := BwTimeSlot{
hhmm: (hh * 100) + mm,
}
// Bandwidth limit for this time slot.
if err := ts.bandwidth.Set(tv[1]); err != nil {
return err
}
*x = append(*x, ts)
}
return nil
}
// LimitAt returns a BwTimeSlot for the time requested.
func (x BwTimetable) LimitAt(tt time.Time) BwTimeSlot {
// If the timetable is empty, we return an unlimited BwTimeSlot starting at midnight.
if len(x) == 0 {
return BwTimeSlot{hhmm: 0, bandwidth: -1}
}
hhmm := tt.Hour()*100 + tt.Minute()
// By default, we return the last element in the timetable. This
// satisfies two conditions: 1) If there's only one element it
// will always be selected, and 2) The last element of the table
// will "wrap around" until overriden by an earlier time slot.
// there's only one time slot in the timetable.
ret := x[len(x)-1]
mindif := 0
first := true
// Look for most recent time slot.
for _, ts := range x {
// Ignore the past
if hhmm < ts.hhmm {
continue
}
dif := ((hhmm / 100 * 60) + (hhmm % 100)) - ((ts.hhmm / 100 * 60) + (ts.hhmm % 100))
if first {
mindif = dif
first = false
}
if dif <= mindif {
mindif = dif
ret = ts
}
}
return ret
}
// Type of the value
func (x BwTimetable) Type() string {
return "BwTimetable"
}
// Check it satisfies the interface
var _ pflag.Value = (*BwTimetable)(nil)
// crypt internals // crypt internals
var ( var (
cryptKey = []byte{ cryptKey = []byte{
@ -396,6 +521,9 @@ func LoadConfig() {
// Start the token bucket limiter // Start the token bucket limiter
startTokenBucket() startTokenBucket()
// Start the bandwidth update ticker
startTokenTicker()
} }
var errorConfigFileNotFound = errors.New("config file not found") var errorConfigFileNotFound = errors.New("config file not found")

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -87,6 +88,106 @@ func TestSizeSuffixSet(t *testing.T) {
} }
} }
func TestBwTimetableSet(t *testing.T) {
for _, test := range []struct {
in string
want BwTimetable
err bool
}{
{"", BwTimetable{}, true},
{"0", BwTimetable{BwTimeSlot{hhmm: 0, bandwidth: 0}}, false},
{"666", BwTimetable{BwTimeSlot{hhmm: 0, bandwidth: 666 * 1024}}, false},
{"10:20,666", BwTimetable{BwTimeSlot{hhmm: 1020, bandwidth: 666 * 1024}}, false},
{
"11:00,333 13:40,666 23:50,10M 23:59,off",
BwTimetable{
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
BwTimeSlot{hhmm: 1340, bandwidth: 666 * 1024},
BwTimeSlot{hhmm: 2350, bandwidth: 10 * 1024 * 1024},
BwTimeSlot{hhmm: 2359, bandwidth: -1},
},
false,
},
{"bad,bad", BwTimetable{}, true},
{"bad bad", BwTimetable{}, true},
{"bad", BwTimetable{}, true},
{"1000X", BwTimetable{}, true},
{"2401,666", BwTimetable{}, true},
{"1061,666", BwTimetable{}, true},
} {
tt := BwTimetable{}
err := tt.Set(test.in)
if test.err {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equal(t, test.want, tt)
}
}
func TestBwTimetableLimitAt(t *testing.T) {
for _, test := range []struct {
tt BwTimetable
now time.Time
want BwTimeSlot
}{
{
BwTimetable{},
time.Date(2017, time.April, 20, 15, 0, 0, 0, time.UTC),
BwTimeSlot{hhmm: 0, bandwidth: -1},
},
{
BwTimetable{BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024}},
time.Date(2017, time.April, 20, 15, 0, 0, 0, time.UTC),
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
},
{
BwTimetable{
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
BwTimeSlot{hhmm: 1300, bandwidth: 666 * 1024},
BwTimeSlot{hhmm: 2301, bandwidth: 1024 * 1024},
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
time.Date(2017, time.April, 20, 10, 15, 0, 0, time.UTC),
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
{
BwTimetable{
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
BwTimeSlot{hhmm: 1300, bandwidth: 666 * 1024},
BwTimeSlot{hhmm: 2301, bandwidth: 1024 * 1024},
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
time.Date(2017, time.April, 20, 11, 0, 0, 0, time.UTC),
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
},
{
BwTimetable{
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
BwTimeSlot{hhmm: 1300, bandwidth: 666 * 1024},
BwTimeSlot{hhmm: 2301, bandwidth: 1024 * 1024},
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
time.Date(2017, time.April, 20, 13, 1, 0, 0, time.UTC),
BwTimeSlot{hhmm: 1300, bandwidth: 666 * 1024},
},
{
BwTimetable{
BwTimeSlot{hhmm: 1100, bandwidth: 333 * 1024},
BwTimeSlot{hhmm: 1300, bandwidth: 666 * 1024},
BwTimeSlot{hhmm: 2301, bandwidth: 1024 * 1024},
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
time.Date(2017, time.April, 20, 23, 59, 0, 0, time.UTC),
BwTimeSlot{hhmm: 2350, bandwidth: -1},
},
} {
slot := test.tt.LimitAt(test.now)
assert.Equal(t, test.want, slot)
}
}
func TestObscure(t *testing.T) { func TestObscure(t *testing.T) {
for _, test := range []struct { for _, test := range []struct {
in string in string