fs: Add support for --max-transfer-cutoff modes #2672
This also adds max transfer cut off check for server side copies too
This commit is contained in:
parent
7d70eb0346
commit
6f1766dd9e
9 changed files with 155 additions and 11 deletions
|
@ -736,6 +736,20 @@ When the limit is reached all transfers will stop immediately.
|
|||
|
||||
Rclone will exit with exit code 8 if the transfer limit is reached.
|
||||
|
||||
### --max-transfer-(hard,soft,cautious) ###
|
||||
|
||||
This modifies the behavior of `--max-transfer`
|
||||
Defaults to `--max-transfer-hard`.
|
||||
|
||||
Specifiying `--max-transfer-hard` will stop transferring immediately
|
||||
when Rclone reaches the limit.
|
||||
|
||||
Specifiying `--max-transfer-soft` will stop starting new transfers
|
||||
when Rclone reaches the limit.
|
||||
|
||||
Specifiying `--max-transfer-cautious` will try to prevent Rclone
|
||||
from reaching the limit.
|
||||
|
||||
### --modify-window=TIME ###
|
||||
|
||||
When checking whether a file has been modified, this is the maximum
|
||||
|
|
|
@ -61,7 +61,10 @@ func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name str
|
|||
exit: make(chan struct{}),
|
||||
avg: 0,
|
||||
lpTime: time.Now(),
|
||||
max: int64(fs.Config.MaxTransfer),
|
||||
max: -1,
|
||||
}
|
||||
if fs.Config.MaxTransferMode != fs.MaxTransferModeSoft {
|
||||
acc.max = int64((fs.Config.MaxTransfer))
|
||||
}
|
||||
go acc.averageLoop()
|
||||
stats.inProgress.set(acc.name, acc)
|
||||
|
|
|
@ -197,9 +197,12 @@ func TestAccountAccounter(t *testing.T) {
|
|||
|
||||
func TestAccountMaxTransfer(t *testing.T) {
|
||||
old := fs.Config.MaxTransfer
|
||||
oldMode := fs.Config.MaxTransferMode
|
||||
|
||||
fs.Config.MaxTransfer = 15
|
||||
defer func() {
|
||||
fs.Config.MaxTransfer = old
|
||||
fs.Config.MaxTransferMode = oldMode
|
||||
}()
|
||||
|
||||
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
|
||||
|
@ -218,6 +221,20 @@ func TestAccountMaxTransfer(t *testing.T) {
|
|||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, ErrorMaxTransferLimitReached, err)
|
||||
assert.True(t, fserrors.IsFatalError(err))
|
||||
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||
stats = NewStats()
|
||||
acc = newAccountSizeName(stats, in, 1, "test")
|
||||
|
||||
n, err = acc.Read(b)
|
||||
assert.Equal(t, 10, n)
|
||||
assert.NoError(t, err)
|
||||
n, err = acc.Read(b)
|
||||
assert.Equal(t, 10, n)
|
||||
assert.NoError(t, err)
|
||||
n, err = acc.Read(b)
|
||||
assert.Equal(t, 10, n)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestShortenName(t *testing.T) {
|
||||
|
|
|
@ -382,6 +382,22 @@ func (s *StatsInfo) GetBytes() int64 {
|
|||
return s.bytes
|
||||
}
|
||||
|
||||
// GetBytesWithPending returns the number of bytes transferred and remaining transfers
|
||||
func (s *StatsInfo) GetBytesWithPending() int64 {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
pending := int64(0)
|
||||
for _, tr := range s.startedTransfers {
|
||||
if tr.acc != nil {
|
||||
bytes, size := tr.acc.progress()
|
||||
if bytes < size {
|
||||
pending += size - bytes
|
||||
}
|
||||
}
|
||||
}
|
||||
return s.bytes + pending
|
||||
}
|
||||
|
||||
// Errors updates the stats for errors
|
||||
func (s *StatsInfo) Errors(errors int64) {
|
||||
s.mu.Lock()
|
||||
|
|
|
@ -93,6 +93,7 @@ type ConfigInfo struct {
|
|||
UseServerModTime bool
|
||||
MaxTransfer SizeSuffix
|
||||
MaxDuration time.Duration
|
||||
MaxTransferMode MaxTransferMode
|
||||
MaxBacklog int
|
||||
MaxStatsGroups int
|
||||
StatsOneLine bool
|
||||
|
|
|
@ -20,15 +20,18 @@ import (
|
|||
|
||||
var (
|
||||
// these will get interpreted into fs.Config via SetFlags() below
|
||||
verbose int
|
||||
quiet bool
|
||||
dumpHeaders bool
|
||||
dumpBodies bool
|
||||
deleteBefore bool
|
||||
deleteDuring bool
|
||||
deleteAfter bool
|
||||
bindAddr string
|
||||
disableFeatures string
|
||||
verbose int
|
||||
quiet bool
|
||||
dumpHeaders bool
|
||||
dumpBodies bool
|
||||
deleteBefore bool
|
||||
deleteDuring bool
|
||||
deleteAfter bool
|
||||
bindAddr string
|
||||
disableFeatures string
|
||||
maxTransferHard bool
|
||||
maxTransferSoft bool
|
||||
maxTransferCautious bool
|
||||
)
|
||||
|
||||
// AddFlags adds the non filing system specific flags to the command
|
||||
|
@ -94,6 +97,9 @@ func AddFlags(flagSet *pflag.FlagSet) {
|
|||
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
|
||||
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
|
||||
flags.DurationVarP(flagSet, &fs.Config.MaxDuration, "max-duration", "", 0, "Maximum duration rclone will transfer data for.")
|
||||
flags.BoolVarP(flagSet, &maxTransferHard, "max-transfer-hard", "", false, "When transferring, stop immediately when --max-transfer is reached")
|
||||
flags.BoolVarP(flagSet, &maxTransferSoft, "max-transfer-soft", "", false, "When transferring, stop starting new transfers when --max-transfer is reached")
|
||||
flags.BoolVarP(flagSet, &maxTransferCautious, "max-transfer-cautious", "", false, "When transferring, try to avoid reaching --max-transfer")
|
||||
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
|
||||
flags.IntVarP(flagSet, &fs.Config.MaxStatsGroups, "max-stats-groups", "", fs.Config.MaxStatsGroups, "Maximum number of stats groups to keep in memory. On max oldest is discarded.")
|
||||
flags.BoolVarP(flagSet, &fs.Config.StatsOneLine, "stats-one-line", "", fs.Config.StatsOneLine, "Make the stats fit on one line.")
|
||||
|
@ -178,6 +184,20 @@ func SetFlags() {
|
|||
fs.Config.DeleteMode = fs.DeleteModeDefault
|
||||
}
|
||||
|
||||
switch {
|
||||
case maxTransferHard && (maxTransferSoft || maxTransferCautious),
|
||||
maxTransferSoft && maxTransferCautious:
|
||||
log.Fatalf(`Only one of --max-trans fer-hard, --max-transfer-soft or --max-transfer-cautious can be used.`)
|
||||
case maxTransferHard:
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
|
||||
case maxTransferSoft:
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||
case maxTransferCautious:
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
|
||||
default:
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeDefault
|
||||
}
|
||||
|
||||
if fs.Config.CompareDest != "" && fs.Config.CopyDest != "" {
|
||||
log.Fatalf(`Can't use --compare-dest with --copy-dest.`)
|
||||
}
|
||||
|
|
12
fs/maxtransfermode.go
Normal file
12
fs/maxtransfermode.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
package fs
|
||||
|
||||
// MaxTransferMode describes the possible delete modes in the config
|
||||
type MaxTransferMode byte
|
||||
|
||||
// MaxTransferMode constants
|
||||
const (
|
||||
MaxTransferModeHard MaxTransferMode = iota
|
||||
MaxTransferModeSoft
|
||||
MaxTransferModeCautious
|
||||
MaxTransferModeDefault = MaxTransferModeHard
|
||||
)
|
|
@ -364,7 +364,8 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||
actionTaken = "Copied (server side copy)"
|
||||
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
||||
// Check transfer limit for server side copies
|
||||
if fs.Config.MaxTransfer >= 0 && accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) {
|
||||
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
||||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
||||
return nil, accounting.ErrorMaxTransferLimitReached
|
||||
}
|
||||
in := tr.Account(nil) // account the transfer
|
||||
|
@ -385,6 +386,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||
}
|
||||
// If can't server side copy, do it manually
|
||||
if err == fs.ErrorCantCopy {
|
||||
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
||||
(fs.Config.MaxTransferMode == fs.MaxTransferModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
||||
return nil, accounting.ErrorMaxTransferLimitReached
|
||||
}
|
||||
if doMultiThreadCopy(f, src) {
|
||||
// Number of streams proportional to size
|
||||
streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/filter"
|
||||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/fs/fshttp"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
|
@ -1527,3 +1528,58 @@ func TestRcatSize(t *testing.T) {
|
|||
// Check files exist
|
||||
fstest.CheckItems(t, r.Fremote, file1, file2)
|
||||
}
|
||||
|
||||
func TestCopyFileMaxTransfer(t *testing.T) {
|
||||
r := fstest.NewRun(t)
|
||||
defer r.Finalise()
|
||||
old := fs.Config.MaxTransfer
|
||||
oldMode := fs.Config.MaxTransferMode
|
||||
|
||||
defer func() {
|
||||
fs.Config.MaxTransfer = old
|
||||
fs.Config.MaxTransferMode = oldMode
|
||||
accounting.Stats(context.Background()).ResetCounters()
|
||||
}()
|
||||
|
||||
file1 := r.WriteFile("file1", "file1 contents", t1)
|
||||
file2 := r.WriteFile("file2", "file2 contents...........", t2)
|
||||
|
||||
rfile1 := file1
|
||||
rfile1.Path = "sub/file1"
|
||||
rfile2 := file2
|
||||
rfile2.Path = "sub/file2"
|
||||
|
||||
fs.Config.MaxTransfer = 15
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeHard
|
||||
accounting.Stats(context.Background()).ResetCounters()
|
||||
|
||||
err := operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile1.Path, file1.Path)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||
|
||||
accounting.Stats(context.Background()).ResetCounters()
|
||||
|
||||
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
|
||||
assert.True(t, fserrors.IsFatalError(err))
|
||||
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeCautious
|
||||
accounting.Stats(context.Background()).ResetCounters()
|
||||
|
||||
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
fstest.CheckItems(t, r.Fremote, rfile1)
|
||||
assert.Equal(t, accounting.ErrorMaxTransferLimitReached, err)
|
||||
assert.True(t, fserrors.IsFatalError(err))
|
||||
|
||||
fs.Config.MaxTransferMode = fs.MaxTransferModeSoft
|
||||
accounting.Stats(context.Background()).ResetCounters()
|
||||
|
||||
err = operations.CopyFile(context.Background(), r.Fremote, r.Flocal, rfile2.Path, file2.Path)
|
||||
require.NoError(t, err)
|
||||
fstest.CheckItems(t, r.Flocal, file1, file2)
|
||||
fstest.CheckItems(t, r.Fremote, rfile1, rfile2)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue