fs: factor multiple directory traverse out of sync

This commit is contained in:
Nick Craig-Wood 2017-09-01 15:21:46 +01:00
parent 8574129892
commit fe96d5cf0a
3 changed files with 341 additions and 274 deletions

317
fs/march.go Normal file
View file

@ -0,0 +1,317 @@
package fs
import (
"sync"
"golang.org/x/net/context"
)
// march traverses two Fs simultaneously, calling walker for each match
type march struct {
// parameters
ctx context.Context
fdst Fs
fsrc Fs
dir string
callback marcher
// internal state
srcListDir listDirFn // function to call to list a directory in the src
dstListDir listDirFn // function to call to list a directory in the dst
}
// marcher is called on each match
type marcher interface {
// SrcOnly is called for a DirEntry found only in the source
SrcOnly(src DirEntry) (recurse bool)
// DstOnly is called for a DirEntry found only in the destination
DstOnly(dst DirEntry) (recurse bool)
// Match is called for a DirEntry found both in the source and destination
Match(dst, src DirEntry) (recurse bool)
}
// newMarch sets up a march over fsrc, and fdst calling back callback for each match
func newMarch(ctx context.Context, fdst, fsrc Fs, dir string, callback marcher) *march {
m := &march{
ctx: ctx,
fdst: fdst,
fsrc: fsrc,
dir: dir,
callback: callback,
}
m.srcListDir = m.makeListDir(fsrc, false)
m.dstListDir = m.makeListDir(fdst, Config.Filter.DeleteExcluded)
return m
}
// list a directory into entries, err
type listDirFn func(dir string) (entries DirEntries, err error)
// makeListDir makes a listing function for the given fs and includeAll flags
func (m *march) makeListDir(f Fs, includeAll bool) listDirFn {
if !Config.UseListR || f.Features().ListR == nil {
return func(dir string) (entries DirEntries, err error) {
return ListDirSorted(f, includeAll, dir)
}
}
var (
mu sync.Mutex
started bool
dirs DirTree
dirsErr error
)
return func(dir string) (entries DirEntries, err error) {
mu.Lock()
defer mu.Unlock()
if !started {
dirs, dirsErr = NewDirTree(f, m.dir, includeAll, Config.MaxDepth)
started = true
}
if dirsErr != nil {
return nil, dirsErr
}
entries, ok := dirs[dir]
if !ok {
err = ErrorDirNotFound
} else {
delete(dirs, dir)
}
return entries, err
}
}
// listDirJob describe a directory listing that needs to be done
type listDirJob struct {
srcRemote string
dstRemote string
srcDepth int
dstDepth int
noSrc bool
noDst bool
}
// run starts the matching process off
func (m *march) run() {
srcDepth := Config.MaxDepth
if srcDepth < 0 {
srcDepth = MaxLevel
}
dstDepth := srcDepth
if Config.Filter.DeleteExcluded {
dstDepth = MaxLevel
}
// Start some directory listing go routines
var wg sync.WaitGroup // sync closing of go routines
var traversing sync.WaitGroup // running directory traversals
in := make(chan listDirJob, Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-m.ctx.Done():
return
case job, ok := <-in:
if !ok {
return
}
jobs := m.processJob(job)
if len(jobs) > 0 {
traversing.Add(len(jobs))
go func() {
// Now we have traversed this directory, send these
// jobs off for traversal in the background
for _, newJob := range jobs {
in <- newJob
}
}()
}
traversing.Done()
}
}
}()
}
// Start the process
traversing.Add(1)
in <- listDirJob{
srcRemote: m.dir,
srcDepth: srcDepth - 1,
dstRemote: m.dir,
dstDepth: dstDepth - 1,
}
traversing.Wait()
close(in)
wg.Wait()
}
// Check to see if the context has been cancelled
func (m *march) aborting() bool {
select {
case <-m.ctx.Done():
return true
default:
}
return false
}
type matchPair struct {
src, dst DirEntry
}
// Process the two sorted listings, matching up the items in the two
// sorted slices
//
// Into srcOnly go Entries which only exist in the srcList
// Into dstOnly go Entries which only exist in the dstList
// Into matches go matchPair's of src and dst which have the same name
//
// This checks for duplicates and checks the list is sorted.
func matchListings(srcList, dstList DirEntries) (srcOnly DirEntries, dstOnly DirEntries, matches []matchPair) {
for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 {
var src, dst DirEntry
var srcRemote, dstRemote string
if iSrc < len(srcList) {
src = srcList[iSrc]
srcRemote = src.Remote()
}
if iDst < len(dstList) {
dst = dstList[iDst]
dstRemote = dst.Remote()
}
if src == nil && dst == nil {
break
}
if src != nil && iSrc > 0 {
prev := srcList[iSrc-1].Remote()
if srcRemote == prev {
Logf(src, "Duplicate %s found in source - ignoring", DirEntryType(src))
src = nil // ignore the src
} else if srcRemote < prev {
Errorf(src, "Out of order listing in source")
src = nil // ignore the src
}
}
if dst != nil && iDst > 0 {
prev := dstList[iDst-1].Remote()
if dstRemote == prev {
Logf(dst, "Duplicate %s found in destination - ignoring", DirEntryType(dst))
dst = nil // ignore the dst
} else if dstRemote < prev {
Errorf(dst, "Out of order listing in destination")
dst = nil // ignore the dst
}
}
if src != nil && dst != nil {
if srcRemote < dstRemote {
dst = nil
iDst-- // retry the dst
} else if srcRemote > dstRemote {
src = nil
iSrc-- // retry the src
}
}
// Debugf(nil, "src = %v, dst = %v", src, dst)
switch {
case src == nil && dst == nil:
// do nothing
case src == nil:
dstOnly = append(dstOnly, dst)
case dst == nil:
srcOnly = append(srcOnly, src)
default:
matches = append(matches, matchPair{src: src, dst: dst})
}
}
return
}
// processJob processes a listDirJob listing the source and
// destination directories, comparing them and returning a slice of
// more jobs
//
// returns errors using processError
func (m *march) processJob(job listDirJob) (jobs []listDirJob) {
var (
srcList, dstList DirEntries
srcListErr, dstListErr error
wg sync.WaitGroup
)
// List the src and dst directories
if !job.noSrc {
wg.Add(1)
go func() {
defer wg.Done()
srcList, srcListErr = m.srcListDir(job.srcRemote)
}()
}
if !job.noDst {
wg.Add(1)
go func() {
defer wg.Done()
dstList, dstListErr = m.dstListDir(job.dstRemote)
}()
}
// Wait for listings to complete and report errors
wg.Wait()
if srcListErr != nil {
Errorf(job.srcRemote, "error reading source directory: %v", srcListErr)
Stats.Error()
return nil
}
if dstListErr == ErrorDirNotFound {
// Copy the stuff anyway
} else if dstListErr != nil {
Errorf(job.dstRemote, "error reading destination directory: %v", dstListErr)
Stats.Error()
return nil
}
// Work out what to do and do it
srcOnly, dstOnly, matches := matchListings(srcList, dstList)
for _, src := range srcOnly {
if m.aborting() {
return nil
}
recurse := m.callback.SrcOnly(src)
if recurse && job.srcDepth > 0 {
jobs = append(jobs, listDirJob{
srcRemote: src.Remote(),
srcDepth: job.srcDepth - 1,
noDst: true,
})
}
}
for _, dst := range dstOnly {
if m.aborting() {
return nil
}
recurse := m.callback.DstOnly(dst)
if recurse && job.dstDepth > 0 {
jobs = append(jobs, listDirJob{
dstRemote: dst.Remote(),
dstDepth: job.dstDepth - 1,
noSrc: true,
})
}
}
for _, match := range matches {
if m.aborting() {
return nil
}
recurse := m.callback.Match(match.dst, match.src)
if recurse && job.srcDepth > 0 && job.dstDepth > 0 {
jobs = append(jobs, listDirJob{
srcRemote: match.src.Remote(),
dstRemote: match.dst.Remote(),
srcDepth: job.srcDepth - 1,
dstDepth: job.dstDepth - 1,
})
}
}
return jobs
}

