diff --git a/docs/content/docs.md b/docs/content/docs.md
index e23fe216f..4df0df26b 100644
--- a/docs/content/docs.md
+++ b/docs/content/docs.md
@@ -535,6 +535,22 @@ to reduce the value so rclone moves on to a high level retry (see the
 
 Disable low level retries with `--low-level-retries 1`.
 
+### --max-backlog=N ###
+
+This is the maximum number of files in a sync/copy/move that can be
+queued for checking or transferring.
+
+This can be set arbitrarily large. It will only use memory when the
+queue is in use. Note that it will use on the order of N kB of memory
+when the backlog is in use - roughly 10 MB at the default of 10000.
+
+Setting this large allows rclone to calculate how many files are
+pending more accurately and so give a more accurate estimated finish
+time.
+
+Setting this small keeps rclone more closely in step with the remote
+listings, which may be desirable.
+
 ### --max-delete=N ###
 
 This tells rclone not to delete more than N files. If that limit is
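The behaviour the new docs describe - a lister that fills a bounded queue and workers that drain it - is essentially that of a buffered Go channel, and the memory trade-off follows directly: `rclone sync --max-backlog 200000 src: dst:` would allow roughly 200 MB of queue bookkeeping in exchange for a deeper scan-ahead. A minimal, illustrative sketch of the idea (not rclone code):

```go
package main

import "fmt"

func main() {
	const maxBacklog = 3 // stand-in for the --max-backlog value

	backlog := make(chan string, maxBacklog)

	// Producer: the listing queues files until the backlog is full,
	// at which point a further send would block.
	for _, name := range []string{"a.txt", "b.txt", "c.txt"} {
		backlog <- name
	}
	fmt.Printf("queued %d of at most %d files\n", len(backlog), cap(backlog))

	// Consumer: the transfer workers drain the backlog, freeing
	// space for the producer to list further ahead.
	close(backlog)
	for name := range backlog {
		fmt.Println("transferring", name)
	}
}
```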
diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go
index f6b143940..84fdb6452 100644
--- a/fs/accounting/stats.go
+++ b/fs/accounting/stats.go
@@ -65,17 +65,23 @@ The value for "eta" is null if an eta cannot be determined.
 
 // StatsInfo accounts all transfers
 type StatsInfo struct {
-	mu           sync.RWMutex
-	bytes        int64
-	errors       int64
-	lastError    error
-	checks       int64
-	checking     *stringSet
-	transfers    int64
-	transferring *stringSet
-	deletes      int64
-	start        time.Time
-	inProgress   *inProgress
+	mu                sync.RWMutex
+	bytes             int64
+	errors            int64
+	lastError         error
+	checks            int64
+	checking          *stringSet
+	checkQueue        int
+	checkQueueSize    int64
+	transfers         int64
+	transferring      *stringSet
+	transferQueue     int
+	transferQueueSize int64
+	renameQueue       int
+	renameQueueSize   int64
+	deletes           int64
+	start             time.Time
+	inProgress        *inProgress
 }
 
 // NewStats cretates an initialised StatsInfo
@@ -294,3 +300,27 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
 		s.mu.Unlock()
 	}
 }
+
+// SetCheckQueue sets the number of queued checks
+func (s *StatsInfo) SetCheckQueue(n int, size int64) {
+	s.mu.Lock()
+	s.checkQueue = n
+	s.checkQueueSize = size
+	s.mu.Unlock()
+}
+
+// SetTransferQueue sets the number of queued transfers
+func (s *StatsInfo) SetTransferQueue(n int, size int64) {
+	s.mu.Lock()
+	s.transferQueue = n
+	s.transferQueueSize = size
+	s.mu.Unlock()
+}
+
+// SetRenameQueue sets the number of queued renames
+func (s *StatsInfo) SetRenameQueue(n int, size int64) {
+	s.mu.Lock()
+	s.renameQueue = n
+	s.renameQueueSize = size
+	s.mu.Unlock()
+}
diff --git a/fs/config.go b/fs/config.go
index 062394f5d..7d5640ac1 100644
--- a/fs/config.go
+++ b/fs/config.go
@@ -81,6 +81,7 @@ type ConfigInfo struct {
 	AskPassword        bool
 	UseServerModTime   bool
 	MaxTransfer        SizeSuffix
+	MaxBacklog         int
 }
 
 // NewConfig creates a new config with everything set to the default
@@ -109,6 +110,7 @@ func NewConfig() *ConfigInfo {
 	c.AskPassword = true
 	c.TPSLimitBurst = 1
 	c.MaxTransfer = -1
+	c.MaxBacklog = 10000
 	return c
 }
 
diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go
index 8bab7ad58..3b99ed885 100644
--- a/fs/config/configflags/configflags.go
+++ b/fs/config/configflags/configflags.go
@@ -83,6 +83,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
 	flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.")
 	flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
 	flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
+	flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
 }
 
 // SetFlags converts any flags into config which weren't straight foward
diff --git a/fs/fs.go b/fs/fs.go
index febd386d9..445e050c1 100644
--- a/fs/fs.go
+++ b/fs/fs.go
@@ -827,9 +827,6 @@ type ObjectPair struct {
 	Src, Dst Object
 }
 
-// ObjectPairChan is a channel of ObjectPair
-type ObjectPairChan chan ObjectPair
-
 // Find looks for an Info object for the name passed in
 //
 // Services are looked up in the config file
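Note how the flag registration passes `fs.Config.MaxBacklog` as its own default: `NewConfig` has already run, so the config default and the value shown in `--help` cannot drift apart. A standalone sketch of the same pattern using spf13/pflag directly (the `config` type here is hypothetical, not rclone code):

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// config stands in for fs.ConfigInfo (hypothetical, for illustration).
type config struct {
	MaxBacklog int
}

func main() {
	// Defaults are established first, as NewConfig does.
	c := config{MaxBacklog: 10000}

	// The current value doubles as the flag default, so the config
	// default and the flag's documented default can never disagree.
	pflag.IntVarP(&c.MaxBacklog, "max-backlog", "", c.MaxBacklog,
		"Maximum number of objects in sync or check backlog.")
	pflag.Parse()

	fmt.Println("max-backlog =", c.MaxBacklog)
}
```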
diff --git a/fs/sync/pipe.go b/fs/sync/pipe.go
new file mode 100644
index 000000000..ad173d111
--- /dev/null
+++ b/fs/sync/pipe.go
@@ -0,0 +1,100 @@
+package sync
+
+import (
+	"context"
+	"sync"
+
+	"github.com/ncw/rclone/fs"
+)
+
+// pipe provides a channel like experience with a bounded backlog
+//
+// Note unlike channels these aren't strictly ordered.
+type pipe struct {
+	mu        sync.Mutex
+	c         chan struct{}
+	queue     []fs.ObjectPair
+	closed    bool
+	totalSize int64
+	stats     func(items int, totalSize int64)
+}
+
+func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe {
+	return &pipe{
+		c:     make(chan struct{}, maxBacklog),
+		stats: stats,
+	}
+}
+
+// Put a pair into the pipe
+//
+// It returns ok = false if the context was cancelled
+//
+// It will panic if you call it after Close()
+func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
+	if ctx.Err() != nil {
+		return false
+	}
+	p.mu.Lock()
+	p.queue = append(p.queue, pair)
+	size := pair.Src.Size()
+	if size > 0 {
+		p.totalSize += size
+	}
+	p.stats(len(p.queue), p.totalSize)
+	p.mu.Unlock()
+	select {
+	case <-ctx.Done():
+		return false
+	case p.c <- struct{}{}:
+	}
+	return true
+}
+
+// Get a pair from the pipe
+//
+// It returns ok = false if the context was cancelled or Close() has
+// been called.
+func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
+	if ctx.Err() != nil {
+		return
+	}
+	select {
+	case <-ctx.Done():
+		return
+	case _, ok = <-p.c:
+		if !ok {
+			return
+		}
+	}
+	p.mu.Lock()
+	pair, p.queue = p.queue[0], p.queue[1:]
+	size := pair.Src.Size()
+	if size > 0 {
+		p.totalSize -= size
+	}
+	if p.totalSize < 0 {
+		p.totalSize = 0
+	}
+	p.stats(len(p.queue), p.totalSize)
+	p.mu.Unlock()
+	return pair, true
+}
+
+// Stats reads the number of items in the queue and the totalSize
+func (p *pipe) Stats() (items int, totalSize int64) {
+	p.mu.Lock()
+	items, totalSize = len(p.queue), p.totalSize
+	p.mu.Unlock()
+	return items, totalSize
+}
+
+// Close the pipe
+//
+// Writes to a closed pipe will panic as will double closing a pipe
+func (p *pipe) Close() {
+	p.mu.Lock()
+	close(p.c)
+	p.closed = true
+	p.mu.Unlock()
+}
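To make the Put/Get/Close contract concrete, here is a sketch of one producer and one consumer, written as if it lived in the fs/sync package (the `examplePipe` function is hypothetical; it borrows the mockobject helper used by the tests below):

```go
package sync

import (
	"context"
	"fmt"

	"github.com/ncw/rclone/fs"
	"github.com/ncw/rclone/fstest/mockobject"
)

// examplePipe is a sketch, not rclone code: it walks through the
// intended lifecycle of a pipe for one producer and one consumer.
func examplePipe() {
	ctx := context.Background()
	stats := func(items int, totalSize int64) {
		fmt.Printf("backlog: %d items, %d bytes\n", items, totalSize)
	}
	p := newPipe(stats, 10) // about 10 unconsumed pairs before Put blocks

	// Producer side: Put only reports false if ctx is cancelled.
	src := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
	if !p.Put(ctx, fs.ObjectPair{Src: src, Dst: nil}) {
		return // cancelled
	}
	p.Close() // no more Puts - a Put after Close panics

	// Consumer side: Get drains what is queued, then reports !ok.
	for {
		pair, ok := p.Get(ctx)
		if !ok {
			break // cancelled, or closed and fully drained
		}
		fmt.Println("would transfer", pair.Src.Remote())
	}
}
```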
diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go
new file mode 100644
index 000000000..17ce2611b
--- /dev/null
+++ b/fs/sync/pipe_test.go
@@ -0,0 +1,122 @@
+package sync
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/ncw/rclone/fs"
+	"github.com/ncw/rclone/fstest/mockobject"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestPipe(t *testing.T) {
+	var queueLength int
+	var queueSize int64
+	stats := func(n int, size int64) {
+		queueLength, queueSize = n, size
+	}
+
+	// Make a new pipe
+	p := newPipe(stats, 10)
+
+	checkStats := func(expectedN int, expectedSize int64) {
+		n, size := p.Stats()
+		assert.Equal(t, expectedN, n)
+		assert.Equal(t, expectedSize, size)
+		assert.Equal(t, expectedN, queueLength)
+		assert.Equal(t, expectedSize, queueSize)
+	}
+
+	checkStats(0, 0)
+
+	ctx := context.Background()
+
+	obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
+
+	pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
+
+	// Put an object
+	ok := p.Put(ctx, pair1)
+	assert.Equal(t, true, ok)
+	checkStats(1, 5)
+
+	// Close the pipe showing reading on closed pipe is OK
+	p.Close()
+
+	// Read from pipe
+	pair2, ok := p.Get(ctx)
+	assert.Equal(t, pair1, pair2)
+	assert.Equal(t, true, ok)
+	checkStats(0, 0)
+
+	// Check read on closed pipe
+	pair2, ok = p.Get(ctx)
+	assert.Equal(t, fs.ObjectPair{}, pair2)
+	assert.Equal(t, false, ok)
+
+	// Check panic on write to closed pipe
+	assert.Panics(t, func() { p.Put(ctx, pair1) })
+
+	// Make a new pipe
+	p = newPipe(stats, 10)
+	ctx2, cancel := context.WithCancel(ctx)
+
+	// cancel it in the background - check read ceases
+	go cancel()
+	pair2, ok = p.Get(ctx2)
+	assert.Equal(t, fs.ObjectPair{}, pair2)
+	assert.Equal(t, false, ok)
+
+	// check we can't write
+	ok = p.Put(ctx2, pair1)
+	assert.Equal(t, false, ok)
+
+}
+
+// TestPipeConcurrent runs concurrent Get and Put to flush out any
+// race conditions and concurrency problems.
+func TestPipeConcurrent(t *testing.T) {
+	const (
+		N           = 1000
+		readWriters = 10
+	)
+
+	stats := func(n int, size int64) {}
+
+	// Make a new pipe
+	p := newPipe(stats, 10)
+
+	var wg sync.WaitGroup
+	obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
+	pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
+	ctx := context.Background()
+	var count int64
+
+	for j := 0; j < readWriters; j++ {
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < N; i++ {
+				// Read from pipe
+				pair2, ok := p.Get(ctx)
+				assert.Equal(t, pair1, pair2)
+				assert.Equal(t, true, ok)
+				atomic.AddInt64(&count, -1)
+			}
+		}()
+		go func() {
+			defer wg.Done()
+			for i := 0; i < N; i++ {
+				// Put an object
+				ok := p.Put(ctx, pair1)
+				assert.Equal(t, true, ok)
+				atomic.AddInt64(&count, 1)
+			}
+		}()
+	}
+	wg.Wait()
+
+	assert.Equal(t, int64(0), count)
+}
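Since TestPipeConcurrent exists to flush out data races, it is most informative when run under Go's race detector, e.g. `go test -race -run TestPipe ./fs/sync`. The final assertion that `count` returns to zero checks that every Put was matched by exactly one Get across all goroutines.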
diff --git a/fs/sync/sync.go b/fs/sync/sync.go
index 8c0d88baa..6465abbf7 100644
--- a/fs/sync/sync.go
+++ b/fs/sync/sync.go
@@ -43,9 +43,9 @@ type syncCopyMove struct {
 	srcEmptyDirsMu sync.Mutex             // protect srcEmptyDirs
 	srcEmptyDirs   map[string]fs.DirEntry // potentially empty directories
 	checkerWg      sync.WaitGroup         // wait for checkers
-	toBeChecked    fs.ObjectPairChan      // checkers channel
+	toBeChecked    *pipe                  // checkers channel
 	transfersWg    sync.WaitGroup         // wait for transfers
-	toBeUploaded   fs.ObjectPairChan      // copiers channel
+	toBeUploaded   *pipe                  // copiers channel
 	errorMu        sync.Mutex             // Mutex covering the errors variables
 	err            error                  // normal error from copy process
 	noRetryErr     error                  // error with NoRetry set
@@ -54,7 +54,7 @@ type syncCopyMove struct {
 	renameMapMu    sync.Mutex             // mutex to protect the below
 	renameMap      map[string][]fs.Object // dst files by hash - only used by trackRenames
 	renamerWg      sync.WaitGroup         // wait for renamers
-	toBeRenamed    fs.ObjectPairChan      // renamers channel
+	toBeRenamed    *pipe                  // renamers channel
 	trackRenamesWg sync.WaitGroup         // wg for background track renames
 	trackRenamesCh chan fs.Object         // objects are pumped in here
 	renameCheck    []fs.Object            // accumulate files to check for rename here
@@ -75,12 +75,12 @@ func newSyncCopyMove(fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, de
 		dstFilesResult: make(chan error, 1),
 		dstEmptyDirs:   make(map[string]fs.DirEntry),
 		srcEmptyDirs:   make(map[string]fs.DirEntry),
-		toBeChecked:    make(fs.ObjectPairChan, fs.Config.Transfers),
-		toBeUploaded:   make(fs.ObjectPairChan, fs.Config.Transfers),
+		toBeChecked:    newPipe(accounting.Stats.SetCheckQueue, fs.Config.MaxBacklog),
+		toBeUploaded:   newPipe(accounting.Stats.SetTransferQueue, fs.Config.MaxBacklog),
 		deleteFilesCh:  make(chan fs.Object, fs.Config.Checkers),
 		trackRenames:   fs.Config.TrackRenames,
 		commonHash:     fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
-		toBeRenamed:    make(fs.ObjectPairChan, fs.Config.Transfers),
+		toBeRenamed:    newPipe(accounting.Stats.SetRenameQueue, fs.Config.MaxBacklog),
 		trackRenamesCh: make(chan fs.Object, fs.Config.Checkers),
 	}
 	s.ctx, s.cancel = context.WithCancel(context.Background())
@@ -131,12 +131,7 @@ func newSyncCopyMove(fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, de
 
 // Check to see if the context has been cancelled
 func (s *syncCopyMove) aborting() bool {
-	select {
-	case <-s.ctx.Done():
-		return true
-	default:
-	}
-	return false
+	return s.ctx.Err() != nil
 }
 
 // This reads the map and pumps it into the channel passed in, closing
@@ -197,119 +192,95 @@ func (s *syncCopyMove) currentError() error {
 
 // pairChecker reads Objects~s on in send to out if they need transferring.
 //
 // FIXME potentially doing lots of hashes at once
-func (s *syncCopyMove) pairChecker(in fs.ObjectPairChan, out fs.ObjectPairChan, wg *sync.WaitGroup) {
+func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for {
-		if s.aborting() {
+		pair, ok := in.Get(s.ctx)
+		if !ok {
 			return
 		}
-		select {
-		case pair, ok := <-in:
-			if !ok {
-				return
-			}
-			src := pair.Src
-			accounting.Stats.Checking(src.Remote())
-			// Check to see if can store this
-			if src.Storable() {
-				if operations.NeedTransfer(pair.Dst, pair.Src) {
-					// If files are treated as immutable, fail if destination exists and does not match
-					if fs.Config.Immutable && pair.Dst != nil {
-						fs.Errorf(pair.Dst, "Source and destination exist but do not match: immutable file modified")
-						s.processError(fs.ErrorImmutableModified)
-					} else {
-						// If destination already exists, then we must move it into --backup-dir if required
-						if pair.Dst != nil && s.backupDir != nil {
-							remoteWithSuffix := pair.Dst.Remote() + s.suffix
-							overwritten, _ := s.backupDir.NewObject(remoteWithSuffix)
-							_, err := operations.Move(s.backupDir, overwritten, remoteWithSuffix, pair.Dst)
-							if err != nil {
-								s.processError(err)
-							} else {
-								// If successful zero out the dst as it is no longer there and copy the file
-								pair.Dst = nil
-								select {
-								case <-s.ctx.Done():
-									return
-								case out <- pair:
-								}
-							}
-						} else {
-							select {
-							case <-s.ctx.Done():
-								return
-							case out <- pair:
-							}
-						}
-					}
-				} else {
-					// If moving need to delete the files we don't need to copy
-					if s.DoMove {
-						// Delete src if no error on copy
-						s.processError(operations.DeleteFile(src))
-					}
-				}
-			}
-			accounting.Stats.DoneChecking(src.Remote())
-		case <-s.ctx.Done():
-			return
-		}
+		src := pair.Src
+		accounting.Stats.Checking(src.Remote())
+		// Check to see if can store this
+		if src.Storable() {
+			if operations.NeedTransfer(pair.Dst, pair.Src) {
+				// If files are treated as immutable, fail if destination exists and does not match
+				if fs.Config.Immutable && pair.Dst != nil {
+					fs.Errorf(pair.Dst, "Source and destination exist but do not match: immutable file modified")
+					s.processError(fs.ErrorImmutableModified)
+				} else {
+					// If destination already exists, then we must move it into --backup-dir if required
+					if pair.Dst != nil && s.backupDir != nil {
+						remoteWithSuffix := pair.Dst.Remote() + s.suffix
+						overwritten, _ := s.backupDir.NewObject(remoteWithSuffix)
+						_, err := operations.Move(s.backupDir, overwritten, remoteWithSuffix, pair.Dst)
+						if err != nil {
+							s.processError(err)
+						} else {
+							// If successful zero out the dst as it is no longer there and copy the file
+							pair.Dst = nil
+							ok = out.Put(s.ctx, pair)
+							if !ok {
+								return
+							}
+						}
+					} else {
+						ok = out.Put(s.ctx, pair)
+						if !ok {
+							return
+						}
+					}
+				}
+			} else {
+				// If moving need to delete the files we don't need to copy
+				if s.DoMove {
+					// Delete src if no error on copy
+					s.processError(operations.DeleteFile(src))
+				}
+			}
+		}
+		accounting.Stats.DoneChecking(src.Remote())
 	}
 }
 
 // pairRenamer reads Objects~s on in and attempts to rename them,
 // otherwise it sends them out if they need transferring.
-func (s *syncCopyMove) pairRenamer(in fs.ObjectPairChan, out fs.ObjectPairChan, wg *sync.WaitGroup) {
+func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for {
-		if s.aborting() {
+		pair, ok := in.Get(s.ctx)
+		if !ok {
 			return
 		}
-		select {
-		case pair, ok := <-in:
+		src := pair.Src
+		if !s.tryRename(src) {
+			// pass on if not renamed
+			ok = out.Put(s.ctx, pair)
 			if !ok {
 				return
 			}
-			src := pair.Src
-			if !s.tryRename(src) {
-				// pass on if not renamed
-				select {
-				case <-s.ctx.Done():
-					return
-				case out <- pair:
-				}
-			}
-		case <-s.ctx.Done():
-			return
 		}
 	}
 }
 
 // pairCopyOrMove reads Objects on in and moves or copies them.
-func (s *syncCopyMove) pairCopyOrMove(in fs.ObjectPairChan, fdst fs.Fs, wg *sync.WaitGroup) {
+func (s *syncCopyMove) pairCopyOrMove(in *pipe, fdst fs.Fs, wg *sync.WaitGroup) {
 	defer wg.Done()
 	var err error
 	for {
-		if s.aborting() {
+		pair, ok := in.Get(s.ctx)
+		if !ok {
 			return
 		}
-		select {
-		case pair, ok := <-in:
-			if !ok {
-				return
-			}
-			src := pair.Src
-			accounting.Stats.Transferring(src.Remote())
-			if s.DoMove {
-				_, err = operations.Move(fdst, pair.Dst, src.Remote(), src)
-			} else {
-				_, err = operations.Copy(fdst, pair.Dst, src.Remote(), src)
-			}
-			s.processError(err)
-			accounting.Stats.DoneTransferring(src.Remote(), err == nil)
-		case <-s.ctx.Done():
-			return
+		src := pair.Src
+		accounting.Stats.Transferring(src.Remote())
+		if s.DoMove {
+			_, err = operations.Move(fdst, pair.Dst, src.Remote(), src)
+		} else {
+			_, err = operations.Copy(fdst, pair.Dst, src.Remote(), src)
 		}
+		s.processError(err)
+		accounting.Stats.DoneTransferring(src.Remote(), err == nil)
 	}
 }
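All three workers now reduce to the same shape: a blocking Get that doubles as both the cancellation check and the shutdown signal, replacing the old aborting()/select pairs. A generic sketch of that loop (the `workerLoop` helper is hypothetical, not part of the diff):

```go
package sync

import (
	"context"

	"github.com/ncw/rclone/fs"
)

// workerLoop is a hypothetical distillation of the pattern shared by
// pairChecker, pairRenamer and pairCopyOrMove after this change: Get
// blocks until a pair arrives, the context is cancelled, or the pipe
// is closed and drained, so no separate aborting() check is needed.
func workerLoop(ctx context.Context, in *pipe, process func(fs.ObjectPair)) {
	for {
		pair, ok := in.Get(ctx)
		if !ok {
			return // cancelled, or closed and drained
		}
		process(pair)
	}
}
```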
@@ -323,7 +294,7 @@ func (s *syncCopyMove) startCheckers() {
 
 // This stops the background checkers
 func (s *syncCopyMove) stopCheckers() {
-	close(s.toBeChecked)
+	s.toBeChecked.Close()
 	fs.Infof(s.fdst, "Waiting for checks to finish")
 	s.checkerWg.Wait()
 }
@@ -338,7 +309,7 @@ func (s *syncCopyMove) startTransfers() {
 
 // This stops the background transfers
 func (s *syncCopyMove) stopTransfers() {
-	close(s.toBeUploaded)
+	s.toBeUploaded.Close()
 	fs.Infof(s.fdst, "Waiting for transfers to finish")
 	s.transfersWg.Wait()
 }
@@ -359,7 +330,7 @@ func (s *syncCopyMove) stopRenamers() {
 	if !s.trackRenames {
 		return
 	}
-	close(s.toBeRenamed)
+	s.toBeRenamed.Close()
 	fs.Infof(s.fdst, "Waiting for renames to finish")
 	s.renamerWg.Wait()
 }
@@ -685,10 +656,9 @@ func (s *syncCopyMove) run() error {
 		s.makeRenameMap()
 		// Attempt renames for all the files which don't have a matching dst
 		for _, src := range s.renameCheck {
-			select {
-			case <-s.ctx.Done():
+			ok := s.toBeRenamed.Put(s.ctx, fs.ObjectPair{Src: src, Dst: nil})
+			if !ok {
 				break
-			case s.toBeRenamed <- fs.ObjectPair{Src: src, Dst: nil}:
 			}
 		}
 	}
@@ -792,10 +762,9 @@ func (s *syncCopyMove) SrcOnly(src fs.DirEntry) (recurse bool) {
 			}
 		} else {
 			// No need to check since doesn't exist
-			select {
-			case <-s.ctx.Done():
+			ok := s.toBeUploaded.Put(s.ctx, fs.ObjectPair{Src: x, Dst: nil})
+			if !ok {
 				return
-			case s.toBeUploaded <- fs.ObjectPair{Src: x, Dst: nil}:
 			}
 		}
 	case fs.Directory:
@@ -825,10 +794,9 @@ func (s *syncCopyMove) Match(dst, src fs.DirEntry) (recurse bool) {
 	}
 	dstX, ok := dst.(fs.Object)
 	if ok {
-		select {
-		case <-s.ctx.Done():
-			return
-		case s.toBeChecked <- fs.ObjectPair{Src: srcX, Dst: dstX}:
+		ok = s.toBeChecked.Put(s.ctx, fs.ObjectPair{Src: srcX, Dst: dstX})
+		if !ok {
+			return false
 		}
 	} else {
 		// FIXME src is file, dst is directory