Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Anagh Kumar Baranwal
bb45b6bb51
Move the metaMu lock from the fs to the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
0b35ff3893
delay stat calls until required for creating the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
050c7159de
feat: local: list objects in parallel controlled by the --checkers option -- fixes #6632
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30

View file

@ -28,13 +28,13 @@ import (
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"golang.org/x/text/unicode/norm"
)
// Constants
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
const useReadDir = (runtime.GOOS == "windows" || runtime.GOOS == "plan9") // these OSes read FileInfos directly
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
// Register with Fs
func init() {
@ -200,9 +200,9 @@ cause disk fragmentation and can be slow to work with.`,
Help: `Disable setting modtime.
Normally rclone updates modification time of files after they are done
uploading. This can cause permissions issues on Linux platforms when
uploading. This can cause permissions issues on Linux platforms when
the user rclone is running as does not own the file uploaded, such as
when copying to a CIFS mount owned by another user. If this option is
when copying to a CIFS mount owned by another user. If this option is
enabled, rclone will no longer update the modtime after copying a file.`,
Default: false,
Advanced: true,
@ -247,16 +247,16 @@ type Fs struct {
xattrSupported atomic.Int32 // whether xattrs are supported
// do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error)
objectMetaMu sync.RWMutex // global lock for Object metadata
lstat func(name string) (os.FileInfo, error)
}
// Object represents a local filesystem object
type Object struct {
fs *Fs // The Fs this object is part of
remote string // The remote path (encoded path)
path string // The local path (OS path)
// When using these items the fs.objectMetaMu must be held
fs *Fs // The Fs this object is part of
remote string // The remote path (encoded path)
path string // The local path (OS path)
metaMu sync.RWMutex // lock for metadata
// When using these items the metaMu must be held
size int64 // file metadata - always present
mode os.FileMode
modTime time.Time
@ -485,102 +485,131 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
}()
for {
var fis []os.FileInfo
if useReadDir {
// Windows and Plan9 read the directory entries with the stat information in which
// shouldn't fail because of unreadable entries.
fis, err = fd.Readdir(1024)
if err == io.EOF && len(fis) == 0 {
break
}
} else {
// For other OSes we read the names only (which shouldn't fail) then stat the
// individual ourselves so we can log errors but not fail the directory read.
var names []string
names, err = fd.Readdirnames(1024)
if err == io.EOF && len(names) == 0 {
break
}
if err == nil {
for _, name := range names {
namepath := filepath.Join(fsDirPath, name)
fi, fierr := os.Lstat(namepath)
if os.IsNotExist(fierr) {
// skip entry removed by a concurrent goroutine
continue
}
if fierr != nil {
// Don't report errors on any file names that are excluded
if useFilter {
newRemote := f.cleanRemote(dir, name)
if !filter.IncludeRemote(newRemote) {
continue
}
}
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
fs.Errorf(dir, "%v", fierr)
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
continue
}
fis = append(fis, fi)
}
}
var des []os.DirEntry
des, err = fd.ReadDir(1024)
if err == io.EOF && len(des) == 0 {
break
}
if err != nil {
return nil, fmt.Errorf("failed to read directory entry: %w", err)
}
for _, fi := range fis {
name := fi.Name()
mode := fi.Mode()
newRemote := f.cleanRemote(dir, name)
// Follow symlinks if required
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
localPath := filepath.Join(fsDirPath, name)
fi, err = os.Stat(localPath)
// Quietly skip errors on excluded files and directories
if err != nil && useFilter && !filter.IncludeRemote(newRemote) {
continue
loopEntries := make(fs.DirEntries, len(des))
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(fs.GetConfig(ctx).Checkers)
for i, de := range des {
i, de := i, de // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// No point in continuing if context has been cancelled
if gCtx.Err() != nil {
return nil
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
err = accounting.Stats(ctx).Error(err)
continue
var (
err error
fi os.FileInfo
)
name := de.Name()
mode := de.Type()
namepath := filepath.Join(fsDirPath, name)
newRemote := f.cleanRemote(dir, name)
if de.IsDir() {
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode & os.ModeSymlink) != 0 {
return nil
}
}
fi, err = de.Info()
if err != nil {
return nil, err
if os.IsNotExist(err) {
// skip entry removed by a concurrent goroutine
return nil
}
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, err)
fs.Errorf(dir, "%v", err)
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
return nil
}
name = fi.Name()
mode = fi.Mode()
namepath = filepath.Join(fsDirPath, name)
newRemote = f.cleanRemote(dir, name)
// Follow symlinks if required
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
fi, err = os.Stat(namepath)
if err != nil {
// Quietly skip errors on excluded files and directories
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
_ = accounting.Stats(gCtx).Error(err)
return nil
}
return err
}
mode = fi.Mode()
}
// No point in continuing if context has been cancelled
if gCtx.Err() != nil {
return nil
}
if fi.IsDir() {
if f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
loopEntries[i] = d
return nil
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && mode&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return err
}
if fso.Storable() {
loopEntries[i] = fso
return nil
}
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
for _, entry := range loopEntries {
if entry == nil {
continue
}
if fi.IsDir() {
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
entries = append(entries, d)
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
continue
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return nil, err
}
if fso.Storable() {
entries = append(entries, fso)
}
}
entries = append(entries, entry)
}
}
return entries, nil
}
@ -748,9 +777,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// Temporary Object under construction
dstObj := f.newObject(remote)
dstObj.fs.objectMetaMu.RLock()
dstObj.metaMu.RLock()
dstObjMode := dstObj.mode
dstObj.fs.objectMetaMu.RUnlock()
dstObj.metaMu.RUnlock()
// Check it is a file if it exists
err := dstObj.lstat()
@ -912,10 +941,10 @@ func (o *Object) Remote() string {
// Hash returns the requested hash of a file as a lowercase hex string
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
// Check that the underlying file hasn't changed
o.fs.objectMetaMu.RLock()
o.metaMu.RLock()
oldtime := o.modTime
oldsize := o.size
o.fs.objectMetaMu.RUnlock()
o.metaMu.RUnlock()
err := o.lstat()
var changed bool
if err != nil {
@ -927,14 +956,14 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to stat: %w", err)
}
} else {
o.fs.objectMetaMu.RLock()
o.metaMu.RLock()
changed = !o.modTime.Equal(oldtime) || oldsize != o.size
o.fs.objectMetaMu.RUnlock()
o.metaMu.RUnlock()
}
o.fs.objectMetaMu.RLock()
o.metaMu.RLock()
hashValue, hashFound := o.hashes[r]
o.fs.objectMetaMu.RUnlock()
o.metaMu.RUnlock()
if changed || !hashFound {
var in io.ReadCloser
@ -965,28 +994,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to close: %w", closeErr)
}
hashValue = hashes[r]
o.fs.objectMetaMu.Lock()
o.metaMu.Lock()
if o.hashes == nil {
o.hashes = hashes
} else {
o.hashes[r] = hashValue
}
o.fs.objectMetaMu.Unlock()
o.metaMu.Unlock()
}
return hashValue, nil
}
// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
o.fs.objectMetaMu.RLock()
defer o.fs.objectMetaMu.RUnlock()
o.metaMu.RLock()
defer o.metaMu.RUnlock()
return o.size
}
// ModTime returns the modification time of the object
func (o *Object) ModTime(ctx context.Context) time.Time {
o.fs.objectMetaMu.RLock()
defer o.fs.objectMetaMu.RUnlock()
o.metaMu.RLock()
defer o.metaMu.RUnlock()
return o.modTime
}
@ -1015,9 +1044,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
// Storable returns a boolean showing if this object is storable
func (o *Object) Storable() bool {
o.fs.objectMetaMu.RLock()
o.metaMu.RLock()
mode := o.mode
o.fs.objectMetaMu.RUnlock()
o.metaMu.RUnlock()
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
if !o.fs.opt.SkipSymlinks {
fs.Logf(o, "Can't follow symlink without -L/--copy-links")
@ -1050,10 +1079,10 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
if err != nil {
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
}
file.o.fs.objectMetaMu.RLock()
file.o.metaMu.RLock()
oldtime := file.o.modTime
oldsize := file.o.size
file.o.fs.objectMetaMu.RUnlock()
file.o.metaMu.RUnlock()
if oldsize != fi.Size() {
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
}
@ -1075,9 +1104,9 @@ func (file *localOpenFile) Close() (err error) {
err = file.in.Close()
if err == nil {
if file.hash.Size() == file.o.Size() {
file.o.fs.objectMetaMu.Lock()
file.o.metaMu.Lock()
file.o.hashes = file.hash.Sums()
file.o.fs.objectMetaMu.Unlock()
file.o.metaMu.Unlock()
}
}
return err
@ -1269,9 +1298,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// All successful so update the hashes
if hasher != nil {
o.fs.objectMetaMu.Lock()
o.metaMu.Lock()
o.hashes = hasher.Sums()
o.fs.objectMetaMu.Unlock()
o.metaMu.Unlock()
}
// Set the mtime
@ -1345,11 +1374,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
return
}
o.fs.objectMetaMu.Lock()
o.metaMu.Lock()
o.size = info.Size()
o.modTime = info.ModTime()
o.mode = info.Mode()
o.fs.objectMetaMu.Unlock()
o.metaMu.Unlock()
// Read the size of the link.
//
// The value in info.Size() is not always correct
@ -1368,9 +1397,9 @@ func (o *Object) setMetadata(info os.FileInfo) {
// clearHashCache wipes any cached hashes for the object
func (o *Object) clearHashCache() {
o.fs.objectMetaMu.Lock()
o.metaMu.Lock()
o.hashes = nil
o.fs.objectMetaMu.Unlock()
o.metaMu.Unlock()
}
// Stat an Object into info