fs: make syncCopyMove use context for go routine cancellation

This commit is contained in:
Nick Craig-Wood 2017-09-01 15:47:32 +01:00
parent fe96d5cf0a
commit 261c7ad9e4

View file

@ -3,13 +3,13 @@
package fs package fs
import ( import (
"context"
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/net/context"
) )
var oldSyncMethod = BoolP("old-sync-method", "", false, "Deprecated - use --fast-list instead") var oldSyncMethod = BoolP("old-sync-method", "", false, "Deprecated - use --fast-list instead")
@ -22,6 +22,8 @@ type syncCopyMove struct {
DoMove bool DoMove bool
dir string dir string
// internal state // internal state
ctx context.Context // internal context for controlling go-routines
cancel func() // cancel the context
noTraverse bool // if set don't trafevers the dst noTraverse bool // if set don't trafevers the dst
deletersWg sync.WaitGroup // for delete before go routine deletersWg sync.WaitGroup // for delete before go routine
deleteFilesCh chan Object // channel to receive deletes if delete before deleteFilesCh chan Object // channel to receive deletes if delete before
@ -34,7 +36,6 @@ type syncCopyMove struct {
dstFilesResult chan error // error result of dst listing dstFilesResult chan error // error result of dst listing
dstEmptyDirsMu sync.Mutex // protect dstEmptyDirs dstEmptyDirsMu sync.Mutex // protect dstEmptyDirs
dstEmptyDirs []DirEntry // potentially empty directories dstEmptyDirs []DirEntry // potentially empty directories
abort chan struct{} // signal to abort the copiers
checkerWg sync.WaitGroup // wait for checkers checkerWg sync.WaitGroup // wait for checkers
toBeChecked ObjectPairChan // checkers channel toBeChecked ObjectPairChan // checkers channel
transfersWg sync.WaitGroup // wait for transfers transfersWg sync.WaitGroup // wait for transfers
@ -66,7 +67,6 @@ func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCo
srcFilesResult: make(chan error, 1), srcFilesResult: make(chan error, 1),
dstFilesResult: make(chan error, 1), dstFilesResult: make(chan error, 1),
noTraverse: Config.NoTraverse, noTraverse: Config.NoTraverse,
abort: make(chan struct{}),
toBeChecked: make(ObjectPairChan, Config.Transfers), toBeChecked: make(ObjectPairChan, Config.Transfers),
toBeUploaded: make(ObjectPairChan, Config.Transfers), toBeUploaded: make(ObjectPairChan, Config.Transfers),
deleteFilesCh: make(chan Object, Config.Checkers), deleteFilesCh: make(chan Object, Config.Checkers),
@ -75,6 +75,7 @@ func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCo
toBeRenamed: make(ObjectPairChan, Config.Transfers), toBeRenamed: make(ObjectPairChan, Config.Transfers),
trackRenamesCh: make(chan Object, Config.Checkers), trackRenamesCh: make(chan Object, Config.Checkers),
} }
s.ctx, s.cancel = context.WithCancel(context.Background())
if s.noTraverse && s.deleteMode != DeleteModeOff { if s.noTraverse && s.deleteMode != DeleteModeOff {
Errorf(nil, "Ignoring --no-traverse with sync") Errorf(nil, "Ignoring --no-traverse with sync")
s.noTraverse = false s.noTraverse = false
@ -124,10 +125,10 @@ func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCo
return s, nil return s, nil
} }
// Check to see if have set the abort flag // Check to see if the context has been cancelled
func (s *syncCopyMove) aborting() bool { func (s *syncCopyMove) aborting() bool {
select { select {
case <-s.abort: case <-s.ctx.Done():
return true return true
default: default:
} }
@ -144,7 +145,7 @@ outer:
} }
select { select {
case out <- o: case out <- o:
case <-s.abort: case <-s.ctx.Done():
break outer break outer
} }
} }
@ -218,7 +219,7 @@ func (s *syncCopyMove) processError(err error) {
switch { switch {
case IsFatalError(err): case IsFatalError(err):
if !s.aborting() { if !s.aborting() {
close(s.abort) s.cancel()
} }
s.fatalErr = err s.fatalErr = err
case IsNoRetryError(err): case IsNoRetryError(err):
@ -287,7 +288,7 @@ func (s *syncCopyMove) pairChecker(in ObjectPairChan, out ObjectPairChan, wg *sy
} }
} }
Stats.DoneChecking(src.Remote()) Stats.DoneChecking(src.Remote())
case <-s.abort: case <-s.ctx.Done():
return return
} }
} }
@ -311,7 +312,7 @@ func (s *syncCopyMove) pairRenamer(in ObjectPairChan, out ObjectPairChan, wg *sy
// pass on if not renamed // pass on if not renamed
out <- pair out <- pair
} }
case <-s.abort: case <-s.ctx.Done():
return return
} }
} }
@ -339,7 +340,7 @@ func (s *syncCopyMove) pairCopyOrMove(in ObjectPairChan, fdst Fs, wg *sync.WaitG
} }
s.processError(err) s.processError(err)
Stats.DoneTransferring(src.Remote(), err == nil) Stats.DoneTransferring(src.Remote(), err == nil)
case <-s.abort: case <-s.ctx.Done():
return return
} }
} }
@ -650,8 +651,8 @@ func (s *syncCopyMove) run() error {
s.startTrackRenames() s.startTrackRenames()
ctx := context.Background() // set up a march over fdst and fsrc
m := newMarch(ctx, s.fdst, s.fsrc, s.dir, s) m := newMarch(s.ctx, s.fdst, s.fsrc, s.dir, s)
m.run() m.run()
s.stopTrackRenames() s.stopTrackRenames()