sync: Track and perform server-side renames

This commits adds support for tracking of file renames if `track-renames` flag is set,
and it then performs server-side renames for remotes that support it, i.e.
remotes that implement either the `Mover` or the `Copier` interface.

Fixes #888
This commit is contained in:
Bjørn Erik Pedersen 2016-12-18 11:03:56 +01:00 committed by Nick Craig-Wood
parent 5c89fd679d
commit 47d3a450a4
5 changed files with 156 additions and 11 deletions

View file

@ -82,6 +82,7 @@ var (
deleteBefore = pflag.BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering") deleteBefore = pflag.BoolP("delete-before", "", false, "When synchronizing, delete files on destination before transfering")
deleteDuring = pflag.BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)") deleteDuring = pflag.BoolP("delete-during", "", false, "When synchronizing, delete files during transfer (default)")
deleteAfter = pflag.BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering") deleteAfter = pflag.BoolP("delete-after", "", false, "When synchronizing, delete files on destination after transfering")
trackRenames = pflag.BoolP("track-renames", "", false, "When synchronizing, track file renames and do a server side move if possible")
lowLevelRetries = pflag.IntP("low-level-retries", "", 10, "Number of low level retries to do.") lowLevelRetries = pflag.IntP("low-level-retries", "", 10, "Number of low level retries to do.")
updateOlder = pflag.BoolP("update", "u", false, "Skip files that are newer on the destination.") updateOlder = pflag.BoolP("update", "u", false, "Skip files that are newer on the destination.")
noGzip = pflag.BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.") noGzip = pflag.BoolP("no-gzip-encoding", "", false, "Don't set Accept-Encoding: gzip.")
@ -294,6 +295,7 @@ type ConfigInfo struct {
DeleteBefore bool // Delete before checking DeleteBefore bool // Delete before checking
DeleteDuring bool // Delete during checking/transfer DeleteDuring bool // Delete during checking/transfer
DeleteAfter bool // Delete after successful transfer. DeleteAfter bool // Delete after successful transfer.
TrackRenames bool // Track file renames.
LowLevelRetries int LowLevelRetries int
UpdateOlder bool // Skip files that are newer on the destination UpdateOlder bool // Skip files that are newer on the destination
NoGzip bool // Disable compression NoGzip bool // Disable compression
@ -360,6 +362,8 @@ func LoadConfig() {
Config.DeleteDuring = *deleteDuring Config.DeleteDuring = *deleteDuring
Config.DeleteAfter = *deleteAfter Config.DeleteAfter = *deleteAfter
Config.TrackRenames = *trackRenames
switch { switch {
case *deleteBefore && (*deleteDuring || *deleteAfter), case *deleteBefore && (*deleteDuring || *deleteAfter),
*deleteDuring && *deleteAfter: *deleteDuring && *deleteAfter:

View file

@ -109,13 +109,17 @@ func CheckHashes(src, dst Object) (equal bool, hash HashType, err error) {
// Otherwise the file is considered to be not equal including if there // Otherwise the file is considered to be not equal including if there
// were errors reading info. // were errors reading info.
func Equal(src, dst Object) bool { func Equal(src, dst Object) bool {
return equal(src, dst, Config.SizeOnly, Config.CheckSum)
}
func equal(src, dst Object, sizeOnly, checkSum bool) bool {
if !Config.IgnoreSize { if !Config.IgnoreSize {
if src.Size() != dst.Size() { if src.Size() != dst.Size() {
Debug(src, "Sizes differ") Debug(src, "Sizes differ")
return false return false
} }
} }
if Config.SizeOnly { if sizeOnly {
Debug(src, "Sizes identical") Debug(src, "Sizes identical")
return true return true
} }
@ -123,7 +127,7 @@ func Equal(src, dst Object) bool {
// Assert: Size is equal or being ignored // Assert: Size is equal or being ignored
// If checking checksum and not modtime // If checking checksum and not modtime
if Config.CheckSum { if checkSum {
// Check the hash // Check the hash
same, hash, _ := CheckHashes(src, dst) same, hash, _ := CheckHashes(src, dst)
if !same { if !same {

View file

@ -168,6 +168,19 @@ func NewRun(t *testing.T) *Run {
return r return r
} }
// Rename a file in local
func (r *Run) RenameFile(item fstest.Item, newpath string) fstest.Item {
oldFilepath := path.Join(r.localName, item.Path)
newFilepath := path.Join(r.localName, newpath)
if err := os.Rename(oldFilepath, newFilepath); err != nil {
r.Fatalf("Failed to rename file from %q to %q: %v", item.Path, newpath, err)
}
item.Path = newpath
return item
}
// Write a file to local // Write a file to local
func (r *Run) WriteFile(filePath, content string, t time.Time) fstest.Item { func (r *Run) WriteFile(filePath, content string, t time.Time) fstest.Item {
item := fstest.NewItem(filePath, content, t) item := fstest.NewItem(filePath, content, t)

View file

@ -4,6 +4,7 @@ package fs
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -19,8 +20,9 @@ type syncCopyMove struct {
// internal state // internal state
noTraverse bool // if set don't trafevers the dst noTraverse bool // if set don't trafevers the dst
deleteBefore bool // set if we must delete objects before copying deleteBefore bool // set if we must delete objects before copying
dstFiles map[string]Object // dst files, only used if Delete trackRenames bool // set if we should do server side renames
srcFiles map[string]Object // src files, only used if deleteBefore dstFiles map[string]Object // dst files, only used if Delete or trackRenames
srcFiles map[string]Object // src files, only used if deleteBefore or trackRenames
srcFilesChan chan Object // passes src objects srcFilesChan chan Object // passes src objects
srcFilesResult chan error // error result of src listing srcFilesResult chan error // error result of src listing
dstFilesResult chan error // error result of dst listing dstFilesResult chan error // error result of dst listing
@ -36,6 +38,20 @@ type syncCopyMove struct {
} }
func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove { func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
// Don't track renames for remotes without server-side rename support.
// Some remotes simulate rename by server-side copy and delete, so include
// remotes that implements either Mover and Copier.
var canMove bool
switch fdst.(type) {
case Mover, Copier:
canMove = true
}
if !canMove && Config.TrackRenames {
ErrorLog(nil, "track-renames flag is set, but the destination %q does not support server-side moves", fdst.Name())
}
s := &syncCopyMove{ s := &syncCopyMove{
fdst: fdst, fdst: fdst,
fsrc: fsrc, fsrc: fsrc,
@ -50,6 +66,7 @@ func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
toBeChecked: make(ObjectPairChan, Config.Transfers), toBeChecked: make(ObjectPairChan, Config.Transfers),
toBeUploaded: make(ObjectPairChan, Config.Transfers), toBeUploaded: make(ObjectPairChan, Config.Transfers),
deleteBefore: Delete && Config.DeleteBefore, deleteBefore: Delete && Config.DeleteBefore,
trackRenames: canMove && Config.TrackRenames,
} }
if s.noTraverse && s.Delete { if s.noTraverse && s.Delete {
Debug(s.fdst, "Ignoring --no-traverse with sync") Debug(s.fdst, "Ignoring --no-traverse with sync")
@ -320,6 +337,69 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
return DeleteFiles(toDelete) return DeleteFiles(toDelete)
} }
func (s *syncCopyMove) renameFiles() error {
toRename := make(ObjectPairChan, Config.Transfers)
for srcRemote, srcObject := range s.srcFiles {
if _, exists := s.dstFiles[srcRemote]; exists {
continue
}
if s.aborting() {
return nil
}
for dstRemote, dstObject := range s.dstFiles {
if _, exists := s.srcFiles[dstRemote]; exists {
continue
}
// At this point, if the files are equal, this is a rename.
if equal(srcObject, dstObject, false, true) {
toRename <- ObjectPair{srcObject, dstObject}
break
}
}
}
close(toRename)
var (
wg sync.WaitGroup
filesMu sync.Mutex
errorCount int32
)
wg.Add(Config.Transfers)
for i := 0; i < Config.Transfers; i++ {
go func() {
defer wg.Done()
for pair := range toRename {
Debug(nil, "Rename %q to %q", pair.dst.Remote(), pair.src.Remote())
err := MoveFile(s.fdst, s.fdst, pair.src.Remote(), pair.dst.Remote())
if err != nil {
atomic.AddInt32(&errorCount, 1)
continue
}
filesMu.Lock()
delete(s.dstFiles, pair.dst.Remote())
delete(s.srcFiles, pair.src.Remote())
filesMu.Unlock()
}
}()
}
Log(nil, "Waiting for renames to finish")
wg.Wait()
if errorCount > 0 {
return errors.Errorf("failed to rename %d files", errorCount)
}
return nil
}
// Syncs fsrc into fdst // Syncs fsrc into fdst
// //
// If Delete is true then it deletes any files in fdst that aren't in fsrc // If Delete is true then it deletes any files in fdst that aren't in fsrc
@ -343,17 +423,16 @@ func (s *syncCopyMove) run() error {
go s.readDstFiles() go s.readDstFiles()
} }
// If s.deleteBefore then we need to read the whole source map first // If s.deleteBefore or s.trackRenames then we need to read the whole source map first
if s.deleteBefore { readSourceMap := s.deleteBefore || s.trackRenames
if readSourceMap {
// Read source files into the map // Read source files into the map
s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir) s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir)
if err != nil { if err != nil {
return err return err
} }
// Pump the map into s.srcFilesChan
go s.readSrcUsingMap()
} else {
go s.readSrcUsingChan()
} }
// Wait for dstfiles to finish reading if we were reading them // Wait for dstfiles to finish reading if we were reading them
@ -365,8 +444,15 @@ func (s *syncCopyMove) run() error {
} }
} }
// Delete files first if required // Do renames if required
// Have dstFiles and srcFiles complete at this point // Have dstFiles and srcFiles complete at this point
if s.trackRenames {
if err = s.renameFiles(); err != nil {
return err
}
}
// Delete files first if required
if s.deleteBefore { if s.deleteBefore {
err = s.deleteFiles(true) err = s.deleteFiles(true)
if err != nil { if err != nil {
@ -374,6 +460,14 @@ func (s *syncCopyMove) run() error {
} }
} }
// Now we can fill the src channel.
if readSourceMap {
// Pump the map into s.srcFilesChan
go s.readSrcUsingMap()
} else {
go s.readSrcUsingChan()
}
// Start background checking and transferring pipeline // Start background checking and transferring pipeline
s.startCheckers() s.startCheckers()
s.startTransfers() s.startTransfers()

View file

@ -605,6 +605,36 @@ func TestSyncWithUpdateOlder(t *testing.T) {
fstest.CheckItems(t, r.fremote, oneO, twoF, threeO, fourF, fiveF) fstest.CheckItems(t, r.fremote, oneO, twoF, threeO, fourF, fiveF)
} }
// Test with TrackRenames set
func TestSyncWithTrackRenames(t *testing.T) {
r := NewRun(t)
defer r.Finalise()
fs.Config.TrackRenames = true
defer func() {
fs.Config.TrackRenames = false
}()
f1 := r.WriteFile("potato", "Potato Content", t1)
f2 := r.WriteFile("yam", "Yam Content", t2)
fs.Stats.ResetCounters()
require.NoError(t, fs.Sync(r.fremote, r.flocal))
fstest.CheckItems(t, r.fremote, f1, f2)
fstest.CheckItems(t, r.flocal, f1, f2)
// Now rename locally.
f2 = r.RenameFile(f2, "yaml")
fs.Stats.ResetCounters()
require.NoError(t, fs.Sync(r.fremote, r.flocal))
fstest.CheckItems(t, r.fremote, f1, f2)
}
// Test a server side move if possible, or the backup path if not // Test a server side move if possible, or the backup path if not
func testServerSideMove(t *testing.T, r *Run, fremoteMove fs.Fs, withFilter bool) { func testServerSideMove(t *testing.T, r *Run, fremoteMove fs.Fs, withFilter bool) {
file1 := r.WriteBoth("potato2", "------------------------------------------------------------", t1) file1 := r.WriteBoth("potato2", "------------------------------------------------------------", t1)