Implement --no-traverse flag to stop copy traversing the destination remote.
Refactor sync/copy/move:

* Don't load the src listing unless doing a sync with --delete-before
* Don't load the dst listing if doing copy/move and --no-traverse is set

`rclone --no-traverse copy src dst` now won't load either of the listings into memory, so it uses the minimum amount of memory.

This change also dramatically reduces the memory rclone uses in normal operation (copy without --no-traverse, or sync), as it no longer loads the source file listing into memory at all.

Fixes #8
Fixes #544
Fixes #546
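To illustrate the lookup strategy the flag switches on, here is a minimal sketch (simplified stand-in types and a hypothetical findDst helper, not the code in this commit): with --no-traverse the destination is queried once per source file instead of being read into a map up front.

```go
package notraverse

import "errors"

// Simplified stand-ins for rclone's Fs and Object interfaces.
type Object interface {
	Remote() string // path of the object relative to the remote's root
}

type Fs interface {
	NewObject(remote string) (Object, error) // look up a single object by path
}

// Sentinel returned by NewObject when nothing exists at that path.
var ErrorObjectNotFound = errors.New("object not found")

// findDst returns the destination object matching src, or nil if the file
// still needs to be transferred. With noTraverse set, the destination is
// queried per file; otherwise the object comes from a listing read up front.
func findDst(noTraverse bool, fdst Fs, dstFiles map[string]Object, src Object) Object {
	remote := src.Remote()
	if noTraverse {
		dst, err := fdst.NewObject(remote)
		if err != nil {
			// Treat "not found" (or any lookup failure) as missing on the destination.
			return nil
		}
		return dst
	}
	return dstFiles[remote] // nil when the destination doesn't have the file
}
```

The diff below does essentially this inside the transfer loop, though it also distinguishes ErrorObjectNotFound from other lookup errors before discarding the result; the sketch collapses that for brevity.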
parent 13797a1fb8
commit af4ef8ad8d

5 changed files with 174 additions and 109 deletions

fs/operations.go (206 changed lines)
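All of the changes shown below are in fs/operations.go: readFilesMap is split so that a new callback-based readFilesFn does the listing and filtering, and the body of syncCopyMove becomes a run() method on a syncCopyMove struct that receives source objects over a channel, only falling back to a full source map for a sync with --delete-before. A rough sketch of why streaming beats collecting for memory use (hypothetical helper names, not rclone's API):

```go
package streaming

// collectListing gathers the whole listing into a map before any work starts,
// so memory grows with the number of files - the old readFilesMap-style approach.
func collectListing(entries <-chan string) map[string]struct{} {
	files := make(map[string]struct{})
	for remote := range entries {
		files[remote] = struct{}{}
	}
	return files
}

// streamListing hands each entry to the caller as it arrives, holding only one
// entry at a time - the shape of the new channel-based source reader.
func streamListing(entries <-chan string, handle func(remote string)) {
	for remote := range entries {
		handle(remote)
	}
}
```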
@@ -472,13 +472,11 @@ func DeleteFiles(toBeDeleted ObjectsChan) error {
return nil
}

// Read a map of Object.Remote to Object for the given Fs.
// Read a Objects into add() for the given Fs.
// dir is the start directory, "" for root
// If includeAll is specified all files will be added,
// otherwise only files passing the filter will be added.
func readFilesMap(fs Fs, includeAll bool, dir string) (files map[string]Object, err error) {
files = make(map[string]Object)
normalised := make(map[string]struct{})
func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object)) (err error) {
list := NewLister()
if !includeAll {
list.SetFilter(Config.Filter)
@@ -488,30 +486,45 @@ func readFilesMap(fs Fs, includeAll bool, dir string) (files map[string]Object,
for {
o, err := list.GetObject()
if err != nil {
return files, err
return err
}
// Check if we are finished
if o == nil {
break
}
// Make sure we don't delete excluded files if not required
if includeAll || Config.Filter.IncludeObject(o) {
add(o)
} else {
Debug(o, "Excluded from sync (and deletion)")
}
}
return nil
}

// Read a map of Object.Remote to Object for the given Fs.
// dir is the start directory, "" for root
// If includeAll is specified all files will be added,
// otherwise only files passing the filter will be added.
//
// This also detects duplicates and normalised duplicates
func readFilesMap(fs Fs, includeAll bool, dir string) (files map[string]Object, err error) {
files = make(map[string]Object)
normalised := make(map[string]struct{})
err = readFilesFn(fs, includeAll, dir, func(o Object) {
remote := o.Remote()
normalisedRemote := strings.ToLower(norm.NFC.String(remote))
if _, ok := files[remote]; !ok {
// Make sure we don't delete excluded files if not required
if includeAll || Config.Filter.IncludeObject(o) {
files[remote] = o
if _, ok := normalised[normalisedRemote]; ok {
Log(o, "Warning: File found with same name but different case on %v", o.Fs())
}
} else {
Debug(o, "Excluded from sync (and deletion)")
files[remote] = o
if _, ok := normalised[normalisedRemote]; ok {
Log(o, "Warning: File found with same name but different case on %v", o.Fs())
}
} else {
Log(o, "Duplicate file detected")
}
normalised[normalisedRemote] = struct{}{}
}
return files, nil
})
return files, err
}

// readFilesMaps runs readFilesMap on fdst and fsrc at the same time
@@ -698,115 +711,99 @@ func (s *syncCopyMove) stopTransfers() {
// If DoMove is true then files will be moved instead of copied
//
// dir is the start directory, "" for root
func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool, dir string) error {
if Same(fdst, fsrc) {
ErrorLog(fdst, "Nothing to do as source and destination are the same")
func (s *syncCopyMove) run() error {
if Same(s.fdst, s.fsrc) {
ErrorLog(s.fdst, "Nothing to do as source and destination are the same")
return nil
}

err := Mkdir(fdst)
err := Mkdir(s.fdst)
if err != nil {
return err
}

// Read the files of both source and destination in parallel
dstFiles, srcFiles, err := readFilesMaps(fdst, Config.Filter.DeleteExcluded, fsrc, false, dir)
if err != nil {
return err
// Start reading dstFiles if required
if !s.noTraverse {
go s.readDstFiles()
}

startDeletion := make(chan struct{}, 0)

// Delete files if asked
var delWg sync.WaitGroup
delWg.Add(1)
go func() {
if !Delete {
return
// If s.deleteBefore then we need to read the whole source map first
if s.deleteBefore {
// Read source files into the map
s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir)
if err != nil {
return err
}
defer func() {
Debug(fdst, "Deletion finished")
delWg.Done()
}()
// Pump the map into s.srcFilesChan
go s.readSrcUsingMap()
} else {
go s.readSrcUsingChan()
}

_ = <-startDeletion
Debug(fdst, "Starting deletion")

if Stats.Errored() {
ErrorLog(fdst, "Not deleting files as there were IO errors")
return
// Wait for dstfiles to finish reading if we were reading them
// and report any errors
if !s.noTraverse {
err = <-s.dstFilesResult
if err != nil {
return err
}
}

// Delete the spare files
toDelete := make(ObjectsChan, Config.Transfers)
// Delete files first if required
// Have dstFiles and srcFiles complete at this point
if s.deleteBefore {
err = s.deleteFiles(true)
if err != nil {
return err
}
}

go func() {
for key, fs := range dstFiles {
_, exists := srcFiles[key]
if !exists {
toDelete <- fs
// Start background checking and transferring pipeline
s.startCheckers()
s.startTransfers()

// Do the transfers
for src := range s.srcFilesChan {
remote := src.Remote()
var dst Object
if s.noTraverse {
var err error
dst, err = s.fdst.NewObject(remote)
if err != nil {
dst = nil
if err != ErrorObjectNotFound {
Debug(src, "Error making NewObject: %v", err)
}
}
close(toDelete)
}()
DeleteFiles(toDelete)
}()

// Start deleting, unless we must delete after transfer
if Delete && !Config.DeleteAfter {
close(startDeletion)
}

// If deletes must finish before starting transfers, we must wait now.
if Delete && Config.DeleteBefore {
Log(fdst, "Waiting for deletes to finish (before)")
delWg.Wait()
}

// Read source files checking them off against dest files
toBeChecked := make(ObjectPairChan, Config.Transfers)
toBeUploaded := make(ObjectPairChan, Config.Transfers)

var checkerWg sync.WaitGroup
checkerWg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go PairChecker(toBeChecked, toBeUploaded, &checkerWg)
}

var copierWg sync.WaitGroup
copierWg.Add(Config.Transfers)
for i := 0; i < Config.Transfers; i++ {
if DoMove {
go PairMover(toBeUploaded, fdst, &copierWg)
} else {
go PairCopier(toBeUploaded, fdst, &copierWg)
dst = s.dstFiles[remote]
// Remove file from s.dstFiles because it exists in srcFiles
delete(s.dstFiles, remote)
}
}

for remote, src := range srcFiles {
if dst, dstFound := dstFiles[remote]; dstFound {
toBeChecked <- ObjectPair{src, dst}
if dst != nil {
s.toBeChecked <- ObjectPair{src, dst}
} else {
// No need to check since doesn't exist
toBeUploaded <- ObjectPair{src, nil}
s.toBeUploaded <- ObjectPair{src, nil}
}
}
close(toBeChecked)

Log(fdst, "Waiting for checks to finish")
checkerWg.Wait()
close(toBeUploaded)
Log(fdst, "Waiting for transfers to finish")
copierWg.Wait()
// Stop background checking and transferring pipeline
s.stopCheckers()
s.stopTransfers()

// If deleting after, start deletion now
if Delete && Config.DeleteAfter {
close(startDeletion)
// Retrieve the delayed error from the source listing goroutine
err = <-s.srcFilesResult
if err != nil {
return err
}
// Unless we have already waited, wait for deletion to finish.
if Delete && !Config.DeleteBefore {
Log(fdst, "Waiting for deletes to finish (during+after)")
delWg.Wait()

// Delete files during or after
if s.Delete && (Config.DeleteDuring || Config.DeleteAfter) {
err = s.deleteFiles(false)
if err != nil {
return err
}
}

return nil
@@ -814,12 +811,17 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool, dir string) error {

// Sync fsrc into fdst
func Sync(fdst, fsrc Fs) error {
return syncCopyMove(fdst, fsrc, true, false, "")
return newSyncCopyMove(fdst, fsrc, true, false).run()
}

// CopyDir copies fsrc into fdst
func CopyDir(fdst, fsrc Fs) error {
return syncCopyMove(fdst, fsrc, false, false, "")
return newSyncCopyMove(fdst, fsrc, false, false).run()
}

// moveDir moves fsrc into fdst
func moveDir(fdst, fsrc Fs) error {
return newSyncCopyMove(fdst, fsrc, false, true).run()
}

// MoveDir moves fsrc into fdst
@@ -847,7 +849,7 @@ func MoveDir(fdst, fsrc Fs) error {
}

// Now move the files
err := syncCopyMove(fdst, fsrc, false, true, "")
err := moveDir(fdst, fsrc)
if err != nil || Stats.Errored() {
ErrorLog(fdst, "Not deleting files as there were IO errors")
return err