Change --track-renames to use the length,hash pair stored in a map

This makes it much faster in the case of many files and use less
memory.

This also detects use of --no-traverse and disables it.
This commit is contained in:
Nick Craig-Wood 2017-01-03 17:35:12 +00:00
parent 274ab349f4
commit f1221b510b
3 changed files with 279 additions and 100 deletions

View file

@ -426,10 +426,16 @@ server side move, and the source and destination have a compatible
hash, then this will track renames during `sync`, `copy`, and `move` hash, then this will track renames during `sync`, `copy`, and `move`
operations and perform renaming server-side. operations and perform renaming server-side.
Files will be matched by size and hash - if both match then a rename
will be considered.
If the destination does not support server-side copy or move, rclone If the destination does not support server-side copy or move, rclone
will fall back to the default behaviour and log an error level message will fall back to the default behaviour and log an error level message
to the console. to the console.
Note that `--track-renames` is incompatible with `--no-traverse` and
that it uses extra memory to keep track of all the rename candidates.
### --delete-(before,during,after) ### ### --delete-(before,during,after) ###
This option allows you to specify when files on your destination are This option allows you to specify when files on your destination are

View file

@ -3,8 +3,8 @@
package fs package fs
import ( import (
"fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -18,23 +18,29 @@ type syncCopyMove struct {
DoMove bool DoMove bool
dir string dir string
// internal state // internal state
noTraverse bool // if set don't trafevers the dst noTraverse bool // if set don't trafevers the dst
deleteBefore bool // set if we must delete objects before copying deleteBefore bool // set if we must delete objects before copying
trackRenames bool // set if we should do server side renames trackRenames bool // set if we should do server side renames
dstFiles map[string]Object // dst files, only used if Delete or trackRenames dstFilesMu sync.Mutex // protect dstFiles
srcFiles map[string]Object // src files, only used if deleteBefore or trackRenames dstFiles map[string]Object // dst files, always filled
srcFilesChan chan Object // passes src objects srcFiles map[string]Object // src files, only used if deleteBefore
srcFilesResult chan error // error result of src listing srcFilesChan chan Object // passes src objects
dstFilesResult chan error // error result of dst listing srcFilesResult chan error // error result of src listing
abort chan struct{} // signal to abort the copiers dstFilesResult chan error // error result of dst listing
checkerWg sync.WaitGroup // wait for checkers abort chan struct{} // signal to abort the copiers
toBeChecked ObjectPairChan // checkers channel checkerWg sync.WaitGroup // wait for checkers
transfersWg sync.WaitGroup // wait for transfers toBeChecked ObjectPairChan // checkers channel
toBeUploaded ObjectPairChan // copiers channel transfersWg sync.WaitGroup // wait for transfers
errorMu sync.Mutex // Mutex covering the errors variables toBeUploaded ObjectPairChan // copiers channel
err error // normal error from copy process errorMu sync.Mutex // Mutex covering the errors variables
noRetryErr error // error with NoRetry set err error // normal error from copy process
fatalErr error // fatal error noRetryErr error // error with NoRetry set
fatalErr error // fatal error
commonHash HashType // common hash type between src and dst
renameMapMu sync.Mutex // mutex to protect the below
renameMap map[string][]Object // dst files by hash - only used by trackRenames
renamerWg sync.WaitGroup // wait for renamers
toBeRenamed ObjectPairChan // renamers channel
} }
func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
@ -53,6 +59,8 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
toBeUploaded: make(ObjectPairChan, Config.Transfers), toBeUploaded: make(ObjectPairChan, Config.Transfers),
deleteBefore: Delete && Config.DeleteBefore, deleteBefore: Delete && Config.DeleteBefore,
trackRenames: Config.TrackRenames, trackRenames: Config.TrackRenames,
commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
toBeRenamed: make(ObjectPairChan, Config.Transfers),
} }
if s.noTraverse && s.Delete { if s.noTraverse && s.Delete {
Debug(s.fdst, "Ignoring --no-traverse with sync") Debug(s.fdst, "Ignoring --no-traverse with sync")
@ -61,18 +69,22 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
if s.trackRenames { if s.trackRenames {
// Don't track renames for remotes without server-side rename support. // Don't track renames for remotes without server-side rename support.
// Some remotes simulate rename by server-side copy and delete, so include // Some remotes simulate rename by server-side copy and delete, so include
// remotes that implements either Mover and Copier. // remotes that implements either Mover or Copier.
switch fdst.(type) { switch fdst.(type) {
case Mover, Copier: case Mover, Copier:
default: default:
ErrorLog(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy") ErrorLog(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy")
s.trackRenames = false s.trackRenames = false
} }
if fsrc.Hashes().Overlap(fdst.Hashes()).Count() == 0 { if s.commonHash == HashNone {
ErrorLog(fdst, "Ignoring --track-renames as the source and destination do not have a common hash") ErrorLog(fdst, "Ignoring --track-renames as the source and destination do not have a common hash")
s.trackRenames = false s.trackRenames = false
} }
} }
if s.noTraverse && s.trackRenames {
Debug(s.fdst, "Ignoring --no-traverse with --track-renames")
s.noTraverse = false
}
return s return s
} }
@ -86,23 +98,30 @@ func (s *syncCopyMove) aborting() bool {
return false return false
} }
// This reads the map and pumps it into the channel passed in, closing
// the channel at the end
func (s *syncCopyMove) pumpMapToChan(files map[string]Object, out chan<- Object) {
outer:
for _, o := range files {
if s.aborting() {
break outer
}
select {
case out <- o:
case <-s.abort:
break outer
}
}
close(out)
s.srcFilesResult <- nil
}
// This reads the source files from s.srcFiles into srcFilesChan then // This reads the source files from s.srcFiles into srcFilesChan then
// closes it // closes it
// //
// It returns the final result of the read into s.srcFilesResult // It returns the final result of the read into s.srcFilesResult
func (s *syncCopyMove) readSrcUsingMap() { func (s *syncCopyMove) readSrcUsingMap() {
outer: s.pumpMapToChan(s.srcFiles, s.srcFilesChan)
for _, o := range s.srcFiles {
if s.aborting() {
break outer
}
select {
case s.srcFilesChan <- o:
case <-s.abort:
break outer
}
}
close(s.srcFilesChan)
s.srcFilesResult <- nil s.srcFilesResult <- nil
} }
@ -213,6 +232,22 @@ func (s *syncCopyMove) processError(err error) {
} }
} }
// Returns the current error (if any) in the order of prececedence
// fatalErr
// normal error
// noRetryErr
func (s *syncCopyMove) currentError() error {
s.errorMu.Lock()
defer s.errorMu.Unlock()
if s.fatalErr != nil {
return s.fatalErr
}
if s.err != nil {
return s.err
}
return s.noRetryErr
}
// pairChecker reads Objects~s on in send to out if they need transferring. // pairChecker reads Objects~s on in send to out if they need transferring.
// //
// FIXME potentially doing lots of hashes at once // FIXME potentially doing lots of hashes at once
@ -248,6 +283,65 @@ func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sy
} }
} }
// tryRename renames a src object when doing track renames if
// possible, it returns true if the object was renamed.
func (s *syncCopyMove) tryRename(src Object) bool {
Stats.Checking(src.Remote())
defer Stats.DoneChecking(src.Remote())
hash, err := s.renameHash(src)
if err != nil {
Debug(src, "Failed to read hash: %v", err)
return false
}
if hash == "" {
return false
}
dst := s.popRenameMap(hash)
if dst == nil {
return false
}
err = MoveFile(s.fdst, s.fdst, src.Remote(), dst.Remote())
if err != nil {
Debug(src, "Failed to rename to %q: %v", dst.Remote(), err)
return false
}
// remove file from dstFiles if present
s.dstFilesMu.Lock()
delete(s.dstFiles, dst.Remote())
s.dstFilesMu.Unlock()
Debug(src, "Renamed from %q", dst.Remote())
return true
}
// pairRenamer reads Objects~s on in and attempts to rename them,
// otherwise it sends them out if they need transferring.
func (s *syncCopyMove) pairRenamer(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
defer wg.Done()
for {
if s.aborting() {
return
}
select {
case pair, ok := <-in:
if !ok {
return
}
src := pair.src
if !s.tryRename(src) {
// pass on if not renamed
out <- pair
}
case <-s.abort:
return
}
}
}
// pairCopyOrMove reads Objects on in and moves or copies them. // pairCopyOrMove reads Objects on in and moves or copies them.
func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) { func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
@ -306,6 +400,27 @@ func (s *syncCopyMove) stopTransfers() {
s.transfersWg.Wait() s.transfersWg.Wait()
} }
// This starts the background renamers.
func (s *syncCopyMove) startRenamers() {
if !s.trackRenames {
return
}
s.renamerWg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, &s.renamerWg)
}
}
// This stops the background renamers
func (s *syncCopyMove) stopRenamers() {
if !s.trackRenames {
return
}
close(s.toBeRenamed)
Log(s.fdst, "Waiting for renames to finish")
s.renamerWg.Wait()
}
// This deletes the files in the dstFiles map. If checkSrcMap is set // This deletes the files in the dstFiles map. If checkSrcMap is set
// then it checks to see if they exist first in srcFiles the source // then it checks to see if they exist first in srcFiles the source
// file map, otherwise it unconditionally deletes them. If // file map, otherwise it unconditionally deletes them. If
@ -337,67 +452,104 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
return DeleteFiles(toDelete) return DeleteFiles(toDelete)
} }
func (s *syncCopyMove) renameFiles() error { // renameHash makes a string with the size and the hash for rename detection
//
toRename := make(ObjectPairChan, Config.Transfers) // it may return an empty string in which case no hash could be made
func (s *syncCopyMove) renameHash(obj Object) (hash string, err error) {
for srcRemote, srcObject := range s.srcFiles { hash, err = obj.Hash(s.commonHash)
if _, exists := s.dstFiles[srcRemote]; exists { if err != nil {
continue return hash, err
}
if s.aborting() {
return nil
}
for dstRemote, dstObject := range s.dstFiles {
if _, exists := s.srcFiles[dstRemote]; exists {
continue
}
// At this point, if the files are equal, this is a rename.
if equal(srcObject, dstObject, false, true) {
toRename <- ObjectPair{srcObject, dstObject}
break
}
}
} }
if hash == "" {
return hash, nil
}
return fmt.Sprintf("%d,%s", obj.Size(), hash), nil
}
close(toRename) // makeRenameMap builds a map of the destination files by hash
func (s *syncCopyMove) makeRenameMap() error {
Debug(s.fdst, "Making map for --track-renames")
var ( s.renameMap = make(map[string][]Object)
wg sync.WaitGroup in := make(chan Object, Config.Checkers)
filesMu sync.Mutex go s.pumpMapToChan(s.dstFiles, in)
errorCount int32
)
var wg sync.WaitGroup
wg.Add(Config.Transfers) wg.Add(Config.Transfers)
for i := 0; i < Config.Transfers; i++ { for i := 0; i < Config.Transfers; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
for pair := range toRename { for {
Debug(nil, "Rename %q to %q", pair.dst.Remote(), pair.src.Remote()) if s.aborting() {
return
err := MoveFile(s.fdst, s.fdst, pair.src.Remote(), pair.dst.Remote()) }
if err != nil { select {
atomic.AddInt32(&errorCount, 1) case obj, ok := <-in:
continue if !ok {
return
}
Stats.Checking(obj.Remote())
hash, err := s.renameHash(obj)
Stats.DoneChecking(obj.Remote())
if err != nil {
s.processError(err)
} else if hash != "" {
s.renameMapMu.Lock()
s.renameMap[hash] = append(s.renameMap[hash], obj)
s.renameMapMu.Unlock()
}
case <-s.abort:
return
} }
filesMu.Lock()
delete(s.dstFiles, pair.dst.Remote())
delete(s.srcFiles, pair.src.Remote())
filesMu.Unlock()
} }
}() }()
} }
Log(nil, "Waiting for renames to finish")
wg.Wait() wg.Wait()
if errorCount > 0 { Debug(s.fdst, "Finished making map for --track-renames")
return errors.Errorf("failed to rename %d files", errorCount) return s.currentError()
} }
return nil // popRenameMap finds the object with hash and pop the first match from
// renameMap or returns nil if not found.
func (s *syncCopyMove) popRenameMap(hash string) (dst Object) {
s.renameMapMu.Lock()
dsts, ok := s.renameMap[hash]
if ok && len(dsts) > 0 {
dst, dsts = dsts[0], dsts[1:]
if len(dsts) > 0 {
s.renameMap[hash] = dsts
} else {
delete(s.renameMap, hash)
}
}
s.renameMapMu.Unlock()
return dst
}
// delRenameMap removes obj from renameMap
func (s *syncCopyMove) delRenameMap(obj Object) {
hash, err := s.renameHash(obj)
if err != nil {
return
}
if hash == "" {
return
}
s.renameMapMu.Lock()
dsts := s.renameMap[hash]
for i, dst := range dsts {
if obj.Remote() == dst.Remote() {
// remove obj from list if found
dsts = append(dsts[:i], dsts[i+1:]...)
if len(dsts) > 0 {
s.renameMap[hash] = dsts
} else {
delete(s.renameMap, hash)
}
break
}
}
s.renameMapMu.Unlock()
} }
// Syncs fsrc into fdst // Syncs fsrc into fdst
@ -423,8 +575,8 @@ func (s *syncCopyMove) run() error {
go s.readDstFiles() go s.readDstFiles()
} }
// If s.deleteBefore or s.trackRenames then we need to read the whole source map first // If s.deleteBefore then we need to read the whole source map first
readSourceMap := s.deleteBefore || s.trackRenames readSourceMap := s.deleteBefore
if readSourceMap { if readSourceMap {
// Read source files into the map // Read source files into the map
@ -444,10 +596,10 @@ func (s *syncCopyMove) run() error {
} }
} }
// Do renames if required // Build the map of destination files by hash if required
// Have dstFiles and srcFiles complete at this point // Have dstFiles complete at this point
if s.trackRenames { if s.trackRenames {
if err = s.renameFiles(); err != nil { if err = s.makeRenameMap(); err != nil {
return err return err
} }
} }
@ -470,9 +622,11 @@ func (s *syncCopyMove) run() error {
// Start background checking and transferring pipeline // Start background checking and transferring pipeline
s.startCheckers() s.startCheckers()
s.startRenamers()
s.startTransfers() s.startTransfers()
// Do the transfers // Do the transfers
var renameCheck []Object
for src := range s.srcFilesChan { for src := range s.srcFilesChan {
remote := src.Remote() remote := src.Remote()
var dst Object var dst Object
@ -486,20 +640,40 @@ func (s *syncCopyMove) run() error {
} }
} }
} else { } else {
dst = s.dstFiles[remote] s.dstFilesMu.Lock()
// Remove file from s.dstFiles because it exists in srcFiles var ok bool
delete(s.dstFiles, remote) dst, ok = s.dstFiles[remote]
if ok {
// Remove file from s.dstFiles because it exists in srcFiles
delete(s.dstFiles, remote)
}
s.dstFilesMu.Unlock()
if ok && s.trackRenames {
// remove file from rename tracking also
s.delRenameMap(dst)
}
} }
if dst != nil { if dst != nil {
s.toBeChecked <- ObjectPair{src, dst} s.toBeChecked <- ObjectPair{src, dst}
} else if s.trackRenames {
// save object until all matches transferred
renameCheck = append(renameCheck, src)
} else { } else {
// No need to check since doesn't exist // No need to check since doesn't exist
s.toBeUploaded <- ObjectPair{src, nil} s.toBeUploaded <- ObjectPair{src, nil}
} }
} }
if s.trackRenames {
// Attempt renames for all the files which don't have a matching dst
for _, src := range renameCheck {
s.toBeRenamed <- ObjectPair{src, nil}
}
}
// Stop background checking and transferring pipeline // Stop background checking and transferring pipeline
s.stopCheckers() s.stopCheckers()
s.stopRenamers()
s.stopTransfers() s.stopTransfers()
// Retrieve the delayed error from the source listing goroutine // Retrieve the delayed error from the source listing goroutine
@ -513,20 +687,8 @@ func (s *syncCopyMove) run() error {
err = s.deleteFiles(false) err = s.deleteFiles(false)
} }
} }
// Return errors in the precedence
// fatalErr
// error from above
// error from a copy
// noRetryErr
s.processError(err) s.processError(err)
if s.fatalErr != nil { return s.currentError()
return s.fatalErr
}
if s.err != nil {
return s.err
}
return s.noRetryErr
} }
// Sync fsrc into fdst // Sync fsrc into fdst

View file

@ -616,6 +616,12 @@ func TestSyncWithTrackRenames(t *testing.T) {
}() }()
haveHash := r.fremote.Hashes().Overlap(r.flocal.Hashes()).GetOne() != fs.HashNone
_, canMove := r.fremote.(fs.Mover)
_, canCopy := r.fremote.(fs.Copier)
canTrackRenames := haveHash && (canMove || canCopy)
t.Logf("Can track renames: %v", canTrackRenames)
f1 := r.WriteFile("potato", "Potato Content", t1) f1 := r.WriteFile("potato", "Potato Content", t1)
f2 := r.WriteFile("yam", "Yam Content", t2) f2 := r.WriteFile("yam", "Yam Content", t2)
@ -633,6 +639,11 @@ func TestSyncWithTrackRenames(t *testing.T) {
fstest.CheckItems(t, r.fremote, f1, f2) fstest.CheckItems(t, r.fremote, f1, f2)
if canTrackRenames {
assert.Equal(t, fs.Stats.GetTransfers(), int64(0))
} else {
assert.Equal(t, fs.Stats.GetTransfers(), int64(1))
}
} }
// Test a server side move if possible, or the backup path if not // Test a server side move if possible, or the backup path if not