diff --git a/fs/config.go b/fs/config.go
index 9f232cf8e..f9ff59586 100644
--- a/fs/config.go
+++ b/fs/config.go
@@ -82,6 +82,7 @@ var (
 	deleteBefore    = pflag.BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering")
 	deleteDuring    = pflag.BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)")
 	deleteAfter     = pflag.BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering")
+	trackRenames    = pflag.BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible")
 	lowLevelRetries = pflag.IntP("low-level-retries", "", 10, "Number of low level retries to do.")
 	updateOlder     = pflag.BoolP("update", "u", false, "Skip files that are newer on the destination.")
 	noGzip          = pflag.BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.")
@@ -294,6 +295,7 @@ type ConfigInfo struct {
 	DeleteBefore       bool // Delete before checking
 	DeleteDuring       bool // Delete during checking/transfer
 	DeleteAfter        bool // Delete after successful transfer.
+	TrackRenames       bool // Track file renames.
 	LowLevelRetries    int
 	UpdateOlder        bool // Skip files that are newer on the destination
 	NoGzip             bool // Disable compression
@@ -360,6 +362,8 @@ func LoadConfig() {
 	Config.DeleteDuring = *deleteDuring
 	Config.DeleteAfter = *deleteAfter
 
+	Config.TrackRenames = *trackRenames
+
 	switch {
 	case *deleteBefore && (*deleteDuring || *deleteAfter),
 		*deleteDuring && *deleteAfter:
diff --git a/fs/operations.go b/fs/operations.go
index 16c479e33..985420a01 100644
--- a/fs/operations.go
+++ b/fs/operations.go
@@ -109,13 +109,20 @@ func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) {
 // Otherwise the file is considered to be not equal including if there
 // were errors reading info.
 func Equal(src, dst Object) bool {
+	return equal(src, dst, Config.SizeOnly, Config.CheckSum)
+}
+
+// equal does the work of Equal, but takes the size-only and checksum
+// settings as the sizeOnly and checkSum parameters rather than reading
+// Config.SizeOnly and Config.CheckSum.
+func equal(src, dst Object, sizeOnly, checkSum bool) bool {
 	if !Config.IgnoreSize {
 		if src.Size() != dst.Size() {
 			Debug(src, "Sizes differ")
 			return false
 		}
 	}
-	if Config.SizeOnly {
+	if sizeOnly {
 		Debug(src, "Sizes identical")
 		return true
 	}
@@ -123,7 +130,7 @@ func Equal(src, dst Object) bool {
 	// Assert: Size is equal or being ignored
 
 	// If checking checksum and not modtime
-	if Config.CheckSum {
+	if checkSum {
 		// Check the hash
 		same, hash, _ := CheckHashes(src, dst)
 		if !same {
diff --git a/fs/operations_test.go b/fs/operations_test.go
index 260b4f0c8..def431397 100644
--- a/fs/operations_test.go
+++ b/fs/operations_test.go
@@ -168,6 +168,19 @@ func NewRun(t *testing.T) *Run {
 	return r
 }
 
+// RenameFile renames the file at item.Path to newpath, both taken
+// relative to r.localName, and returns the item with its Path updated.
+func (r *Run) RenameFile(item fstest.Item, newpath string) fstest.Item {
+	oldFilepath := path.Join(r.localName, item.Path)
+	newFilepath := path.Join(r.localName, newpath)
+	if err := os.Rename(oldFilepath, newFilepath); err != nil {
+		r.Fatalf("Failed to rename file from %q to %q: %v", item.Path, newpath, err)
+	}
+
+	item.Path = newpath
+	return item
+}
+
 // Write a file to local
 func (r *Run) WriteFile(filePath, content string, t time.Time) fstest.Item {
 	item := fstest.NewItem(filePath, content, t)
diff --git a/fs/sync.go b/fs/sync.go
index 6e9138951..2a64f6b4e 100644
--- a/fs/sync.go
+++ b/fs/sync.go
@@ -4,6 +4,7 @@ package fs
 
 import (
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pkg/errors"
@@ -19,8 +20,9 @@ type syncCopyMove struct {
 	// internal state
 	noTraverse     bool              // if set don't trafevers the dst
 	deleteBefore   bool              // set if we must delete objects before copying
-	dstFiles       map[string]Object // dst files, only used if Delete
-	srcFiles       map[string]Object // src files, only used if deleteBefore
+	trackRenames   bool              // set if we should do server side renames
+	dstFiles       map[string]Object // dst files, only used if Delete or trackRenames
+	srcFiles       map[string]Object // src files, only used if deleteBefore or trackRenames
 	srcFilesChan   chan Object       // passes src objects
 	srcFilesResult chan error        // error result of src listing
 	dstFilesResult chan error        // error result of dst listing
@@ -36,6 +38,19 @@ type syncCopyMove struct {
 }
 
 func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
+	// Don't track renames for remotes without server-side rename support.
+	// Some remotes simulate rename by server-side copy and delete, so
+	// accept remotes that implement either Mover or Copier.
+	var canMove bool
+	switch fdst.(type) {
+	case Mover, Copier:
+		canMove = true
+	}
+
+	if !canMove && Config.TrackRenames {
+		ErrorLog(nil, "track-renames flag is set, but the destination %q does not support server-side moves", fdst.Name())
+	}
+
 	s := &syncCopyMove{
 		fdst:           fdst,
 		fsrc:           fsrc,
@@ -50,6 +65,7 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
 		toBeChecked:    make(ObjectPairChan, Config.Transfers),
 		toBeUploaded:   make(ObjectPairChan, Config.Transfers),
 		deleteBefore:   Delete && Config.DeleteBefore,
+		trackRenames:   canMove && Config.TrackRenames,
 	}
 	if s.noTraverse && s.Delete {
 		Debug(s.fdst, "Ignoring --no-traverse with sync")
@@ -320,6 +336,96 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
 	return DeleteFiles(toDelete)
 }
 
+// renameFiles finds files which have been renamed on the source and
+// moves the matching destination file to the new name with MoveFile.
+//
+// A source file with no destination file of the same name is paired
+// with the first unclaimed destination file which has no source file
+// of the same name and which equal reports as identical to it.  Pairs
+// which are moved successfully are removed from s.srcFiles and
+// s.dstFiles.
+//
+// s.srcFiles and s.dstFiles must be complete before this is called.
+// It returns an error if any of the moves failed.
+func (s *syncCopyMove) renameFiles() error {
+	// Find all the renames before starting any of them.  The matching
+	// reads s.srcFiles and s.dstFiles, which the workers below modify,
+	// and sending to toRename with no receiver running would block for
+	// ever once more than Config.Transfers renames had been found.
+	var renames []ObjectPair
+	claimed := make(map[string]struct{}) // dst files already paired up
+	for srcRemote, srcObject := range s.srcFiles {
+		if s.aborting() {
+			return nil
+		}
+		if _, exists := s.dstFiles[srcRemote]; exists {
+			continue
+		}
+		for dstRemote, dstObject := range s.dstFiles {
+			if _, exists := s.srcFiles[dstRemote]; exists {
+				continue
+			}
+			// Never use one destination file for two renames
+			if _, taken := claimed[dstRemote]; taken {
+				continue
+			}
+			// At this point, if the files are equal, this is a rename.
+			// NOTE(review): confirm what equal returns when the two
+			// remotes have no hash type in common.
+			if equal(srcObject, dstObject, false, true) {
+				claimed[dstRemote] = struct{}{}
+				renames = append(renames, ObjectPair{src: srcObject, dst: dstObject})
+				break
+			}
+		}
+	}
+
+	var (
+		wg         sync.WaitGroup
+		filesMu    sync.Mutex
+		errorCount int32
+	)
+	toRename := make(ObjectPairChan, Config.Transfers)
+
+	wg.Add(Config.Transfers)
+	for i := 0; i < Config.Transfers; i++ {
+		go func() {
+			defer wg.Done()
+			for pair := range toRename {
+				Debug(nil, "Rename %q to %q", pair.dst.Remote(), pair.src.Remote())
+
+				err := MoveFile(s.fdst, s.fdst, pair.src.Remote(), pair.dst.Remote())
+				if err != nil {
+					ErrorLog(nil, "Failed to rename %q to %q: %v", pair.dst.Remote(), pair.src.Remote(), err)
+					atomic.AddInt32(&errorCount, 1)
+					continue
+				}
+
+				// The move worked, so take both names out of the
+				// maps.  The maps are shared between the workers.
+				filesMu.Lock()
+				delete(s.dstFiles, pair.dst.Remote())
+				delete(s.srcFiles, pair.src.Remote())
+				filesMu.Unlock()
+			}
+		}()
+	}
+
+	// The workers are receiving now, so these sends can't get stuck
+	for _, pair := range renames {
+		toRename <- pair
+	}
+	close(toRename)
+
+	Log(nil, "Waiting for renames to finish")
+	wg.Wait()
+	if n := atomic.LoadInt32(&errorCount); n > 0 {
+		return errors.Errorf("failed to rename %d files", n)
+	}
+
+	return nil
+}
+
 // Syncs fsrc into fdst
 //
 // If Delete is true then it deletes any files in fdst that aren't in fsrc
@@ -343,17 +449,15 @@ func (s *syncCopyMove) run() error {
 		go s.readDstFiles()
 	}
 
-	// If s.deleteBefore then we need to read the whole source map first
-	if s.deleteBefore {
+	// If s.deleteBefore or s.trackRenames then we need to read the whole source map first
+	readSourceMap := s.deleteBefore || s.trackRenames
+
+	if readSourceMap {
 		// Read source files into the map
 		s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir)
 		if err != nil {
 			return err
 		}
-		// Pump the map into s.srcFilesChan
-		go s.readSrcUsingMap()
-	} else {
-		go s.readSrcUsingChan()
 	}
 
 	// Wait for dstfiles to finish reading if we were reading them
@@ -365,8 +469,15 @@ func (s *syncCopyMove) run() error {
 		}
 	}
 
-	// Delete files first if required
+	// Do renames if required
 	// Have dstFiles and srcFiles complete at this point
+	if s.trackRenames {
+		if err = s.renameFiles(); err != nil {
+			return err
+		}
+	}
+
+	// Delete files first if required
 	if s.deleteBefore {
 		err = s.deleteFiles(true)
 		if err != nil {
@@ -374,6 +485,14 @@ func (s *syncCopyMove) run() error {
 		}
 	}
 
+	// Now we can fill the src channel.
+	if readSourceMap {
+		// Pump the map into s.srcFilesChan
+		go s.readSrcUsingMap()
+	} else {
+		go s.readSrcUsingChan()
+	}
+
 	// Start background checking and transferring pipeline
 	s.startCheckers()
 	s.startTransfers()
diff --git a/fs/sync_test.go b/fs/sync_test.go
index 06d9467d7..fd29c6234 100644
--- a/fs/sync_test.go
+++ b/fs/sync_test.go
@@ -605,6 +605,37 @@ func TestSyncWithUpdateOlder(t *testing.T) {
 	fstest.CheckItems(t, r.fremote, oneO, twoF, threeO, fourF, fiveF)
 }
 
+// TestSyncWithTrackRenames checks that a sync with TrackRenames set
+// still leaves the remote matching the local files after a file has
+// been renamed locally.
+func TestSyncWithTrackRenames(t *testing.T) {
+	r := NewRun(t)
+	defer r.Finalise()
+
+	oldTrackRenames := fs.Config.TrackRenames
+	fs.Config.TrackRenames = true
+	defer func() {
+		fs.Config.TrackRenames = oldTrackRenames
+	}()
+
+	f1 := r.WriteFile("potato", "Potato Content", t1)
+	f2 := r.WriteFile("yam", "Yam Content", t2)
+
+	fs.Stats.ResetCounters()
+	require.NoError(t, fs.Sync(r.fremote, r.flocal))
+
+	fstest.CheckItems(t, r.fremote, f1, f2)
+	fstest.CheckItems(t, r.flocal, f1, f2)
+
+	// Now rename locally.
+	f2 = r.RenameFile(f2, "yaml")
+
+	fs.Stats.ResetCounters()
+	require.NoError(t, fs.Sync(r.fremote, r.flocal))
+
+	fstest.CheckItems(t, r.fremote, f1, f2)
+}
+
 // Test a server side move if possible, or the backup path if not
 func testServerSideMove(t *testing.T, r *Run, fremoteMove fs.Fs, withFilter bool) {
 	file1 := r.WriteBoth("potato2", "------------------------------------------------------------", t1)