forked from TrueCloudLab/rclone
Factor Move out of sync.go and add remote parameter to Move and Copy
This commit is contained in:
parent
5e35aeca9e
commit
50b3cfccb1
2 changed files with 59 additions and 88 deletions
|
@ -222,12 +222,13 @@ func removeFailedCopy(dst Object) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Copy src object to dst or f if nil
|
||||
//
|
||||
// If dst is nil then the object must not exist already. If you do
|
||||
// call Copy() with dst nil on a pre-existing file then some filing
|
||||
// systems (eg Drive) may duplicate the file.
|
||||
func Copy(f Fs, dst, src Object) (err error) {
|
||||
// Copy src object to dst or f if nil. If dst is nil then it uses
|
||||
// remote as the name of the new object.
|
||||
func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
||||
if Config.DryRun {
|
||||
Log(src, "Not copying as --dry-run")
|
||||
return nil
|
||||
}
|
||||
maxTries := Config.LowLevelRetries
|
||||
tries := 0
|
||||
doUpdate := dst != nil
|
||||
|
@ -238,7 +239,7 @@ func Copy(f Fs, dst, src Object) (err error) {
|
|||
actionTaken = "Copied (server side copy)"
|
||||
if fCopy, ok := f.(Copier); ok && src.Fs().Name() == f.Name() {
|
||||
var newDst Object
|
||||
newDst, err = fCopy.Copy(src, src.Remote())
|
||||
newDst, err = fCopy.Copy(src, remote)
|
||||
if err == nil {
|
||||
dst = newDst
|
||||
}
|
||||
|
@ -333,6 +334,46 @@ func Copy(f Fs, dst, src Object) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
// Move src object to dst or fdst if nil. If dst is nil then it uses
|
||||
// remote as the name of the new object.
|
||||
func Move(fdst Fs, dst Object, remote string, src Object) (err error) {
|
||||
if Config.DryRun {
|
||||
Log(src, "Not moving as --dry-run")
|
||||
return nil
|
||||
}
|
||||
// See if we have Move available
|
||||
if do, ok := fdst.(Mover); ok && src.Fs().Name() == fdst.Name() {
|
||||
// Delete destination if it exists
|
||||
if dst != nil {
|
||||
err = DeleteFile(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Move dst <- src
|
||||
_, err := do.Move(src, remote)
|
||||
switch err {
|
||||
case nil:
|
||||
Debug(src, "Moved")
|
||||
return nil
|
||||
case ErrorCantMove:
|
||||
Debug(src, "Can't move, switching to copy")
|
||||
default:
|
||||
Stats.Error()
|
||||
ErrorLog(dst, "Couldn't move: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Move not found or didn't work so copy dst <- src
|
||||
err = Copy(fdst, dst, remote, src)
|
||||
if err != nil {
|
||||
ErrorLog(src, "Not deleting source as copy failed: %v", err)
|
||||
return err
|
||||
}
|
||||
// Delete src if no error on copy
|
||||
return DeleteFile(src)
|
||||
}
|
||||
|
||||
// DeleteFile deletes a single file respecting --dry-run and accumulating stats and errors.
|
||||
func DeleteFile(dst Object) (err error) {
|
||||
if Config.DryRun {
|
||||
|
|
92
fs/sync.go
92
fs/sync.go
|
@ -27,7 +27,7 @@ type syncCopyMove struct {
|
|||
abort chan struct{} // signal to abort the copiers
|
||||
checkerWg sync.WaitGroup // wait for checkers
|
||||
toBeChecked ObjectPairChan // checkers channel
|
||||
copierWg sync.WaitGroup // wait for copiers
|
||||
transfersWg sync.WaitGroup // wait for transfers
|
||||
toBeUploaded ObjectPairChan // copiers channel
|
||||
errorMu sync.Mutex // Mutex covering the errors variables
|
||||
err error // normal error from copy process
|
||||
|
@ -230,9 +230,10 @@ func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sy
|
|||
}
|
||||
}
|
||||
|
||||
// pairCopier reads Objects on in and copies them.
|
||||
func (s *syncCopyMove) pairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
||||
// pairCopyOrMove reads Objects on in and moves or copies them.
|
||||
func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
for {
|
||||
if s.aborting() {
|
||||
return
|
||||
|
@ -244,13 +245,12 @@ func (s *syncCopyMove) pairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup
|
|||
}
|
||||
src := pair.src
|
||||
Stats.Transferring(src.Remote())
|
||||
var err error
|
||||
if Config.DryRun {
|
||||
Log(src, "Not copying as --dry-run")
|
||||
if s.DoMove {
|
||||
err = Move(fdst, pair.dst, src.Remote(), src)
|
||||
} else {
|
||||
err = Copy(fdst, pair.dst, src)
|
||||
s.processError(err)
|
||||
err = Copy(fdst, pair.dst, src.Remote(), src)
|
||||
}
|
||||
s.processError(err)
|
||||
Stats.DoneTransferring(src.Remote(), err == nil)
|
||||
case <-s.abort:
|
||||
return
|
||||
|
@ -258,72 +258,6 @@ func (s *syncCopyMove) pairCopier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup
|
|||
}
|
||||
}
|
||||
|
||||
// pairMover reads Objects on in and moves them if possible, or copies
|
||||
// them if not
|
||||
func (s *syncCopyMove) pairMover(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
// See if we have Move available
|
||||
fdstMover, haveMover := fdst.(Mover)
|
||||
for {
|
||||
if s.aborting() {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case pair, ok := <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
transferredOK := true
|
||||
src := pair.src
|
||||
dst := pair.dst
|
||||
Stats.Transferring(src.Remote())
|
||||
doCopy := func() {
|
||||
// Copy dst <- src
|
||||
err := Copy(fdst, dst, src)
|
||||
s.processError(err)
|
||||
if err != nil {
|
||||
transferredOK = false
|
||||
ErrorLog(src, "Not deleting as copy failed: %v", err)
|
||||
} else {
|
||||
// Delete src if no error on copy
|
||||
s.processError(DeleteFile(src))
|
||||
}
|
||||
}
|
||||
if Config.DryRun {
|
||||
Log(src, "Not moving as --dry-run")
|
||||
} else if haveMover && src.Fs().Name() == fdst.Name() {
|
||||
// Delete destination if it exists
|
||||
if dst != nil {
|
||||
s.processError(DeleteFile(dst))
|
||||
}
|
||||
// Move dst <- src
|
||||
_, err := fdstMover.Move(src, src.Remote())
|
||||
if err != nil {
|
||||
// If this remote can't do moves,
|
||||
// then set the flag and copy
|
||||
if err == ErrorCantMove {
|
||||
Debug(src, "Can't move, switching to copy")
|
||||
haveMover = false
|
||||
doCopy()
|
||||
} else {
|
||||
Stats.Error()
|
||||
ErrorLog(dst, "Couldn't move: %v", err)
|
||||
s.processError(err)
|
||||
transferredOK = false
|
||||
}
|
||||
} else {
|
||||
Debug(src, "Moved")
|
||||
}
|
||||
} else {
|
||||
doCopy()
|
||||
}
|
||||
Stats.DoneTransferring(src.Remote(), transferredOK)
|
||||
case <-s.abort:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This starts the background checkers.
|
||||
func (s *syncCopyMove) startCheckers() {
|
||||
s.checkerWg.Add(Config.Checkers)
|
||||
|
@ -341,13 +275,9 @@ func (s *syncCopyMove) stopCheckers() {
|
|||
|
||||
// This starts the background transfers
|
||||
func (s *syncCopyMove) startTransfers() {
|
||||
s.copierWg.Add(Config.Transfers)
|
||||
s.transfersWg.Add(Config.Transfers)
|
||||
for i := 0; i < Config.Transfers; i++ {
|
||||
if s.DoMove {
|
||||
go s.pairMover(s.toBeUploaded, s.fdst, &s.copierWg)
|
||||
} else {
|
||||
go s.pairCopier(s.toBeUploaded, s.fdst, &s.copierWg)
|
||||
}
|
||||
go s.pairCopyOrMove(s.toBeUploaded, s.fdst, &s.transfersWg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -355,7 +285,7 @@ func (s *syncCopyMove) startTransfers() {
|
|||
func (s *syncCopyMove) stopTransfers() {
|
||||
close(s.toBeUploaded)
|
||||
Log(s.fdst, "Waiting for transfers to finish")
|
||||
s.copierWg.Wait()
|
||||
s.transfersWg.Wait()
|
||||
}
|
||||
|
||||
// This deletes the files in the dstFiles map. If checkSrcMap is set
|
||||
|
|
Loading…
Reference in a new issue