From 050c7159de6c1474bb4279f078cad85ba56d399c Mon Sep 17 00:00:00 2001 From: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com> Date: Tue, 1 Aug 2023 18:42:00 +0530 Subject: [PATCH] feat: local: list objects in parallel controlled by the --checkers option -- fixes #6632 Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com> --- backend/local/local.go | 187 ++++++++++++++++++++++++++--------------- 1 file changed, 121 insertions(+), 66 deletions(-) diff --git a/backend/local/local.go b/backend/local/local.go index 5cee05af4..330739624 100644 --- a/backend/local/local.go +++ b/backend/local/local.go @@ -28,6 +28,7 @@ import ( "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/readers" + "golang.org/x/sync/errgroup" "golang.org/x/text/unicode/norm" ) @@ -200,9 +201,9 @@ cause disk fragmentation and can be slow to work with.`, Help: `Disable setting modtime. Normally rclone updates modification time of files after they are done -uploading. This can cause permissions issues on Linux platforms when +uploading. This can cause permissions issues on Linux platforms when the user rclone is running as does not own the file uploaded, such as -when copying to a CIFS mount owned by another user. If this option is +when copying to a CIFS mount owned by another user. If this option is enabled, rclone will no longer update the modtime after copying a file.`, Default: false, Advanced: true, @@ -486,6 +487,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e for { var fis []os.FileInfo + if useReadDir { // Windows and Plan9 read the directory entries with the stat information in which // shouldn't fail because of unreadable entries. @@ -495,92 +497,145 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } } else { // For other OSes we read the names only (which shouldn't fail) then stat the - // individual ourselves so we can log errors but not fail the directory read. + // individual ourselves, so we can log errors but not fail the directory read. var names []string names, err = fd.Readdirnames(1024) if err == io.EOF && len(names) == 0 { break } + + fis = make([]os.FileInfo, len(names)) + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(fs.GetConfig(ctx).Checkers) if err == nil { - for _, name := range names { - namepath := filepath.Join(fsDirPath, name) - fi, fierr := os.Lstat(namepath) - if os.IsNotExist(fierr) { - // skip entry removed by a concurrent goroutine - continue - } - if fierr != nil { - // Don't report errors on any file names that are excluded - if useFilter { - newRemote := f.cleanRemote(dir, name) - if !filter.IncludeRemote(newRemote) { - continue - } + for i, name := range names { + i, name := i, name // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + // No point in continuing if context has been cancelled + if gCtx.Err() != nil { + return nil } - fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr) - fs.Errorf(dir, "%v", fierr) - _ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync - continue - } - fis = append(fis, fi) + + var err error + namepath := filepath.Join(fsDirPath, name) + fi, fierr := os.Lstat(namepath) + if os.IsNotExist(fierr) { + // skip entry removed by a concurrent goroutine + return nil + } + if fierr != nil { + if useFilter { + newRemote := f.cleanRemote(dir, name) + if !filter.IncludeRemote(newRemote) { + return nil + } + } + err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr) + fs.Errorf(dir, "%v", err) + _ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync + return nil + } + fis[i] = fi + return nil + }) } } + err = g.Wait() } if err != nil { return nil, fmt.Errorf("failed to read directory entry: %w", err) } - for _, fi := range fis { - name := fi.Name() - mode := fi.Mode() - newRemote := f.cleanRemote(dir, name) - // Follow symlinks if required - if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 { - localPath := filepath.Join(fsDirPath, name) - fi, err = os.Stat(localPath) - // Quietly skip errors on excluded files and directories - if err != nil && useFilter && !filter.IncludeRemote(newRemote) { - continue - } - if os.IsNotExist(err) || isCircularSymlinkError(err) { - // Skip bad symlinks and circular symlinks - err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err)) - fs.Errorf(newRemote, "Listing error: %v", err) - err = accounting.Stats(ctx).Error(err) - continue - } - if err != nil { - return nil, err - } - mode = fi.Mode() + loopEntries := make(fs.DirEntries, len(fis)) + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(fs.GetConfig(ctx).Checkers) + for i, fi := range fis { + if fi == nil { + continue } - if fi.IsDir() { - // Ignore directories which are symlinks. These are junction points under windows which - // are kind of a souped up symlink. Unix doesn't have directories which are symlinks. - if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) { - d := fs.NewDir(newRemote, fi.ModTime()) - entries = append(entries, d) + i, fi := i, fi // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + // No point in continuing if context has been cancelled + if gCtx.Err() != nil { + return nil } - } else { - // Check whether this link should be translated - if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 { - newRemote += linkSuffix + + var err error + name := fi.Name() + mode := fi.Mode() + newRemote := f.cleanRemote(dir, name) + + // Follow symlinks if required + if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 { + localPath := filepath.Join(fsDirPath, name) + fi, err = os.Stat(localPath) + if err != nil { + // Quietly skip errors on excluded files and directories + if useFilter && !filter.IncludeRemote(newRemote) { + return nil + } + if os.IsNotExist(err) || isCircularSymlinkError(err) { + // Skip bad symlinks and circular symlinks + err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err)) + fs.Errorf(newRemote, "Listing error: %v", err) + _ = accounting.Stats(gCtx).Error(err) + return nil + } + return err + } + mode = fi.Mode() } - // Don't include non directory if not included - // we leave directory filtering to the layer above - if useFilter && !filter.IncludeRemote(newRemote) { - continue + + // No point in continuing if context has been cancelled + if gCtx.Err() != nil { + return nil } - fso, err := f.newObjectWithInfo(newRemote, fi) - if err != nil { - return nil, err - } - if fso.Storable() { - entries = append(entries, fso) + + if fi.IsDir() { + // Ignore directories which are symlinks. These are junction points under windows which + // are kind of a souped up symlink. Unix doesn't have directories which are symlinks. + if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) { + d := fs.NewDir(newRemote, fi.ModTime()) + loopEntries[i] = d + return nil + } + } else { + // Check whether this link should be translated + if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 { + newRemote += linkSuffix + } + // Don't include non directory if not included + // we leave directory filtering to the layer above + if useFilter && !filter.IncludeRemote(newRemote) { + return nil + } + fso, err := f.newObjectWithInfo(newRemote, fi) + if err != nil { + return err + } + if fso.Storable() { + loopEntries[i] = fso + return nil + } } + return nil + }) + } + err = g.Wait() + if err != nil { + return nil, err + } + + for _, entry := range loopEntries { + if entry == nil { + continue } + entries = append(entries, entry) } } + return entries, nil }