From d6a5bfe2d4e4a4ad2e6c0f50b8e30b77801191d8 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 18 Apr 2014 16:34:59 +0100 Subject: [PATCH] Get rid of fs.CopyFs and replace with fs.Sync in preparation for Object.Update --- fs/operations.go | 104 +++++++++++++-------------------------- rclone.go | 4 +- rclonetest/rclonetest.go | 10 ++-- 3 files changed, 40 insertions(+), 78 deletions(-) diff --git a/fs/operations.go b/fs/operations.go index 02930b186..6fcd16d9c 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -77,7 +77,7 @@ func Equal(src, dst Object) bool { if dt >= ModifyWindow || dt <= -ModifyWindow { Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime) } else { - Debug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow) + Debug(src, "Size and modification time the same (differ by %s, within tolerance %s)", dt, ModifyWindow) return true } @@ -129,10 +129,11 @@ func Copy(f Fs, src Object) { } // Check to see if src needs to be copied to dst and if so puts it in out -func checkOne(src, dst Object, out ObjectsChan) { +func checkOne(pair ObjectPair, out ObjectPairChan) { + src, dst := pair.src, pair.dst if dst == nil { - Debug(src, "Couldn't find local file - download") - out <- src + Debug(src, "Couldn't find file - need to transfer") + out <- pair return } // Check to see if can store this @@ -144,77 +145,33 @@ func checkOne(src, dst Object, out ObjectsChan) { Debug(src, "Unchanged skipping") return } - out <- src + out <- pair } // Read FsObjects~s on in send to out if they need uploading // // FIXME potentially doing lots of MD5SUMS at once -func PairChecker(in ObjectPairChan, out ObjectsChan, wg *sync.WaitGroup) { +func PairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) { defer wg.Done() for pair := range in { src := pair.src Stats.Checking(src) - checkOne(src, pair.dst, out) - Stats.DoneChecking(src) - } -} - -// Read FsObjects~s on in send to out if they need uploading -// -// FIXME potentially doing lots of MD5SUMS at once -func Checker(in, out ObjectsChan, fdst Fs, wg *sync.WaitGroup) { - defer wg.Done() - for src := range in { - Stats.Checking(src) - dst := fdst.NewFsObject(src.Remote()) - checkOne(src, dst, out) + checkOne(pair, out) Stats.DoneChecking(src) } } // Read FsObjects on in and copy them -func Copier(in ObjectsChan, fdst Fs, wg *sync.WaitGroup) { +func Copier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() - for src := range in { + for pair := range in { + src := pair.src Stats.Transferring(src) Copy(fdst, src) Stats.DoneTransferring(src) } } -// Copies fsrc into fdst -func CopyFs(fdst, fsrc Fs) error { - err := fdst.Mkdir() - if err != nil { - Stats.Error() - return err - } - - to_be_checked := fsrc.List() - to_be_uploaded := make(ObjectsChan, Config.Transfers) - - var checkerWg sync.WaitGroup - checkerWg.Add(Config.Checkers) - for i := 0; i < Config.Checkers; i++ { - go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg) - } - - var copierWg sync.WaitGroup - copierWg.Add(Config.Transfers) - for i := 0; i < Config.Transfers; i++ { - go Copier(to_be_uploaded, fdst, &copierWg) - } - - Log(fdst, "Waiting for checks to finish") - checkerWg.Wait() - close(to_be_uploaded) - Log(fdst, "Waiting for transfers to finish") - copierWg.Wait() - - return nil -} - // Delete all the files passed in the channel func DeleteFiles(to_be_deleted ObjectsChan) { var wg sync.WaitGroup @@ -247,7 +204,9 @@ func DeleteFiles(to_be_deleted ObjectsChan) { } // Syncs fsrc into fdst -func Sync(fdst, fsrc Fs) error { +// +// If Delete is true then it deletes any files in fdst that aren't in fsrc +func Sync(fdst, fsrc Fs, Delete bool) error { err := fdst.Mkdir() if err != nil { Stats.Error() @@ -265,7 +224,7 @@ func Sync(fdst, fsrc Fs) error { // Read source files checking them off against dest files to_be_checked := make(ObjectPairChan, Config.Transfers) - to_be_uploaded := make(ObjectsChan, Config.Transfers) + to_be_uploaded := make(ObjectPairChan, Config.Transfers) var checkerWg sync.WaitGroup checkerWg.Add(Config.Checkers) @@ -287,8 +246,8 @@ func Sync(fdst, fsrc Fs) error { delete(delFiles, remote) to_be_checked <- ObjectPair{src, dst} } else { - // No need to check doesn't exist - to_be_uploaded <- src + // No need to check since doesn't exist + to_be_uploaded <- ObjectPair{src, nil} } } close(to_be_checked) @@ -300,20 +259,23 @@ func Sync(fdst, fsrc Fs) error { Log(fdst, "Waiting for transfers to finish") copierWg.Wait() - if Stats.Errored() { - Log(fdst, "Not deleting files as there were IO errors") - return nil - } - - // Delete the spare files - toDelete := make(ObjectsChan, Config.Transfers) - go func() { - for _, fs := range delFiles { - toDelete <- fs + // Delete files if asked + if Delete { + if Stats.Errored() { + Log(fdst, "Not deleting files as there were IO errors") + return nil } - close(toDelete) - }() - DeleteFiles(toDelete) + + // Delete the spare files + toDelete := make(ObjectsChan, Config.Transfers) + go func() { + for _, fs := range delFiles { + toDelete <- fs + } + close(toDelete) + }() + DeleteFiles(toDelete) + } return nil } diff --git a/rclone.go b/rclone.go index 06dbb0325..c1544b48a 100644 --- a/rclone.go +++ b/rclone.go @@ -61,7 +61,7 @@ var Commands = []Command{ unchanged files, testing first by modification time then by MD5SUM. Doesn't delete files from the destination.`, Run: func(fdst, fsrc fs.Fs) { - err := fs.CopyFs(fdst, fsrc) + err := fs.Sync(fdst, fsrc, false) if err != nil { log.Fatalf("Failed to copy: %v", err) } @@ -79,7 +79,7 @@ var Commands = []Command{ exist in destination. Since this can cause data loss, test first with the --dry-run flag.`, Run: func(fdst, fsrc fs.Fs) { - err := fs.Sync(fdst, fsrc) + err := fs.Sync(fdst, fsrc, true) if err != nil { log.Fatalf("Failed to sync: %v", err) } diff --git a/rclonetest/rclonetest.go b/rclonetest/rclonetest.go index 0e014cd85..3eb982d36 100644 --- a/rclonetest/rclonetest.go +++ b/rclonetest/rclonetest.go @@ -150,7 +150,7 @@ var t2 = Time("2011-12-25T12:59:59.123456789Z") func TestCopy(flocal, fremote fs.Fs) { WriteFile("empty", "", t1) - err := fs.CopyFs(fremote, flocal) + err := fs.Sync(fremote, flocal, false) if err != nil { log.Fatalf("Copy failed: %v", err) } @@ -168,7 +168,7 @@ func TestSync(flocal, fremote fs.Fs) { if err != nil { log.Fatalf("Chtimes failed: %v", err) } - err = fs.Sync(fremote, flocal) + err = fs.Sync(fremote, flocal, true) if err != nil { log.Fatalf("Sync failed: %v", err) } @@ -182,7 +182,7 @@ func TestSync(flocal, fremote fs.Fs) { log.Printf("Sync after adding a file") WriteFile("potato", "------------------------------------------------------------", t1) - err = fs.Sync(fremote, flocal) + err = fs.Sync(fremote, flocal, true) if err != nil { log.Fatalf("Sync failed: %v", err) } @@ -197,7 +197,7 @@ func TestSync(flocal, fremote fs.Fs) { log.Printf("Sync after changing a file's size only") WriteFile("potato", "smaller but same date", t1) - err = fs.Sync(fremote, flocal) + err = fs.Sync(fremote, flocal, true) if err != nil { log.Fatalf("Sync failed: %v", err) } @@ -215,7 +215,7 @@ func TestSync(flocal, fremote fs.Fs) { if err != nil { log.Fatalf("Remove failed: %v", err) } - err = fs.Sync(fremote, flocal) + err = fs.Sync(fremote, flocal, true) if err != nil { log.Fatalf("Sync failed: %v", err) }