rclone/fs/sync.go
2017-01-03 20:35:05 +00:00

584 lines
14 KiB
Go

// Implementation of sync/copy/move
package fs
import (
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
)
// syncCopyMove holds the parameters and the shared state of a single
// sync, copy or move run (see run).
type syncCopyMove struct {
	// parameters
	fdst   Fs
	fsrc   Fs
	Delete bool   // if set, delete files in fdst that aren't in fsrc
	DoMove bool   // if set, move files instead of copying them
	dir    string // start directory, "" for root
	// internal state
	noTraverse     bool              // if set don't traverse the dst (forced off when Delete is set)
	deleteBefore   bool              // set if we must delete objects before copying
	trackRenames   bool              // set if we should do server side renames
	dstFiles       map[string]Object // dst files, read by readDstFiles unless noTraverse is set
	srcFiles       map[string]Object // src files, only read if deleteBefore or trackRenames
	srcFilesChan   chan Object       // passes src objects from the src reader goroutine to run
	srcFilesResult chan error        // error result of src listing
	dstFilesResult chan error        // error result of dst listing
	abort          chan struct{}     // closed once by processError on a fatal error to stop the checkers and copiers
	checkerWg      sync.WaitGroup    // wait for checkers
	toBeChecked    ObjectPairChan    // checkers channel
	transfersWg    sync.WaitGroup    // wait for transfers
	toBeUploaded   ObjectPairChan    // copiers channel
	errorMu        sync.Mutex        // Mutex covering the errors variables
	err            error             // normal error from copy process
	noRetryErr     error             // error with NoRetry set
	fatalErr       error             // fatal error
}
// newSyncCopyMove builds a syncCopyMove from fdst, fsrc and the global
// Config, then drops options that cannot be honoured:
//   - --no-traverse is ignored when Delete is set, because the whole
//     destination listing is needed to find files to delete;
//   - --track-renames is ignored when fdst implements neither Mover nor
//     Copier, or when fsrc and fdst share no hash type.
func newSyncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) *syncCopyMove {
	checkers, transfers := Config.Checkers, Config.Transfers
	s := &syncCopyMove{
		fdst:   fdst,
		fsrc:   fsrc,
		Delete: Delete,
		DoMove: DoMove,
		dir:    "",

		noTraverse:   Config.NoTraverse,
		deleteBefore: Delete && Config.DeleteBefore,
		trackRenames: Config.TrackRenames,

		// Enough buffering to keep every checker and transfer busy.
		srcFilesChan:   make(chan Object, checkers+transfers),
		srcFilesResult: make(chan error, 1),
		dstFilesResult: make(chan error, 1),
		abort:          make(chan struct{}),
		toBeChecked:    make(ObjectPairChan, transfers),
		toBeUploaded:   make(ObjectPairChan, transfers),
	}

	if s.Delete && s.noTraverse {
		Debug(s.fdst, "Ignoring --no-traverse with sync")
		s.noTraverse = false
	}

	if !s.trackRenames {
		return s
	}
	// Some remotes emulate rename with a server-side copy followed by a
	// delete, so either capability is good enough.
	_, canMove := fdst.(Mover)
	_, canCopy := fdst.(Copier)
	if !canMove && !canCopy {
		ErrorLog(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy")
		s.trackRenames = false
	}
	if common := fsrc.Hashes().Overlap(fdst.Hashes()); common.Count() == 0 {
		ErrorLog(fdst, "Ignoring --track-renames as the source and destination do not have a common hash")
		s.trackRenames = false
	}
	return s
}
// aborting reports, without blocking, whether s.abort has been closed.
func (s *syncCopyMove) aborting() bool {
	select {
	case <-s.abort:
		return true
	default:
		return false
	}
}
// readSrcUsingMap feeds every object already collected in s.srcFiles into
// s.srcFilesChan and stops early if the run is aborted. It then closes
// s.srcFilesChan and always reports a nil result on s.srcFilesResult,
// since no listing can fail here.
func (s *syncCopyMove) readSrcUsingMap() {
	// offer hands obj to the consumer and reports whether to keep going.
	offer := func(obj Object) bool {
		if s.aborting() {
			return false
		}
		select {
		case <-s.abort:
			return false
		case s.srcFilesChan <- obj:
			return true
		}
	}
	for _, obj := range s.srcFiles {
		if !offer(obj) {
			break
		}
	}
	close(s.srcFilesChan)
	s.srcFilesResult <- nil
}
// readSrcUsingChan lists s.fsrc under s.dir and streams each object into
// s.srcFilesChan. If the run is aborted, the listing callback returns
// ErrorListAborted. Once the listing ends, s.srcFilesChan is closed and
// the listing error, wrapped with the source name, is reported on
// s.srcFilesResult (nil on success).
func (s *syncCopyMove) readSrcUsingChan() {
	forward := func(obj Object) error {
		if s.aborting() {
			return ErrorListAborted
		}
		select {
		case <-s.abort:
			return ErrorListAborted
		case s.srcFilesChan <- obj:
			return nil
		}
	}
	listErr := readFilesFn(s.fsrc, false, s.dir, forward)
	close(s.srcFilesChan)
	if listErr == nil {
		s.srcFilesResult <- nil
		return
	}
	s.srcFilesResult <- errors.Wrapf(listErr, "error listing source: %s", s.fsrc)
}
// readDstFiles lists s.fdst under s.dir into s.dstFiles, honouring
// Config.Filter.DeleteExcluded, and reports the listing error (or nil)
// on s.dstFilesResult.
func (s *syncCopyMove) readDstFiles() {
	files, listErr := readFilesMap(s.fdst, Config.Filter.DeleteExcluded, s.dir)
	s.dstFiles = files
	s.dstFilesResult <- listErr
}
// NeedTransfer reports whether src must be copied over dst under the
// current Config.
//
// The rules are checked in this order:
//   - a nil dst always needs a transfer;
//   - Config.IgnoreExisting skips any existing dst;
//   - Config.IgnoreTimes always transfers;
//   - with Config.UpdateOlder, mod times are compared within a modify
//     window, falling back to a size comparison when they are close;
//   - otherwise Equal(src, dst) decides.
func NeedTransfer(dst, src Object) bool {
	switch {
	case dst == nil:
		Debug(src, "Couldn't find file - need to transfer")
		return true
	case Config.IgnoreExisting:
		Debug(src, "Destination exists, skipping")
		return false
	case Config.IgnoreTimes:
		Debug(src, "Transferring unconditionally as --ignore-times is in use")
		return true
	case !Config.UpdateOlder:
		if Equal(src, dst) {
			Debug(src, "Unchanged skipping")
			return false
		}
		return true
	}

	// Config.UpdateOlder: skip if dst is newer than src.
	srcTime := src.ModTime()
	dstTime := dst.ModTime()
	delta := dstTime.Sub(srcTime)

	// Use the mutually agreed precision if there is one, otherwise
	// 1 second as a safe default for the resolution of upload times.
	window := Config.ModifyWindow
	if window == ModTimeNotSupported {
		window = time.Second
	}

	switch {
	case delta >= window:
		Debug(src, "Destination is newer than source, skipping")
		return false
	case delta <= -window:
		Debug(src, "Destination is older than source, transferring")
		return true
	case src.Size() == dst.Size():
		Debug(src, "Destination mod time is within %v of source and sizes identical, skipping", window)
		return false
	default:
		Debug(src, "Destination mod time is within %v of source but sizes differ, transferring", window)
		return true
	}
}
// processError records err, if any, in the error slot that matches its kind.
//
// A fatal error also closes s.abort; the mutex plus the aborting check
// ensure it is closed only once. No-retry errors and ordinary errors are
// stored separately so run can return them in order of precedence.
func (s *syncCopyMove) processError(err error) {
	if err == nil {
		return
	}
	s.errorMu.Lock()
	defer s.errorMu.Unlock()

	if IsFatalError(err) {
		if !s.aborting() {
			close(s.abort)
		}
		s.fatalErr = err
		return
	}
	if IsNoRetryError(err) {
		s.noRetryErr = err
		return
	}
	s.err = err
}
// pairChecker reads ObjectPairs from in. For each storable source that
// NeedTransfer says must be transferred, it sends the pair on to out.
// When s.DoMove is set, a source that does not need transferring is
// deleted instead.
//
// It returns when in is closed or s.abort is closed, and signals wg on
// exit. The send on out also watches s.abort: after an abort the transfer
// goroutines stop reading out. An unconditional send would then block this
// goroutine, and therefore stopCheckers, forever once out's buffer is full.
//
// FIXME potentially doing lots of hashes at once
func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		if s.aborting() {
			return
		}
		select {
		case pair, ok := <-in:
			if !ok {
				return
			}
			src := pair.src
			Stats.Checking(src.Remote())
			// Check to see if can store this
			if src.Storable() {
				if NeedTransfer(pair.dst, pair.src) {
					select {
					case out <- pair:
					case <-s.abort:
						Stats.DoneChecking(src.Remote())
						return
					}
				} else if s.DoMove {
					// Moving: the destination is already up to date,
					// so just delete the source.
					s.processError(DeleteFile(src))
				}
			}
			Stats.DoneChecking(src.Remote())
		case <-s.abort:
			return
		}
	}
}
// pairCopyOrMove is a transfer worker. It takes ObjectPairs from in and
// copies each source object into fdst (or moves it when s.DoMove is set),
// records any error via processError and updates Stats.
//
// It returns when in is closed or s.abort is closed, and signals wg on exit.
func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
	defer wg.Done()
	for !s.aborting() {
		select {
		case <-s.abort:
			return
		case pair, ok := <-in:
			if !ok {
				return
			}
			src := pair.src
			Stats.Transferring(src.Remote())
			var transferErr error
			if s.DoMove {
				transferErr = Move(fdst, pair.dst, src.Remote(), src)
			} else {
				transferErr = Copy(fdst, pair.dst, src.Remote(), src)
			}
			s.processError(transferErr)
			succeeded := transferErr == nil
			Stats.DoneTransferring(src.Remote(), succeeded)
		}
	}
}
// startCheckers launches Config.Checkers pairChecker goroutines. They read
// s.toBeChecked and forward pairs that need transferring to s.toBeUploaded.
func (s *syncCopyMove) startCheckers() {
	remaining := Config.Checkers
	s.checkerWg.Add(remaining)
	for ; remaining > 0; remaining-- {
		go s.pairChecker(s.toBeChecked, s.toBeUploaded, &s.checkerWg)
	}
}
// stopCheckers closes s.toBeChecked so the checkers exit once it is
// drained, then blocks until every checker goroutine has returned.
func (s *syncCopyMove) stopCheckers() {
	pending := s.toBeChecked
	close(pending)
	Log(s.fdst, "Waiting for checks to finish")
	wg := &s.checkerWg
	wg.Wait()
}
// startTransfers launches Config.Transfers pairCopyOrMove goroutines that
// read s.toBeUploaded and copy or move into s.fdst.
func (s *syncCopyMove) startTransfers() {
	remaining := Config.Transfers
	s.transfersWg.Add(remaining)
	for ; remaining > 0; remaining-- {
		go s.pairCopyOrMove(s.toBeUploaded, s.fdst, &s.transfersWg)
	}
}
// stopTransfers closes s.toBeUploaded so the transfer workers exit once
// it is drained, then blocks until every worker has returned.
func (s *syncCopyMove) stopTransfers() {
	pending := s.toBeUploaded
	close(pending)
	Log(s.fdst, "Waiting for transfers to finish")
	wg := &s.transfersWg
	wg.Wait()
}
// deleteFiles deletes the objects left in s.dstFiles.
//
// If checkSrcMap is set, any object whose name is also in s.srcFiles is
// kept. Otherwise every entry is deleted, on the assumption that names
// found in the source have already been removed from s.dstFiles.
//
// Nothing is deleted (and ErrorNotDeleting is returned) if Stats has
// recorded an error. The run aborting stops the queueing of further deletions.
func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
	if Stats.Errored() {
		ErrorLog(s.fdst, "%v", ErrorNotDeleting)
		return ErrorNotDeleting
	}

	victims := make(ObjectsChan, Config.Transfers)
	go func() {
		defer close(victims)
		for remote, obj := range s.dstFiles {
			if checkSrcMap {
				if _, inSrc := s.srcFiles[remote]; inSrc {
					continue
				}
			}
			if s.aborting() {
				return
			}
			victims <- obj
		}
	}()
	return DeleteFiles(victims)
}
// renameFiles detects files that appear to have been renamed in the source.
//
// A rename candidate is a source object with no destination object of the
// same name, paired with a destination object whose name is not in the
// source and which equal(src, dst, false, true) considers equal. Each such
// destination object is renamed to the source name with MoveFile inside
// s.fdst. On success, both names are removed from s.dstFiles and s.srcFiles,
// so the rest of the run neither transfers nor deletes them.
//
// It returns nil if the run is aborted while matching. It returns an error
// if any rename failed.
func (s *syncCopyMove) renameFiles() error {
	// Match candidates sequentially, before any worker starts. That way
	// nothing iterates s.srcFiles/s.dstFiles while the workers below are
	// deleting from them. Each destination object may be claimed only once:
	// otherwise two identical new source files could both be matched to the
	// same destination object, and the second MoveFile would fail.
	var pairs []ObjectPair
	claimed := make(map[string]struct{})
	for srcRemote, srcObject := range s.srcFiles {
		if _, exists := s.dstFiles[srcRemote]; exists {
			continue
		}
		if s.aborting() {
			return nil
		}
		for dstRemote, dstObject := range s.dstFiles {
			if _, exists := s.srcFiles[dstRemote]; exists {
				continue
			}
			if _, taken := claimed[dstRemote]; taken {
				continue
			}
			// At this point, if the files are equal, this is a rename.
			if equal(srcObject, dstObject, false, true) {
				claimed[dstRemote] = struct{}{}
				pairs = append(pairs, ObjectPair{srcObject, dstObject})
				break
			}
		}
	}

	var (
		wg         sync.WaitGroup
		filesMu    sync.Mutex
		errorCount int32
	)
	toRename := make(ObjectPairChan, Config.Transfers)

	// Start the workers before queueing anything. Previously all pairs were
	// queued first, which deadlocked once there were more renames than
	// the channel's buffer could hold.
	wg.Add(Config.Transfers)
	for i := 0; i < Config.Transfers; i++ {
		go func() {
			defer wg.Done()
			for pair := range toRename {
				Debug(nil, "Rename %q to %q", pair.dst.Remote(), pair.src.Remote())
				err := MoveFile(s.fdst, s.fdst, pair.src.Remote(), pair.dst.Remote())
				if err != nil {
					atomic.AddInt32(&errorCount, 1)
					continue
				}
				filesMu.Lock()
				delete(s.dstFiles, pair.dst.Remote())
				delete(s.srcFiles, pair.src.Remote())
				filesMu.Unlock()
			}
		}()
	}
	for _, pair := range pairs {
		toRename <- pair
	}
	close(toRename)

	Log(nil, "Waiting for renames to finish")
	wg.Wait()

	if failed := atomic.LoadInt32(&errorCount); failed > 0 {
		return errors.Errorf("failed to rename %d files", failed)
	}
	return nil
}
// run syncs, copies or moves s.fsrc into s.fdst, starting at s.dir ("" for
// the root).
//
// If s.Delete is set, files in fdst that aren't in fsrc are deleted. This
// happens before the transfers when s.deleteBefore is set, and after them
// when Config.DeleteDuring or Config.DeleteAfter is set. If s.DoMove is set,
// files are moved instead of copied.
//
// Source objects go through a pipeline of checkers, which decide whether an
// existing destination object needs replacing, and transfer workers. The
// error returned has this precedence:
//   - a fatal error;
//   - otherwise the most recent ordinary error, where an error from the
//     source listing or deletion (recorded last) takes over one from a copy;
//   - otherwise a no-retry error.
func (s *syncCopyMove) run() error {
	if Same(s.fdst, s.fsrc) {
		ErrorLog(s.fdst, "Nothing to do as source and destination are the same")
		return nil
	}

	err := Mkdir(s.fdst, "")
	if err != nil {
		return err
	}

	// Start reading dstFiles if required
	if !s.noTraverse {
		go s.readDstFiles()
	}

	// If s.deleteBefore or s.trackRenames then we need to read the whole source map first
	readSourceMap := s.deleteBefore || s.trackRenames

	if readSourceMap {
		// Read source files into the map
		s.srcFiles, err = readFilesMap(s.fsrc, false, s.dir)
		if err != nil {
			return err
		}
	}

	// Wait for dstfiles to finish reading if we were reading them
	// and report any errors
	if !s.noTraverse {
		err = <-s.dstFilesResult
		if err != nil {
			return err
		}
	}

	// Do renames if required
	// Have dstFiles and srcFiles complete at this point
	if s.trackRenames {
		if err = s.renameFiles(); err != nil {
			return err
		}
	}

	// Delete files first if required
	if s.deleteBefore {
		err = s.deleteFiles(true)
		if err != nil {
			return err
		}
	}

	// Now we can fill the src channel.
	if readSourceMap {
		// Pump the map into s.srcFilesChan
		go s.readSrcUsingMap()
	} else {
		go s.readSrcUsingChan()
	}

	// Start background checking and transferring pipeline
	s.startCheckers()
	s.startTransfers()

	// Do the transfers
	for src := range s.srcFilesChan {
		remote := src.Remote()
		var dst Object
		if s.noTraverse {
			var lookupErr error
			dst, lookupErr = s.fdst.NewObject(remote)
			if lookupErr != nil {
				dst = nil
				if lookupErr != ErrorObjectNotFound {
					Debug(src, "Error making NewObject: %v", lookupErr)
				}
			}
		} else {
			dst = s.dstFiles[remote]
			// Remove file from s.dstFiles because it exists in srcFiles
			delete(s.dstFiles, remote)
		}
		// After an abort the checkers and transfers stop receiving, so an
		// unconditional send could block forever. Give up on the pair
		// instead, but keep draining s.srcFilesChan: the reader goroutine
		// notices the abort and closes it, which ends this loop.
		if dst != nil {
			select {
			case s.toBeChecked <- ObjectPair{src, dst}:
			case <-s.abort:
			}
		} else {
			// No need to check since doesn't exist
			select {
			case s.toBeUploaded <- ObjectPair{src, nil}:
			case <-s.abort:
			}
		}
	}

	// Stop background checking and transferring pipeline
	s.stopCheckers()
	s.stopTransfers()

	// Retrieve the delayed error from the source listing goroutine
	err = <-s.srcFilesResult

	// Delete files during or after
	if s.Delete && (Config.DeleteDuring || Config.DeleteAfter) {
		if err != nil {
			ErrorLog(s.fdst, "%v", ErrorNotDeleting)
		} else {
			err = s.deleteFiles(false)
		}
	}

	// Return errors in the precedence
	//   fatalErr
	//   error from above
	//   error from a copy
	//   noRetryErr
	s.processError(err)
	if s.fatalErr != nil {
		return s.fatalErr
	}
	if s.err != nil {
		return s.err
	}
	return s.noRetryErr
}
// Sync makes fdst match fsrc: it copies new and changed files from fsrc
// and deletes files in fdst that aren't in fsrc.
func Sync(fdst, fsrc Fs) error {
	const deleteExtra, moveFiles = true, false
	job := newSyncCopyMove(fdst, fsrc, deleteExtra, moveFiles)
	return job.run()
}
// CopyDir copies new and changed files from fsrc into fdst without
// deleting anything from fdst.
func CopyDir(fdst, fsrc Fs) error {
	const deleteExtra, moveFiles = false, false
	job := newSyncCopyMove(fdst, fsrc, deleteExtra, moveFiles)
	return job.run()
}
// moveDir moves fsrc into fdst file by file, with no server side
// directory move. MoveDir uses it as the fallback.
func moveDir(fdst, fsrc Fs) error {
	const deleteExtra, moveFiles = false, true
	job := newSyncCopyMove(fdst, fsrc, deleteExtra, moveFiles)
	return job.run()
}
// MoveDir moves fsrc into fdst.
//
// A server side directory move (DirMover) is tried first when fdst
// supports it, both remotes have the same Name and no filters are active.
// If that move reports ErrorCantDirMove or ErrorDirExists, it falls back to
// moving files one by one. A file-by-file move is refused when the two
// remotes overlap.
func MoveDir(fdst, fsrc Fs) error {
	if Same(fdst, fsrc) {
		ErrorLog(fdst, "Nothing to do as source and destination are the same")
		return nil
	}

	dirMover, canDirMove := fdst.(DirMover)
	if canDirMove && fsrc.Name() == fdst.Name() && Config.Filter.InActive() {
		if Config.DryRun {
			Log(fdst, "Not doing server side directory move as --dry-run")
			return nil
		}
		Debug(fdst, "Using server side directory move")
		switch moveErr := dirMover.DirMove(fsrc); moveErr {
		case nil:
			Debug(fdst, "Server side directory move succeeded")
			return nil
		case ErrorCantDirMove, ErrorDirExists:
			Debug(fdst, "Server side directory move failed - fallback to file moves: %v", moveErr)
		default:
			Stats.Error()
			ErrorLog(fdst, "Server side directory move failed: %v", moveErr)
			return moveErr
		}
	}

	// Without a server side move the two remotes mustn't overlap.
	if Overlapping(fdst, fsrc) {
		ErrorLog(fdst, "%v", ErrorCantMoveOverlapping)
		return ErrorCantMoveOverlapping
	}

	return moveDir(fdst, fsrc)
}