View file

@ -1,4 +1,4 @@
// Internal tests for sync/copy/move
// Internal tests for march
package fs

View file

@ -3,6 +3,7 @@
package fs
import (
"context"
"fmt"
"sort"
"sync"
@ -52,8 +53,6 @@ type syncCopyMove struct {
renameCheck []Object // accumulate files to check for rename here
backupDir Fs // place to store overwrites/deletes
suffix string // suffix to add to files placed in backupDir
srcListDir listDirFn // function to call to list a directory in the src
dstListDir listDirFn // function to call to list a directory in the dst
}
func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCopyMove, error) {
@ -122,47 +121,9 @@ func newSyncCopyMove(fdst, fsrc Fs, deleteMode DeleteMode, DoMove bool) (*syncCo
}
s.suffix = Config.Suffix
}
s.srcListDir = s.makeListDir(fsrc, false)
s.dstListDir = s.makeListDir(fdst, Config.Filter.DeleteExcluded)
return s, nil
}
// list a directory into entries, err
type listDirFn func(dir string) (entries DirEntries, err error)
// makeListDir makes a listing function for the given fs and includeAll flags
func (s *syncCopyMove) makeListDir(f Fs, includeAll bool) listDirFn {
if !Config.UseListR || f.Features().ListR == nil {
return func(dir string) (entries DirEntries, err error) {
return ListDirSorted(f, includeAll, dir)
}
}
var (
mu sync.Mutex
started bool
dirs DirTree
dirsErr error
)
return func(dir string) (entries DirEntries, err error) {
mu.Lock()
defer mu.Unlock()
if !started {
dirs, dirsErr = NewDirTree(f, s.dir, includeAll, Config.MaxDepth)
started = true
}
if dirsErr != nil {
return nil, dirsErr
}
entries, ok := dirs[dir]
if !ok {
err = ErrorDirNotFound
} else {
delete(dirs, dir)
}
return entries, err
}
}
// Check to see if have set the abort flag
func (s *syncCopyMove) aborting() bool {
select {
@ -667,15 +628,6 @@ func (s *syncCopyMove) tryRename(src Object) bool {
return true
}
// listDirJob describe a directory listing that needs to be done
type listDirJob struct {
remote string
srcDepth int
dstDepth int
noSrc bool
noDst bool
}
// Syncs fsrc into fdst
//
// If Delete is true then it deletes any files in fdst that aren't in fsrc
@ -684,15 +636,6 @@ type listDirJob struct {
//
// dir is the start directory, "" for root
func (s *syncCopyMove) run() error {
srcDepth := Config.MaxDepth
if srcDepth < 0 {
srcDepth = MaxLevel
}
dstDepth := srcDepth
if Config.Filter.DeleteExcluded {
dstDepth = MaxLevel
}
if Same(s.fdst, s.fsrc) {
Errorf(s.fdst, "Nothing to do as source and destination are the same")
return nil
@ -705,53 +648,11 @@ func (s *syncCopyMove) run() error {
s.startDeleters()
s.dstFiles = make(map[string]Object)
// Start some directory listing go routines
var wg sync.WaitGroup // sync closing of go routines
var traversing sync.WaitGroup // running directory traversals
in := make(chan listDirJob, Config.Checkers)
s.startTrackRenames()
for i := 0; i < Config.Checkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if s.aborting() {
return
}
select {
case job, ok := <-in:
if !ok {
return
}
jobs := s.processJob(job)
if len(jobs) > 0 {
traversing.Add(len(jobs))
go func() {
// Now we have traversed this directory, send these
// jobs off for traversal in the background
for _, newJob := range jobs {
in <- newJob
}
}()
}
traversing.Done()
case <-s.abort:
return
}
}
}()
}
// Start the process
traversing.Add(1)
in <- listDirJob{
remote: s.dir,
srcDepth: srcDepth - 1,
dstDepth: dstDepth - 1,
}
traversing.Wait()
close(in)
wg.Wait()
ctx := context.Background()
m := newMarch(ctx, s.fdst, s.fsrc, s.dir, s)
m.run()
s.stopTrackRenames()
if s.trackRenames {
@ -789,10 +690,10 @@ func (s *syncCopyMove) run() error {
return s.currentError()
}
// Have an object which is in the destination only
func (s *syncCopyMove) dstOnly(dst DirEntry, job listDirJob, jobs *[]listDirJob) {
// DstOnly have an object which is in the destination only
func (s *syncCopyMove) DstOnly(dst DirEntry) (recurse bool) {
if s.deleteMode == DeleteModeOff {
return
return false
}
switch x := dst.(type) {
case Object:
@ -809,29 +710,24 @@ func (s *syncCopyMove) dstOnly(dst DirEntry, job listDirJob, jobs *[]listDirJob)
}
case Directory:
// Do the same thing to the entire contents of the directory
if job.dstDepth > 0 {
*jobs = append(*jobs, listDirJob{
remote: dst.Remote(),
dstDepth: job.dstDepth - 1,
noSrc: true,
})
}
// Record directory as it is potentially empty and needs deleting
if s.fdst.Features().CanHaveEmptyDirectories {
s.dstEmptyDirsMu.Lock()
s.dstEmptyDirs = append(s.dstEmptyDirs, dst)
s.dstEmptyDirsMu.Unlock()
}
return true
default:
panic("Bad object in DirEntries")
}
return false
}
// Have an object which is in the source only
func (s *syncCopyMove) srcOnly(src DirEntry, job listDirJob, jobs *[]listDirJob) {
// SrcOnly have an object which is in the source only
func (s *syncCopyMove) SrcOnly(src DirEntry) (recurse bool) {
if s.deleteMode == DeleteModeOnly {
return
return false
}
switch x := src.(type) {
case Object:
@ -844,24 +740,19 @@ func (s *syncCopyMove) srcOnly(src DirEntry, job listDirJob, jobs *[]listDirJob)
}
case Directory:
// Do the same thing to the entire contents of the directory
if job.srcDepth > 0 {
*jobs = append(*jobs, listDirJob{
remote: src.Remote(),
srcDepth: job.srcDepth - 1,
noDst: true,
})
}
return true
default:
panic("Bad object in DirEntries")
}
return false
}
// Given a src and a dst, transfer the src to dst
func (s *syncCopyMove) transfer(dst, src DirEntry, job listDirJob, jobs *[]listDirJob) {
// Match is called when src and dst are present, so sync src to dst
func (s *syncCopyMove) Match(dst, src DirEntry) (recurse bool) {
switch srcX := src.(type) {
case Object:
if s.deleteMode == DeleteModeOnly {
return
return false
}
dstX, ok := dst.(Object)
if ok {
@ -876,157 +767,16 @@ func (s *syncCopyMove) transfer(dst, src DirEntry, job listDirJob, jobs *[]listD
// Do the same thing to the entire contents of the directory
_, ok := dst.(Directory)
if ok {
if job.srcDepth > 0 && job.dstDepth > 0 {
*jobs = append(*jobs, listDirJob{
remote: src.Remote(),
srcDepth: job.srcDepth - 1,
dstDepth: job.dstDepth - 1,
})
}
} else {
// FIXME src is dir, dst is file
err := errors.New("can't overwrite file with directory")
Errorf(dst, "%v", err)
s.processError(err)
return true
}
// FIXME src is dir, dst is file
err := errors.New("can't overwrite file with directory")
Errorf(dst, "%v", err)
s.processError(err)
default:
panic("Bad object in DirEntries")
}
}
type matchPair struct {
src, dst DirEntry
}
// Process the two sorted listings, matching up the items in the two
// sorted slices
//
// Into srcOnly go Entries which only exist in the srcList
// Into dstOnly go Entries which only exist in the dstList
// Into matches go matchPair's of src and dst which have the same name
//
// This checks for duplicates and checks the list is sorted.
func matchListings(srcList, dstList DirEntries) (srcOnly DirEntries, dstOnly DirEntries, matches []matchPair) {
for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 {
var src, dst DirEntry
var srcRemote, dstRemote string
if iSrc < len(srcList) {
src = srcList[iSrc]
srcRemote = src.Remote()
}
if iDst < len(dstList) {
dst = dstList[iDst]
dstRemote = dst.Remote()
}
if src == nil && dst == nil {
break
}
if src != nil && iSrc > 0 {
prev := srcList[iSrc-1].Remote()
if srcRemote == prev {
Logf(src, "Duplicate %s found in source - ignoring", DirEntryType(src))
src = nil // ignore the src
} else if srcRemote < prev {
Errorf(src, "Out of order listing in source")
src = nil // ignore the src
}
}
if dst != nil && iDst > 0 {
prev := dstList[iDst-1].Remote()
if dstRemote == prev {
Logf(dst, "Duplicate %s found in destination - ignoring", DirEntryType(dst))
dst = nil // ignore the dst
} else if dstRemote < prev {
Errorf(dst, "Out of order listing in destination")
dst = nil // ignore the dst
}
}
if src != nil && dst != nil {
if srcRemote < dstRemote {
dst = nil
iDst-- // retry the dst
} else if srcRemote > dstRemote {
src = nil
iSrc-- // retry the src
}
}
// Debugf(nil, "src = %v, dst = %v", src, dst)
switch {
case src == nil && dst == nil:
// do nothing
case src == nil:
dstOnly = append(dstOnly, dst)
case dst == nil:
srcOnly = append(srcOnly, src)
default:
matches = append(matches, matchPair{src: src, dst: dst})
}
}
return
}
// processJob processes a listDirJob listing the source and
// destination directories, comparing them and returning a slice of
// more jobs
//
// returns errors using processError
func (s *syncCopyMove) processJob(job listDirJob) (jobs []listDirJob) {
var (
srcList, dstList DirEntries
srcListErr, dstListErr error
wg sync.WaitGroup
)
// List the src and dst directories
if !job.noSrc {
wg.Add(1)
go func() {
defer wg.Done()
srcList, srcListErr = s.srcListDir(job.remote)
}()
}
if !job.noDst {
wg.Add(1)
go func() {
defer wg.Done()
dstList, dstListErr = s.dstListDir(job.remote)
}()
}
// Wait for listings to complete and report errors
wg.Wait()
if srcListErr != nil {
s.processError(errors.Wrapf(srcListErr, "error reading source directory %q", job.remote))
return nil
}
if dstListErr == ErrorDirNotFound {
// Copy the stuff anyway
} else if dstListErr != nil {
s.processError(errors.Wrapf(dstListErr, "error reading destination directory %q", job.remote))
return nil
}
// Work out what to do and do it
srcOnly, dstOnly, matches := matchListings(srcList, dstList)
for _, src := range srcOnly {
if s.aborting() {
return nil
}
s.srcOnly(src, job, &jobs)
}
for _, dst := range dstOnly {
if s.aborting() {
return nil
}
s.dstOnly(dst, job, &jobs)
}
for _, match := range matches {
if s.aborting() {
return nil
}
s.transfer(match.dst, match.src, job, &jobs)
}
return jobs
return false
}
// Syncs fsrc into fdst