forked from TrueCloudLab/rclone
sync: fix --max-duration and --cutoff-mode soft
Before this change using --max-duration and --cutoff-mode soft would
work like --cutoff-mode hard.
This bug was introduced in this commit which made transfers be
cancelable - before that transfers couldn't be canceled.
122a47fba6
accounting: Allow transfers to be canceled with context #3257
This change adds the timeout to the input context for reading files
rather than the transfer context so the files transfers themselves
aren't canceled if --cutoff-mode soft is in action.
This also adds a test for cutoff mode soft and max duration which was
missing.
See: https://forum.rclone.org/t/max-duration-and-retries-not-working-correctly/27738
This commit is contained in:
parent
3f61869179
commit
4ac875a811
2 changed files with 53 additions and 21 deletions
|
@ -145,16 +145,29 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// If a max session duration has been defined add a deadline to the context
|
|
||||||
if ci.MaxDuration > 0 {
|
if ci.MaxDuration > 0 {
|
||||||
s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
|
s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
|
||||||
fs.Infof(s.fdst, "Transfer session deadline: %s", s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
|
fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
|
||||||
|
}
|
||||||
|
// If a max session duration has been defined add a deadline
|
||||||
|
// to the main context if cutoff mode is hard. This will cut
|
||||||
|
// the transfers off.
|
||||||
|
if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
|
||||||
s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
|
s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
|
||||||
} else {
|
} else {
|
||||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
}
|
}
|
||||||
// Input context - cancel this for graceful stop
|
// Input context - cancel this for graceful stop.
|
||||||
s.inCtx, s.inCancel = context.WithCancel(s.ctx)
|
//
|
||||||
|
// If a max session duration has been defined add a deadline
|
||||||
|
// to the input context if cutoff mode is graceful or soft.
|
||||||
|
// This won't stop the transfers but will cut the
|
||||||
|
// list/check/transfer pipelines.
|
||||||
|
if !s.maxDurationEndTime.IsZero() && ci.CutoffMode != fs.CutoffModeHard {
|
||||||
|
s.inCtx, s.inCancel = context.WithDeadline(s.ctx, s.maxDurationEndTime)
|
||||||
|
} else {
|
||||||
|
s.inCtx, s.inCancel = context.WithCancel(s.ctx)
|
||||||
|
}
|
||||||
if s.noTraverse && s.deleteMode != fs.DeleteModeOff {
|
if s.noTraverse && s.deleteMode != fs.DeleteModeOff {
|
||||||
if !fi.HaveFilesFrom() {
|
if !fi.HaveFilesFrom() {
|
||||||
fs.Errorf(nil, "Ignoring --no-traverse with sync")
|
fs.Errorf(nil, "Ignoring --no-traverse with sync")
|
||||||
|
@ -904,8 +917,9 @@ func (s *syncCopyMove) run() error {
|
||||||
s.processError(s.deleteEmptyDirectories(s.ctx, s.fsrc, s.srcEmptyDirs))
|
s.processError(s.deleteEmptyDirectories(s.ctx, s.fsrc, s.srcEmptyDirs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the error out of the context if there is one
|
// Read the error out of the contexts if there is one
|
||||||
s.processError(s.ctx.Err())
|
s.processError(s.ctx.Err())
|
||||||
|
s.processError(s.inCtx.Err())
|
||||||
|
|
||||||
// If the duration was exceeded then add a Fatal Error so we don't retry
|
// If the duration was exceeded then add a Fatal Error so we don't retry
|
||||||
if !s.maxDurationEndTime.IsZero() && time.Since(s.maxDurationEndTime) > 0 {
|
if !s.maxDurationEndTime.IsZero() && time.Since(s.maxDurationEndTime) > 0 {
|
||||||
|
@ -918,7 +932,8 @@ func (s *syncCopyMove) run() error {
|
||||||
fs.Infof(nil, "There was nothing to transfer")
|
fs.Infof(nil, "There was nothing to transfer")
|
||||||
}
|
}
|
||||||
|
|
||||||
// cancel the context to free resources
|
// cancel the contexts to free resources
|
||||||
|
s.inCancel()
|
||||||
s.cancel()
|
s.cancel()
|
||||||
return s.currentError()
|
return s.currentError()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1002,7 +1002,7 @@ func TestSyncWithUpdateOlder(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test with a max transfer duration
|
// Test with a max transfer duration
|
||||||
func TestSyncWithMaxDuration(t *testing.T) {
|
func testSyncWithMaxDuration(t *testing.T, cutoffMode fs.CutoffMode) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
ctx, ci := fs.AddConfig(ctx)
|
ctx, ci := fs.AddConfig(ctx)
|
||||||
if *fstest.RemoteName != "" {
|
if *fstest.RemoteName != "" {
|
||||||
|
@ -1013,32 +1013,49 @@ func TestSyncWithMaxDuration(t *testing.T) {
|
||||||
|
|
||||||
maxDuration := 250 * time.Millisecond
|
maxDuration := 250 * time.Millisecond
|
||||||
ci.MaxDuration = maxDuration
|
ci.MaxDuration = maxDuration
|
||||||
bytesPerSecond := 300
|
ci.CutoffMode = cutoffMode
|
||||||
accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: fs.SizeSuffix(bytesPerSecond), Rx: fs.SizeSuffix(bytesPerSecond)})
|
ci.CheckFirst = true
|
||||||
|
ci.OrderBy = "size"
|
||||||
ci.Transfers = 1
|
ci.Transfers = 1
|
||||||
|
ci.Checkers = 1
|
||||||
|
bytesPerSecond := 10 * 1024
|
||||||
|
accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: fs.SizeSuffix(bytesPerSecond), Rx: fs.SizeSuffix(bytesPerSecond)})
|
||||||
defer accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: -1, Rx: -1})
|
defer accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: -1, Rx: -1})
|
||||||
|
|
||||||
// 5 files of 60 bytes at 60 Byte/s 5 seconds
|
// write one small file which we expect to transfer and one big one which we don't
|
||||||
testFiles := make([]fstest.Item, 5)
|
file1 := r.WriteFile("file1", string(make([]byte, 16)), t1)
|
||||||
for i := 0; i < len(testFiles); i++ {
|
file2 := r.WriteFile("file2", string(make([]byte, 50*1024)), t1)
|
||||||
testFiles[i] = r.WriteFile(fmt.Sprintf("file%d", i), "------------------------------------------------------------", t1)
|
r.CheckLocalItems(t, file1, file2)
|
||||||
}
|
r.CheckRemoteItems(t)
|
||||||
|
|
||||||
fstest.CheckListing(t, r.Flocal, testFiles)
|
|
||||||
|
|
||||||
accounting.GlobalStats().ResetCounters()
|
accounting.GlobalStats().ResetCounters()
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
err := Sync(ctx, r.Fremote, r.Flocal, false)
|
err := Sync(ctx, r.Fremote, r.Flocal, false)
|
||||||
require.True(t, errors.Is(err, errorMaxDurationReached))
|
require.True(t, errors.Is(err, errorMaxDurationReached))
|
||||||
|
|
||||||
|
if cutoffMode == fs.CutoffModeHard {
|
||||||
|
r.CheckRemoteItems(t, file1)
|
||||||
|
assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers())
|
||||||
|
} else {
|
||||||
|
r.CheckRemoteItems(t, file1, file2)
|
||||||
|
assert.Equal(t, int64(2), accounting.GlobalStats().GetTransfers())
|
||||||
|
}
|
||||||
|
|
||||||
elapsed := time.Since(startTime)
|
elapsed := time.Since(startTime)
|
||||||
maxTransferTime := (time.Duration(len(testFiles)) * 60 * time.Second) / time.Duration(bytesPerSecond)
|
const maxTransferTime = 20 * time.Second
|
||||||
|
|
||||||
what := fmt.Sprintf("expecting elapsed time %v between %v and %v", elapsed, maxDuration, maxTransferTime)
|
what := fmt.Sprintf("expecting elapsed time %v between %v and %v", elapsed, maxDuration, maxTransferTime)
|
||||||
require.True(t, elapsed >= maxDuration, what)
|
assert.True(t, elapsed >= maxDuration, what)
|
||||||
require.True(t, elapsed < 5*time.Second, what)
|
assert.True(t, elapsed < maxTransferTime, what)
|
||||||
// we must not have transferred all files during the session
|
}
|
||||||
require.True(t, accounting.GlobalStats().GetTransfers() < int64(len(testFiles)))
|
|
||||||
|
func TestSyncWithMaxDuration(t *testing.T) {
|
||||||
|
t.Run("Hard", func(t *testing.T) {
|
||||||
|
testSyncWithMaxDuration(t, fs.CutoffModeHard)
|
||||||
|
})
|
||||||
|
t.Run("Soft", func(t *testing.T) {
|
||||||
|
testSyncWithMaxDuration(t, fs.CutoffModeSoft)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test with TrackRenames set
|
// Test with TrackRenames set
|
||||||
|
|
Loading…
Add table
Reference in a new issue