sync,march: check the cancel context on every channel send and receive
This fixes a deadlock on sync when all the copying channels receive a Fatal Error.
This commit is contained in:
parent
98bf65c43b
commit
d178233e74
2 changed files with 62 additions and 10 deletions
|
@ -146,7 +146,12 @@ func (m *March) Run() {
|
||||||
// Now we have traversed this directory, send these
|
// Now we have traversed this directory, send these
|
||||||
// jobs off for traversal in the background
|
// jobs off for traversal in the background
|
||||||
for _, newJob := range jobs {
|
for _, newJob := range jobs {
|
||||||
in <- newJob
|
select {
|
||||||
|
case <-m.ctx.Done():
|
||||||
|
// discard job if finishing
|
||||||
|
traversing.Done()
|
||||||
|
case in <- newJob:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -164,6 +169,13 @@ func (m *March) Run() {
|
||||||
dstRemote: m.dir,
|
dstRemote: m.dir,
|
||||||
dstDepth: dstDepth - 1,
|
dstDepth: dstDepth - 1,
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
// when the context is cancelled discard the remaining jobs
|
||||||
|
<-m.ctx.Done()
|
||||||
|
for range in {
|
||||||
|
traversing.Done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
traversing.Wait()
|
traversing.Wait()
|
||||||
close(in)
|
close(in)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
|
@ -224,10 +224,18 @@ func (s *syncCopyMove) pairChecker(in fs.ObjectPairChan, out fs.ObjectPairChan,
|
||||||
} else {
|
} else {
|
||||||
// If successful zero out the dst as it is no longer there and copy the file
|
// If successful zero out the dst as it is no longer there and copy the file
|
||||||
pair.Dst = nil
|
pair.Dst = nil
|
||||||
out <- pair
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case out <- pair:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
out <- pair
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case out <- pair:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -261,7 +269,11 @@ func (s *syncCopyMove) pairRenamer(in fs.ObjectPairChan, out fs.ObjectPairChan,
|
||||||
src := pair.Src
|
src := pair.Src
|
||||||
if !s.tryRename(src) {
|
if !s.tryRename(src) {
|
||||||
// pass on if not renamed
|
// pass on if not renamed
|
||||||
out <- pair
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case out <- pair:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
return
|
return
|
||||||
|
@ -407,6 +419,7 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
|
||||||
// Delete the spare files
|
// Delete the spare files
|
||||||
toDelete := make(fs.ObjectsChan, fs.Config.Transfers)
|
toDelete := make(fs.ObjectsChan, fs.Config.Transfers)
|
||||||
go func() {
|
go func() {
|
||||||
|
outer:
|
||||||
for remote, o := range s.dstFiles {
|
for remote, o := range s.dstFiles {
|
||||||
if checkSrcMap {
|
if checkSrcMap {
|
||||||
_, exists := s.srcFiles[remote]
|
_, exists := s.srcFiles[remote]
|
||||||
|
@ -417,7 +430,11 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
|
||||||
if s.aborting() {
|
if s.aborting() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
toDelete <- o
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
break outer
|
||||||
|
case toDelete <- o:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(toDelete)
|
close(toDelete)
|
||||||
}()
|
}()
|
||||||
|
@ -612,7 +629,11 @@ func (s *syncCopyMove) run() error {
|
||||||
s.makeRenameMap()
|
s.makeRenameMap()
|
||||||
// Attempt renames for all the files which don't have a matching dst
|
// Attempt renames for all the files which don't have a matching dst
|
||||||
for _, src := range s.renameCheck {
|
for _, src := range s.renameCheck {
|
||||||
s.toBeRenamed <- fs.ObjectPair{Src: src, Dst: nil}
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
break
|
||||||
|
case s.toBeRenamed <- fs.ObjectPair{Src: src, Dst: nil}:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,6 +667,9 @@ func (s *syncCopyMove) run() error {
|
||||||
//delete empty subdirectories that were part of the move
|
//delete empty subdirectories that were part of the move
|
||||||
s.processError(deleteEmptyDirectories(s.fsrc, s.srcEmptyDirs))
|
s.processError(deleteEmptyDirectories(s.fsrc, s.srcEmptyDirs))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel the context to free resources
|
||||||
|
s.cancel()
|
||||||
return s.currentError()
|
return s.currentError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -663,7 +687,11 @@ func (s *syncCopyMove) DstOnly(dst fs.DirEntry) (recurse bool) {
|
||||||
s.dstFiles[x.Remote()] = x
|
s.dstFiles[x.Remote()] = x
|
||||||
s.dstFilesMu.Unlock()
|
s.dstFilesMu.Unlock()
|
||||||
case fs.DeleteModeDuring, fs.DeleteModeOnly:
|
case fs.DeleteModeDuring, fs.DeleteModeOnly:
|
||||||
s.deleteFilesCh <- x
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case s.deleteFilesCh <- x:
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("unexpected delete mode %d", s.deleteMode))
|
panic(fmt.Sprintf("unexpected delete mode %d", s.deleteMode))
|
||||||
}
|
}
|
||||||
|
@ -692,10 +720,18 @@ func (s *syncCopyMove) SrcOnly(src fs.DirEntry) (recurse bool) {
|
||||||
case fs.Object:
|
case fs.Object:
|
||||||
if s.trackRenames {
|
if s.trackRenames {
|
||||||
// Save object to check for a rename later
|
// Save object to check for a rename later
|
||||||
s.trackRenamesCh <- x
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case s.trackRenamesCh <- x:
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// No need to check since doesn't exist
|
// No need to check since doesn't exist
|
||||||
s.toBeUploaded <- fs.ObjectPair{Src: x, Dst: nil}
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case s.toBeUploaded <- fs.ObjectPair{Src: x, Dst: nil}:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case fs.Directory:
|
case fs.Directory:
|
||||||
// Do the same thing to the entire contents of the directory
|
// Do the same thing to the entire contents of the directory
|
||||||
|
@ -719,7 +755,11 @@ func (s *syncCopyMove) Match(dst, src fs.DirEntry) (recurse bool) {
|
||||||
}
|
}
|
||||||
dstX, ok := dst.(fs.Object)
|
dstX, ok := dst.(fs.Object)
|
||||||
if ok {
|
if ok {
|
||||||
s.toBeChecked <- fs.ObjectPair{Src: srcX, Dst: dstX}
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case s.toBeChecked <- fs.ObjectPair{Src: srcX, Dst: dstX}:
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// FIXME src is file, dst is directory
|
// FIXME src is file, dst is directory
|
||||||
err := errors.New("can't overwrite directory with file")
|
err := errors.New("can't overwrite directory with file")
|
||||||
|
|
Loading…
Reference in a new issue