diff --git a/fs/sync/sync.go b/fs/sync/sync.go index f9844ea16..58e74e22a 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -145,16 +145,29 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete if err != nil { return nil, err } - // If a max session duration has been defined add a deadline to the context if ci.MaxDuration > 0 { s.maxDurationEndTime = time.Now().Add(ci.MaxDuration) - fs.Infof(s.fdst, "Transfer session deadline: %s", s.maxDurationEndTime.Format("2006/01/02 15:04:05")) + fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05")) + } + // If a max session duration has been defined add a deadline + // to the main context if cutoff mode is hard. This will cut + // the transfers off. + if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard { s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime) } else { s.ctx, s.cancel = context.WithCancel(ctx) } - // Input context - cancel this for graceful stop - s.inCtx, s.inCancel = context.WithCancel(s.ctx) + // Input context - cancel this for graceful stop. + // + // If a max session duration has been defined add a deadline + // to the input context if cutoff mode is graceful or soft. + // This won't stop the transfers but will cut the + // list/check/transfer pipelines. + if !s.maxDurationEndTime.IsZero() && ci.CutoffMode != fs.CutoffModeHard { + s.inCtx, s.inCancel = context.WithDeadline(s.ctx, s.maxDurationEndTime) + } else { + s.inCtx, s.inCancel = context.WithCancel(s.ctx) + } if s.noTraverse && s.deleteMode != fs.DeleteModeOff { if !fi.HaveFilesFrom() { fs.Errorf(nil, "Ignoring --no-traverse with sync") @@ -904,8 +917,9 @@ func (s *syncCopyMove) run() error { s.processError(s.deleteEmptyDirectories(s.ctx, s.fsrc, s.srcEmptyDirs)) } - // Read the error out of the context if there is one + // Read the error out of the contexts if there is one s.processError(s.ctx.Err()) + s.processError(s.inCtx.Err()) // If the duration was exceeded then add a Fatal Error so we don't retry if !s.maxDurationEndTime.IsZero() && time.Since(s.maxDurationEndTime) > 0 { @@ -918,7 +932,8 @@ func (s *syncCopyMove) run() error { fs.Infof(nil, "There was nothing to transfer") } - // cancel the context to free resources + // cancel the contexts to free resources + s.inCancel() s.cancel() return s.currentError() } diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index 4bf60c471..72e6a65e7 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -1002,7 +1002,7 @@ func TestSyncWithUpdateOlder(t *testing.T) { } // Test with a max transfer duration -func TestSyncWithMaxDuration(t *testing.T) { +func testSyncWithMaxDuration(t *testing.T, cutoffMode fs.CutoffMode) { ctx := context.Background() ctx, ci := fs.AddConfig(ctx) if *fstest.RemoteName != "" { @@ -1013,32 +1013,49 @@ func TestSyncWithMaxDuration(t *testing.T) { maxDuration := 250 * time.Millisecond ci.MaxDuration = maxDuration - bytesPerSecond := 300 - accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: fs.SizeSuffix(bytesPerSecond), Rx: fs.SizeSuffix(bytesPerSecond)}) + ci.CutoffMode = cutoffMode + ci.CheckFirst = true + ci.OrderBy = "size" ci.Transfers = 1 + ci.Checkers = 1 + bytesPerSecond := 10 * 1024 + accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: fs.SizeSuffix(bytesPerSecond), Rx: fs.SizeSuffix(bytesPerSecond)}) defer accounting.TokenBucket.SetBwLimit(fs.BwPair{Tx: -1, Rx: -1}) - // 5 files of 60 bytes at 60 Byte/s 5 seconds - testFiles := make([]fstest.Item, 5) - for i := 0; i < len(testFiles); i++ { - testFiles[i] = r.WriteFile(fmt.Sprintf("file%d", i), "------------------------------------------------------------", t1) - } - - fstest.CheckListing(t, r.Flocal, testFiles) + // write one small file which we expect to transfer and one big one which we don't + file1 := r.WriteFile("file1", string(make([]byte, 16)), t1) + file2 := r.WriteFile("file2", string(make([]byte, 50*1024)), t1) + r.CheckLocalItems(t, file1, file2) + r.CheckRemoteItems(t) accounting.GlobalStats().ResetCounters() startTime := time.Now() err := Sync(ctx, r.Fremote, r.Flocal, false) require.True(t, errors.Is(err, errorMaxDurationReached)) + if cutoffMode == fs.CutoffModeHard { + r.CheckRemoteItems(t, file1) + assert.Equal(t, int64(1), accounting.GlobalStats().GetTransfers()) + } else { + r.CheckRemoteItems(t, file1, file2) + assert.Equal(t, int64(2), accounting.GlobalStats().GetTransfers()) + } + elapsed := time.Since(startTime) - maxTransferTime := (time.Duration(len(testFiles)) * 60 * time.Second) / time.Duration(bytesPerSecond) + const maxTransferTime = 20 * time.Second what := fmt.Sprintf("expecting elapsed time %v between %v and %v", elapsed, maxDuration, maxTransferTime) - require.True(t, elapsed >= maxDuration, what) - require.True(t, elapsed < 5*time.Second, what) - // we must not have transferred all files during the session - require.True(t, accounting.GlobalStats().GetTransfers() < int64(len(testFiles))) + assert.True(t, elapsed >= maxDuration, what) + assert.True(t, elapsed < maxTransferTime, what) +} + +func TestSyncWithMaxDuration(t *testing.T) { + t.Run("Hard", func(t *testing.T) { + testSyncWithMaxDuration(t, fs.CutoffModeHard) + }) + t.Run("Soft", func(t *testing.T) { + testSyncWithMaxDuration(t, fs.CutoffModeSoft) + }) } // Test with TrackRenames set