Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Anagh Kumar Baranwal
bb45b6bb51
Move the metaMu lock from the fs to the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
0b35ff3893
delay stat calls until required for creating the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
050c7159de
feat: local: list objects in parallel controlled by the --checkers option -- fixes #6632
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30

View file

@ -28,13 +28,13 @@ import (
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"golang.org/x/text/unicode/norm" "golang.org/x/text/unicode/norm"
) )
// Constants // Constants
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
const useReadDir = (runtime.GOOS == "windows" || runtime.GOOS == "plan9") // these OSes read FileInfos directly
// Register with Fs // Register with Fs
func init() { func init() {
@ -200,9 +200,9 @@ cause disk fragmentation and can be slow to work with.`,
Help: `Disable setting modtime. Help: `Disable setting modtime.
Normally rclone updates modification time of files after they are done Normally rclone updates modification time of files after they are done
uploading. This can cause permissions issues on Linux platforms when uploading. This can cause permissions issues on Linux platforms when
the user rclone is running as does not own the file uploaded, such as the user rclone is running as does not own the file uploaded, such as
when copying to a CIFS mount owned by another user. If this option is when copying to a CIFS mount owned by another user. If this option is
enabled, rclone will no longer update the modtime after copying a file.`, enabled, rclone will no longer update the modtime after copying a file.`,
Default: false, Default: false,
Advanced: true, Advanced: true,
@ -247,16 +247,16 @@ type Fs struct {
xattrSupported atomic.Int32 // whether xattrs are supported xattrSupported atomic.Int32 // whether xattrs are supported
// do os.Lstat or os.Stat // do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error) lstat func(name string) (os.FileInfo, error)
objectMetaMu sync.RWMutex // global lock for Object metadata
} }
// Object represents a local filesystem object // Object represents a local filesystem object
type Object struct { type Object struct {
fs *Fs // The Fs this object is part of fs *Fs // The Fs this object is part of
remote string // The remote path (encoded path) remote string // The remote path (encoded path)
path string // The local path (OS path) path string // The local path (OS path)
// When using these items the fs.objectMetaMu must be held metaMu sync.RWMutex // lock for metadata
// When using these items the metaMu must be held
size int64 // file metadata - always present size int64 // file metadata - always present
mode os.FileMode mode os.FileMode
modTime time.Time modTime time.Time
@ -485,102 +485,131 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
}() }()
for { for {
var fis []os.FileInfo var des []os.DirEntry
if useReadDir {
// Windows and Plan9 read the directory entries with the stat information in which des, err = fd.ReadDir(1024)
// shouldn't fail because of unreadable entries. if err == io.EOF && len(des) == 0 {
fis, err = fd.Readdir(1024) break
if err == io.EOF && len(fis) == 0 {
break
}
} else {
// For other OSes we read the names only (which shouldn't fail) then stat the
// individual ourselves so we can log errors but not fail the directory read.
var names []string
names, err = fd.Readdirnames(1024)
if err == io.EOF && len(names) == 0 {
break
}
if err == nil {
for _, name := range names {
namepath := filepath.Join(fsDirPath, name)
fi, fierr := os.Lstat(namepath)
if os.IsNotExist(fierr) {
// skip entry removed by a concurrent goroutine
continue
}
if fierr != nil {
// Don't report errors on any file names that are excluded
if useFilter {
newRemote := f.cleanRemote(dir, name)
if !filter.IncludeRemote(newRemote) {
continue
}
}
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
fs.Errorf(dir, "%v", fierr)
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
continue
}
fis = append(fis, fi)
}
}
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read directory entry: %w", err) return nil, fmt.Errorf("failed to read directory entry: %w", err)
} }
for _, fi := range fis { loopEntries := make(fs.DirEntries, len(des))
name := fi.Name()
mode := fi.Mode() g, gCtx := errgroup.WithContext(ctx)
newRemote := f.cleanRemote(dir, name) g.SetLimit(fs.GetConfig(ctx).Checkers)
// Follow symlinks if required for i, de := range des {
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 { i, de := i, de // https://golang.org/doc/faq#closures_and_goroutines
localPath := filepath.Join(fsDirPath, name) g.Go(func() error {
fi, err = os.Stat(localPath) // No point in continuing if context has been cancelled
// Quietly skip errors on excluded files and directories if gCtx.Err() != nil {
if err != nil && useFilter && !filter.IncludeRemote(newRemote) { return nil
continue
} }
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks var (
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err)) err error
fs.Errorf(newRemote, "Listing error: %v", err) fi os.FileInfo
err = accounting.Stats(ctx).Error(err) )
continue name := de.Name()
mode := de.Type()
namepath := filepath.Join(fsDirPath, name)
newRemote := f.cleanRemote(dir, name)
if de.IsDir() {
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode & os.ModeSymlink) != 0 {
return nil
}
} }
fi, err = de.Info()
if err != nil { if err != nil {
return nil, err if os.IsNotExist(err) {
// skip entry removed by a concurrent goroutine
return nil
}
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, err)
fs.Errorf(dir, "%v", err)
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
return nil
} }
name = fi.Name()
mode = fi.Mode() mode = fi.Mode()
namepath = filepath.Join(fsDirPath, name)
newRemote = f.cleanRemote(dir, name)
// Follow symlinks if required
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
fi, err = os.Stat(namepath)
if err != nil {
// Quietly skip errors on excluded files and directories
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
_ = accounting.Stats(gCtx).Error(err)
return nil
}
return err
}
mode = fi.Mode()
}
// No point in continuing if context has been cancelled
if gCtx.Err() != nil {
return nil
}
if fi.IsDir() {
if f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
loopEntries[i] = d
return nil
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && mode&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return err
}
if fso.Storable() {
loopEntries[i] = fso
return nil
}
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
for _, entry := range loopEntries {
if entry == nil {
continue
} }
if fi.IsDir() { entries = append(entries, entry)
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
entries = append(entries, d)
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
continue
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return nil, err
}
if fso.Storable() {
entries = append(entries, fso)
}
}
} }
} }
return entries, nil return entries, nil
} }
@ -748,9 +777,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// Temporary Object under construction // Temporary Object under construction
dstObj := f.newObject(remote) dstObj := f.newObject(remote)
dstObj.fs.objectMetaMu.RLock() dstObj.metaMu.RLock()
dstObjMode := dstObj.mode dstObjMode := dstObj.mode
dstObj.fs.objectMetaMu.RUnlock() dstObj.metaMu.RUnlock()
// Check it is a file if it exists // Check it is a file if it exists
err := dstObj.lstat() err := dstObj.lstat()
@ -912,10 +941,10 @@ func (o *Object) Remote() string {
// Hash returns the requested hash of a file as a lowercase hex string // Hash returns the requested hash of a file as a lowercase hex string
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
// Check that the underlying file hasn't changed // Check that the underlying file hasn't changed
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
oldtime := o.modTime oldtime := o.modTime
oldsize := o.size oldsize := o.size
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
err := o.lstat() err := o.lstat()
var changed bool var changed bool
if err != nil { if err != nil {
@ -927,14 +956,14 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to stat: %w", err) return "", fmt.Errorf("hash: failed to stat: %w", err)
} }
} else { } else {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
changed = !o.modTime.Equal(oldtime) || oldsize != o.size changed = !o.modTime.Equal(oldtime) || oldsize != o.size
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
} }
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
hashValue, hashFound := o.hashes[r] hashValue, hashFound := o.hashes[r]
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
if changed || !hashFound { if changed || !hashFound {
var in io.ReadCloser var in io.ReadCloser
@ -965,28 +994,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to close: %w", closeErr) return "", fmt.Errorf("hash: failed to close: %w", closeErr)
} }
hashValue = hashes[r] hashValue = hashes[r]
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
if o.hashes == nil { if o.hashes == nil {
o.hashes = hashes o.hashes = hashes
} else { } else {
o.hashes[r] = hashValue o.hashes[r] = hashValue
} }
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
return hashValue, nil return hashValue, nil
} }
// Size returns the size of an object in bytes // Size returns the size of an object in bytes
func (o *Object) Size() int64 { func (o *Object) Size() int64 {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
defer o.fs.objectMetaMu.RUnlock() defer o.metaMu.RUnlock()
return o.size return o.size
} }
// ModTime returns the modification time of the object // ModTime returns the modification time of the object
func (o *Object) ModTime(ctx context.Context) time.Time { func (o *Object) ModTime(ctx context.Context) time.Time {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
defer o.fs.objectMetaMu.RUnlock() defer o.metaMu.RUnlock()
return o.modTime return o.modTime
} }
@ -1015,9 +1044,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
// Storable returns a boolean showing if this object is storable // Storable returns a boolean showing if this object is storable
func (o *Object) Storable() bool { func (o *Object) Storable() bool {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
mode := o.mode mode := o.mode
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks { if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
if !o.fs.opt.SkipSymlinks { if !o.fs.opt.SkipSymlinks {
fs.Logf(o, "Can't follow symlink without -L/--copy-links") fs.Logf(o, "Can't follow symlink without -L/--copy-links")
@ -1050,10 +1079,10 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
if err != nil { if err != nil {
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err) return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
} }
file.o.fs.objectMetaMu.RLock() file.o.metaMu.RLock()
oldtime := file.o.modTime oldtime := file.o.modTime
oldsize := file.o.size oldsize := file.o.size
file.o.fs.objectMetaMu.RUnlock() file.o.metaMu.RUnlock()
if oldsize != fi.Size() { if oldsize != fi.Size() {
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size())) return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
} }
@ -1075,9 +1104,9 @@ func (file *localOpenFile) Close() (err error) {
err = file.in.Close() err = file.in.Close()
if err == nil { if err == nil {
if file.hash.Size() == file.o.Size() { if file.hash.Size() == file.o.Size() {
file.o.fs.objectMetaMu.Lock() file.o.metaMu.Lock()
file.o.hashes = file.hash.Sums() file.o.hashes = file.hash.Sums()
file.o.fs.objectMetaMu.Unlock() file.o.metaMu.Unlock()
} }
} }
return err return err
@ -1269,9 +1298,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// All successful so update the hashes // All successful so update the hashes
if hasher != nil { if hasher != nil {
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.hashes = hasher.Sums() o.hashes = hasher.Sums()
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
// Set the mtime // Set the mtime
@ -1345,11 +1374,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() { if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
return return
} }
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.size = info.Size() o.size = info.Size()
o.modTime = info.ModTime() o.modTime = info.ModTime()
o.mode = info.Mode() o.mode = info.Mode()
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
// Read the size of the link. // Read the size of the link.
// //
// The value in info.Size() is not always correct // The value in info.Size() is not always correct
@ -1368,9 +1397,9 @@ func (o *Object) setMetadata(info os.FileInfo) {
// clearHashCache wipes any cached hashes for the object // clearHashCache wipes any cached hashes for the object
func (o *Object) clearHashCache() { func (o *Object) clearHashCache() {
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.hashes = nil o.hashes = nil
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
// Stat an Object into info // Stat an Object into info