From f1221b510be21a69ba6a9cff4ac126c37e07fa94 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 3 Jan 2017 17:35:12 +0000 Subject: [PATCH] Change --track-renames to use the length,hash pair stored in a map This makes it much faster in the case of many files and uses less memory. This also detects use of --no-traverse and disables it. --- docs/content/docs.md | 6 + fs/sync.go | 362 +++++++++++++++++++++++++++++++------------ fs/sync_test.go | 11 ++ 3 files changed, 279 insertions(+), 100 deletions(-) diff --git a/docs/content/docs.md b/docs/content/docs.md index 952aa450c..9c7f4c60c 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -426,10 +426,16 @@ server side move, and the source and destination have a compatible hash, then this will track renames during `sync`, `copy`, and `move` operations and perform renaming server-side. +Files will be matched by size and hash - if both match then a rename +will be considered. + If the destination does not support server-side copy or move, rclone will fall back to the default behaviour and log an error level message to the console. +Note that `--track-renames` is incompatible with `--no-traverse` and +that it uses extra memory to keep track of all the rename candidates. 
+ ### --delete-(before,during,after) ### This option allows you to specify when files on your destination are diff --git a/fs/sync.go b/fs/sync.go index 88b573bbd..756428c00 100644 --- a/fs/sync.go +++ b/fs/sync.go @@ -3,8 +3,8 @@ package fs import ( + "fmt" "sync" - "sync/atomic" "time" "github.com/pkg/errors" @@ -18,23 +18,29 @@ type syncCopyMove struct { DoMove bool dir string // internal state - noTraverse bool // if set don't trafevers the dst - deleteBefore bool // set if we must delete objects before copying - trackRenames bool // set if we should do server side renames - dstFiles map[string]Object // dst files, only used if Delete or trackRenames - srcFiles map[string]Object // src files, only used if deleteBefore or trackRenames - srcFilesChan chan Object // passes src objects - srcFilesResult chan error // error result of src listing - dstFilesResult chan error // error result of dst listing - abort chan struct{} // signal to abort the copiers - checkerWg sync.WaitGroup // wait for checkers - toBeChecked ObjectPairChan // checkers channel - transfersWg sync.WaitGroup // wait for transfers - toBeUploaded ObjectPairChan // copiers channel - errorMu sync.Mutex // Mutex covering the errors variables - err error // normal error from copy process - noRetryErr error // error with NoRetry set - fatalErr error // fatal error + noTraverse bool // if set don't traverse the dst + deleteBefore bool // set if we must delete objects before copying + trackRenames bool // set if we should do server side renames + dstFilesMu sync.Mutex // protect dstFiles + dstFiles map[string]Object // dst files, always filled + srcFiles map[string]Object // src files, only used if deleteBefore + srcFilesChan chan Object // passes src objects + srcFilesResult chan error // error result of src listing + dstFilesResult chan error // error result of dst listing + abort chan struct{} // signal to abort the copiers + checkerWg sync.WaitGroup // wait for checkers + toBeChecked ObjectPairChan 
// checkers channel + transfersWg sync.WaitGroup // wait for transfers + toBeUploaded ObjectPairChan // copiers channel + errorMu sync.Mutex // Mutex covering the errors variables + err error // normal error from copy process + noRetryErr error // error with NoRetry set + fatalErr error // fatal error + commonHash HashType // common hash type between src and dst + renameMapMu sync.Mutex // mutex to protect the below + renameMap map[string][]Object // dst files by hash - only used by trackRenames + renamerWg sync.WaitGroup // wait for renamers + toBeRenamed ObjectPairChan // renamers channel } func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { @@ -53,6 +59,8 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { toBeUploaded: make(ObjectPairChan, Config.Transfers), deleteBefore: Delete && Config.DeleteBefore, trackRenames: Config.TrackRenames, + commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), + toBeRenamed: make(ObjectPairChan, Config.Transfers), } if s.noTraverse && s.Delete { Debug(s.fdst, "Ignoring --no-traverse with sync") @@ -61,18 +69,22 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { if s.trackRenames { // Don't track renames for remotes without server-side rename support. // Some remotes simulate rename by server-side copy and delete, so include - // remotes that implements either Mover and Copier. + // remotes that implements either Mover or Copier. 
switch fdst.(type) { case Mover, Copier: default: ErrorLog(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy") s.trackRenames = false } - if fsrc.Hashes().Overlap(fdst.Hashes()).Count() == 0 { + if s.commonHash == HashNone { ErrorLog(fdst, "Ignoring --track-renames as the source and destination do not have a common hash") s.trackRenames = false } } + if s.noTraverse && s.trackRenames { + Debug(s.fdst, "Ignoring --no-traverse with --track-renames") + s.noTraverse = false + } return s } @@ -86,23 +98,30 @@ func (s *syncCopyMove) aborting() bool { return false } +// This reads the map and pumps it into the channel passed in, closing +// the channel at the end +func (s *syncCopyMove) pumpMapToChan(files map[string]Object, out chan<- Object) { +outer: + for _, o := range files { + if s.aborting() { + break outer + } + select { + case out <- o: + case <-s.abort: + break outer + } + } + close(out) + s.srcFilesResult <- nil +} + // This reads the source files from s.srcFiles into srcFilesChan then // closes it // // It returns the final result of the read into s.srcFilesResult func (s *syncCopyMove) readSrcUsingMap() { -outer: - for _, o := range s.srcFiles { - if s.aborting() { - break outer - } - select { - case s.srcFilesChan <- o: - case <-s.abort: - break outer - } - } - close(s.srcFilesChan) + s.pumpMapToChan(s.srcFiles, s.srcFilesChan) s.srcFilesResult <- nil } @@ -213,6 +232,22 @@ func (s *syncCopyMove) processError(err error) { } } +// Returns the current error (if any) in the order of precedence +// fatalErr +// normal error +// noRetryErr +func (s *syncCopyMove) currentError() error { + s.errorMu.Lock() + defer s.errorMu.Unlock() + if s.fatalErr != nil { + return s.fatalErr + } + if s.err != nil { + return s.err + } + return s.noRetryErr +} + // pairChecker reads Objects~s on in send to out if they need transferring. 
// // FIXME potentially doing lots of hashes at once @@ -248,6 +283,65 @@ func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sy } } +// tryRename renames a src object when doing track renames if +// possible, it returns true if the object was renamed. +func (s *syncCopyMove) tryRename(src Object) bool { + Stats.Checking(src.Remote()) + defer Stats.DoneChecking(src.Remote()) + + hash, err := s.renameHash(src) + if err != nil { + Debug(src, "Failed to read hash: %v", err) + return false + } + if hash == "" { + return false + } + + dst := s.popRenameMap(hash) + if dst == nil { + return false + } + + err = MoveFile(s.fdst, s.fdst, src.Remote(), dst.Remote()) + if err != nil { + Debug(src, "Failed to rename to %q: %v", dst.Remote(), err) + return false + } + + // remove file from dstFiles if present + s.dstFilesMu.Lock() + delete(s.dstFiles, dst.Remote()) + s.dstFilesMu.Unlock() + + Debug(src, "Renamed from %q", dst.Remote()) + return true +} + +// pairRenamer reads Objects~s on in and attempts to rename them, +// otherwise it sends them out if they need transferring. +func (s *syncCopyMove) pairRenamer(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) { + defer wg.Done() + for { + if s.aborting() { + return + } + select { + case pair, ok := <-in: + if !ok { + return + } + src := pair.src + if !s.tryRename(src) { + // pass on if not renamed + out <- pair + } + case <-s.abort: + return + } + } +} + // pairCopyOrMove reads Objects on in and moves or copies them. func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() @@ -306,6 +400,27 @@ func (s *syncCopyMove) stopTransfers() { s.transfersWg.Wait() } +// This starts the background renamers. 
+func (s *syncCopyMove) startRenamers() { + if !s.trackRenames { + return + } + s.renamerWg.Add(Config.Checkers) + for i := 0; i < Config.Checkers; i++ { + go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, &s.renamerWg) + } +} + +// This stops the background renamers +func (s *syncCopyMove) stopRenamers() { + if !s.trackRenames { + return + } + close(s.toBeRenamed) + Log(s.fdst, "Waiting for renames to finish") + s.renamerWg.Wait() +} + // This deletes the files in the dstFiles map. If checkSrcMap is set // then it checks to see if they exist first in srcFiles the source // file map, otherwise it unconditionally deletes them. If @@ -337,67 +452,104 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error { return DeleteFiles(toDelete) } -func (s *syncCopyMove) renameFiles() error { - - toRename := make(ObjectPairChan, Config.Transfers) - - for srcRemote, srcObject := range s.srcFiles { - if _, exists := s.dstFiles[srcRemote]; exists { - continue - } - - if s.aborting() { - return nil - } - - for dstRemote, dstObject := range s.dstFiles { - if _, exists := s.srcFiles[dstRemote]; exists { - continue - } - - // At this point, if the files are equal, this is a rename. 
- if equal(srcObject, dstObject, false, true) { - toRename <- ObjectPair{srcObject, dstObject} - break - } - } +// renameHash makes a string with the size and the hash for rename detection +// +// it may return an empty string in which case no hash could be made +func (s *syncCopyMove) renameHash(obj Object) (hash string, err error) { + hash, err = obj.Hash(s.commonHash) + if err != nil { + return hash, err } + if hash == "" { + return hash, nil + } + return fmt.Sprintf("%d,%s", obj.Size(), hash), nil +} - close(toRename) +// makeRenameMap builds a map of the destination files by hash +func (s *syncCopyMove) makeRenameMap() error { + Debug(s.fdst, "Making map for --track-renames") - var ( - wg sync.WaitGroup - filesMu sync.Mutex - errorCount int32 - ) + s.renameMap = make(map[string][]Object) + in := make(chan Object, Config.Checkers) + go s.pumpMapToChan(s.dstFiles, in) + var wg sync.WaitGroup wg.Add(Config.Transfers) for i := 0; i < Config.Transfers; i++ { go func() { defer wg.Done() - for pair := range toRename { - Debug(nil, "Rename %q to %q", pair.dst.Remote(), pair.src.Remote()) - - err := MoveFile(s.fdst, s.fdst, pair.src.Remote(), pair.dst.Remote()) - if err != nil { - atomic.AddInt32(&errorCount, 1) - continue + for { + if s.aborting() { + return + } + select { + case obj, ok := <-in: + if !ok { + return + } + Stats.Checking(obj.Remote()) + hash, err := s.renameHash(obj) + Stats.DoneChecking(obj.Remote()) + if err != nil { + s.processError(err) + } else if hash != "" { + s.renameMapMu.Lock() + s.renameMap[hash] = append(s.renameMap[hash], obj) + s.renameMapMu.Unlock() + } + case <-s.abort: + return } - - filesMu.Lock() - delete(s.dstFiles, pair.dst.Remote()) - delete(s.srcFiles, pair.src.Remote()) - filesMu.Unlock() } }() } - Log(nil, "Waiting for renames to finish") wg.Wait() - if errorCount > 0 { - return errors.Errorf("failed to rename %d files", errorCount) - } + Debug(s.fdst, "Finished making map for --track-renames") + return s.currentError() +} - 
return nil +// popRenameMap finds the object with hash and pops the first match from +// renameMap or returns nil if not found. +func (s *syncCopyMove) popRenameMap(hash string) (dst Object) { + s.renameMapMu.Lock() + dsts, ok := s.renameMap[hash] + if ok && len(dsts) > 0 { + dst, dsts = dsts[0], dsts[1:] + if len(dsts) > 0 { + s.renameMap[hash] = dsts + } else { + delete(s.renameMap, hash) + } + } + s.renameMapMu.Unlock() + return dst +} + +// delRenameMap removes obj from renameMap +func (s *syncCopyMove) delRenameMap(obj Object) { + hash, err := s.renameHash(obj) + if err != nil { + return + } + if hash == "" { + return + } + s.renameMapMu.Lock() + dsts := s.renameMap[hash] + for i, dst := range dsts { + if obj.Remote() == dst.Remote() { + // remove obj from list if found + dsts = append(dsts[:i], dsts[i+1:]...) + if len(dsts) > 0 { + s.renameMap[hash] = dsts + } else { + delete(s.renameMap, hash) + } + break + } + } + s.renameMapMu.Unlock() } // Syncs fsrc into fdst @@ -423,8 +575,8 @@ func (s *syncCopyMove) run() error { go s.readDstFiles() } - // If s.deleteBefore or s.trackRenames then we need to read the whole source map first - readSourceMap := s.deleteBefore || s.trackRenames + // If s.deleteBefore then we need to read the whole source map first + readSourceMap := s.deleteBefore if readSourceMap { // Read source files into the map @@ -444,10 +596,10 @@ func (s *syncCopyMove) run() error { } } - // Do renames if required - // Have dstFiles and srcFiles complete at this point + // Build the map of destination files by hash if required + // Have dstFiles complete at this point if s.trackRenames { - if err = s.renameFiles(); err != nil { + if err = s.makeRenameMap(); err != nil { return err } } @@ -470,9 +622,11 @@ func (s *syncCopyMove) run() error { // Start background checking and transferring pipeline s.startCheckers() + s.startRenamers() s.startTransfers() // Do the transfers + var renameCheck []Object for src := range s.srcFilesChan { remote := 
src.Remote() var dst Object @@ -486,20 +640,40 @@ func (s *syncCopyMove) run() error { } } } else { - dst = s.dstFiles[remote] - // Remove file from s.dstFiles because it exists in srcFiles - delete(s.dstFiles, remote) + s.dstFilesMu.Lock() + var ok bool + dst, ok = s.dstFiles[remote] + if ok { + // Remove file from s.dstFiles because it exists in srcFiles + delete(s.dstFiles, remote) + } + s.dstFilesMu.Unlock() + if ok && s.trackRenames { + // remove file from rename tracking also + s.delRenameMap(dst) + } } if dst != nil { s.toBeChecked <- ObjectPair{src, dst} + } else if s.trackRenames { + // save object until all matches transferred + renameCheck = append(renameCheck, src) } else { // No need to check since doesn't exist s.toBeUploaded <- ObjectPair{src, nil} } } + if s.trackRenames { + // Attempt renames for all the files which don't have a matching dst + for _, src := range renameCheck { + s.toBeRenamed <- ObjectPair{src, nil} + } + } + // Stop background checking and transferring pipeline s.stopCheckers() + s.stopRenamers() s.stopTransfers() // Retrieve the delayed error from the source listing goroutine @@ -513,20 +687,8 @@ func (s *syncCopyMove) run() error { err = s.deleteFiles(false) } } - - // Return errors in the precedence - // fatalErr - // error from above - // error from a copy - // noRetryErr s.processError(err) - if s.fatalErr != nil { - return s.fatalErr - } - if s.err != nil { - return s.err - } - return s.noRetryErr + return s.currentError() } // Sync fsrc into fdst diff --git a/fs/sync_test.go b/fs/sync_test.go index fd29c6234..486984b93 100644 --- a/fs/sync_test.go +++ b/fs/sync_test.go @@ -616,6 +616,12 @@ func TestSyncWithTrackRenames(t *testing.T) { }() + haveHash := r.fremote.Hashes().Overlap(r.flocal.Hashes()).GetOne() != fs.HashNone + _, canMove := r.fremote.(fs.Mover) + _, canCopy := r.fremote.(fs.Copier) + canTrackRenames := haveHash && (canMove || canCopy) + t.Logf("Can track renames: %v", canTrackRenames) + f1 := 
r.WriteFile("potato", "Potato Content", t1) f2 := r.WriteFile("yam", "Yam Content", t2) @@ -633,6 +639,11 @@ func TestSyncWithTrackRenames(t *testing.T) { fstest.CheckItems(t, r.fremote, f1, f2) + if canTrackRenames { + assert.Equal(t, fs.Stats.GetTransfers(), int64(0)) + } else { + assert.Equal(t, fs.Stats.GetTransfers(), int64(1)) + } } // Test a server side move if possible, or the backup path if not