Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Anagh Kumar Baranwal
bb45b6bb51
Move the metaMu lock from the fs to the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
0b35ff3893
delay stat calls until required for creating the object
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30
Anagh Kumar Baranwal
050c7159de
feat: local: list objects in parallel controlled by the --checkers option -- fixes #6632
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
2023-08-25 17:35:27 +05:30

View file

@ -28,13 +28,13 @@ import (
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"golang.org/x/text/unicode/norm" "golang.org/x/text/unicode/norm"
) )
// Constants // Constants
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
const useReadDir = (runtime.GOOS == "windows" || runtime.GOOS == "plan9") // these OSes read FileInfos directly
// Register with Fs // Register with Fs
func init() { func init() {
@ -247,16 +247,16 @@ type Fs struct {
xattrSupported atomic.Int32 // whether xattrs are supported xattrSupported atomic.Int32 // whether xattrs are supported
// do os.Lstat or os.Stat // do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error) lstat func(name string) (os.FileInfo, error)
objectMetaMu sync.RWMutex // global lock for Object metadata
} }
// Object represents a local filesystem object // Object represents a local filesystem object
type Object struct { type Object struct {
fs *Fs // The Fs this object is part of fs *Fs // The Fs this object is part of
remote string // The remote path (encoded path) remote string // The remote path (encoded path)
path string // The local path (OS path) path string // The local path (OS path)
// When using these items the fs.objectMetaMu must be held metaMu sync.RWMutex // lock for metadata
// When using these items the metaMu must be held
size int64 // file metadata - always present size int64 // file metadata - always present
mode os.FileMode mode os.FileMode
modTime time.Time modTime time.Time
@ -485,102 +485,131 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
}() }()
for { for {
var fis []os.FileInfo var des []os.DirEntry
if useReadDir {
// Windows and Plan9 read the directory entries with the stat information in which des, err = fd.ReadDir(1024)
// shouldn't fail because of unreadable entries. if err == io.EOF && len(des) == 0 {
fis, err = fd.Readdir(1024) break
if err == io.EOF && len(fis) == 0 {
break
}
} else {
// For other OSes we read the names only (which shouldn't fail) then stat the
// individual ourselves so we can log errors but not fail the directory read.
var names []string
names, err = fd.Readdirnames(1024)
if err == io.EOF && len(names) == 0 {
break
}
if err == nil {
for _, name := range names {
namepath := filepath.Join(fsDirPath, name)
fi, fierr := os.Lstat(namepath)
if os.IsNotExist(fierr) {
// skip entry removed by a concurrent goroutine
continue
}
if fierr != nil {
// Don't report errors on any file names that are excluded
if useFilter {
newRemote := f.cleanRemote(dir, name)
if !filter.IncludeRemote(newRemote) {
continue
}
}
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
fs.Errorf(dir, "%v", fierr)
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
continue
}
fis = append(fis, fi)
}
}
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read directory entry: %w", err) return nil, fmt.Errorf("failed to read directory entry: %w", err)
} }
for _, fi := range fis { loopEntries := make(fs.DirEntries, len(des))
name := fi.Name()
mode := fi.Mode() g, gCtx := errgroup.WithContext(ctx)
newRemote := f.cleanRemote(dir, name) g.SetLimit(fs.GetConfig(ctx).Checkers)
// Follow symlinks if required for i, de := range des {
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 { i, de := i, de // https://golang.org/doc/faq#closures_and_goroutines
localPath := filepath.Join(fsDirPath, name) g.Go(func() error {
fi, err = os.Stat(localPath) // No point in continuing if context has been cancelled
// Quietly skip errors on excluded files and directories if gCtx.Err() != nil {
if err != nil && useFilter && !filter.IncludeRemote(newRemote) { return nil
continue
} }
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks var (
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err)) err error
fs.Errorf(newRemote, "Listing error: %v", err) fi os.FileInfo
err = accounting.Stats(ctx).Error(err) )
continue name := de.Name()
mode := de.Type()
namepath := filepath.Join(fsDirPath, name)
newRemote := f.cleanRemote(dir, name)
if de.IsDir() {
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode & os.ModeSymlink) != 0 {
return nil
}
} }
fi, err = de.Info()
if err != nil { if err != nil {
return nil, err if os.IsNotExist(err) {
// skip entry removed by a concurrent goroutine
return nil
}
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, err)
fs.Errorf(dir, "%v", err)
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
return nil
} }
name = fi.Name()
mode = fi.Mode() mode = fi.Mode()
namepath = filepath.Join(fsDirPath, name)
newRemote = f.cleanRemote(dir, name)
// Follow symlinks if required
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
fi, err = os.Stat(namepath)
if err != nil {
// Quietly skip errors on excluded files and directories
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
if os.IsNotExist(err) || isCircularSymlinkError(err) {
// Skip bad symlinks and circular symlinks
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
fs.Errorf(newRemote, "Listing error: %v", err)
_ = accounting.Stats(gCtx).Error(err)
return nil
}
return err
}
mode = fi.Mode()
}
// No point in continuing if context has been cancelled
if gCtx.Err() != nil {
return nil
}
if fi.IsDir() {
if f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
loopEntries[i] = d
return nil
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && mode&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
return nil
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return err
}
if fso.Storable() {
loopEntries[i] = fso
return nil
}
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
for _, entry := range loopEntries {
if entry == nil {
continue
} }
if fi.IsDir() { entries = append(entries, entry)
// Ignore directories which are symlinks. These are junction points under windows which
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
d := fs.NewDir(newRemote, fi.ModTime())
entries = append(entries, d)
}
} else {
// Check whether this link should be translated
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
newRemote += linkSuffix
}
// Don't include non directory if not included
// we leave directory filtering to the layer above
if useFilter && !filter.IncludeRemote(newRemote) {
continue
}
fso, err := f.newObjectWithInfo(newRemote, fi)
if err != nil {
return nil, err
}
if fso.Storable() {
entries = append(entries, fso)
}
}
} }
} }
return entries, nil return entries, nil
} }
@ -748,9 +777,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// Temporary Object under construction // Temporary Object under construction
dstObj := f.newObject(remote) dstObj := f.newObject(remote)
dstObj.fs.objectMetaMu.RLock() dstObj.metaMu.RLock()
dstObjMode := dstObj.mode dstObjMode := dstObj.mode
dstObj.fs.objectMetaMu.RUnlock() dstObj.metaMu.RUnlock()
// Check it is a file if it exists // Check it is a file if it exists
err := dstObj.lstat() err := dstObj.lstat()
@ -912,10 +941,10 @@ func (o *Object) Remote() string {
// Hash returns the requested hash of a file as a lowercase hex string // Hash returns the requested hash of a file as a lowercase hex string
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
// Check that the underlying file hasn't changed // Check that the underlying file hasn't changed
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
oldtime := o.modTime oldtime := o.modTime
oldsize := o.size oldsize := o.size
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
err := o.lstat() err := o.lstat()
var changed bool var changed bool
if err != nil { if err != nil {
@ -927,14 +956,14 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to stat: %w", err) return "", fmt.Errorf("hash: failed to stat: %w", err)
} }
} else { } else {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
changed = !o.modTime.Equal(oldtime) || oldsize != o.size changed = !o.modTime.Equal(oldtime) || oldsize != o.size
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
} }
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
hashValue, hashFound := o.hashes[r] hashValue, hashFound := o.hashes[r]
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
if changed || !hashFound { if changed || !hashFound {
var in io.ReadCloser var in io.ReadCloser
@ -965,28 +994,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", fmt.Errorf("hash: failed to close: %w", closeErr) return "", fmt.Errorf("hash: failed to close: %w", closeErr)
} }
hashValue = hashes[r] hashValue = hashes[r]
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
if o.hashes == nil { if o.hashes == nil {
o.hashes = hashes o.hashes = hashes
} else { } else {
o.hashes[r] = hashValue o.hashes[r] = hashValue
} }
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
return hashValue, nil return hashValue, nil
} }
// Size returns the size of an object in bytes // Size returns the size of an object in bytes
func (o *Object) Size() int64 { func (o *Object) Size() int64 {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
defer o.fs.objectMetaMu.RUnlock() defer o.metaMu.RUnlock()
return o.size return o.size
} }
// ModTime returns the modification time of the object // ModTime returns the modification time of the object
func (o *Object) ModTime(ctx context.Context) time.Time { func (o *Object) ModTime(ctx context.Context) time.Time {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
defer o.fs.objectMetaMu.RUnlock() defer o.metaMu.RUnlock()
return o.modTime return o.modTime
} }
@ -1015,9 +1044,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
// Storable returns a boolean showing if this object is storable // Storable returns a boolean showing if this object is storable
func (o *Object) Storable() bool { func (o *Object) Storable() bool {
o.fs.objectMetaMu.RLock() o.metaMu.RLock()
mode := o.mode mode := o.mode
o.fs.objectMetaMu.RUnlock() o.metaMu.RUnlock()
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks { if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
if !o.fs.opt.SkipSymlinks { if !o.fs.opt.SkipSymlinks {
fs.Logf(o, "Can't follow symlink without -L/--copy-links") fs.Logf(o, "Can't follow symlink without -L/--copy-links")
@ -1050,10 +1079,10 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
if err != nil { if err != nil {
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err) return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
} }
file.o.fs.objectMetaMu.RLock() file.o.metaMu.RLock()
oldtime := file.o.modTime oldtime := file.o.modTime
oldsize := file.o.size oldsize := file.o.size
file.o.fs.objectMetaMu.RUnlock() file.o.metaMu.RUnlock()
if oldsize != fi.Size() { if oldsize != fi.Size() {
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size())) return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
} }
@ -1075,9 +1104,9 @@ func (file *localOpenFile) Close() (err error) {
err = file.in.Close() err = file.in.Close()
if err == nil { if err == nil {
if file.hash.Size() == file.o.Size() { if file.hash.Size() == file.o.Size() {
file.o.fs.objectMetaMu.Lock() file.o.metaMu.Lock()
file.o.hashes = file.hash.Sums() file.o.hashes = file.hash.Sums()
file.o.fs.objectMetaMu.Unlock() file.o.metaMu.Unlock()
} }
} }
return err return err
@ -1269,9 +1298,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// All successful so update the hashes // All successful so update the hashes
if hasher != nil { if hasher != nil {
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.hashes = hasher.Sums() o.hashes = hasher.Sums()
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
// Set the mtime // Set the mtime
@ -1345,11 +1374,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() { if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
return return
} }
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.size = info.Size() o.size = info.Size()
o.modTime = info.ModTime() o.modTime = info.ModTime()
o.mode = info.Mode() o.mode = info.Mode()
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
// Read the size of the link. // Read the size of the link.
// //
// The value in info.Size() is not always correct // The value in info.Size() is not always correct
@ -1368,9 +1397,9 @@ func (o *Object) setMetadata(info os.FileInfo) {
// clearHashCache wipes any cached hashes for the object // clearHashCache wipes any cached hashes for the object
func (o *Object) clearHashCache() { func (o *Object) clearHashCache() {
o.fs.objectMetaMu.Lock() o.metaMu.Lock()
o.hashes = nil o.hashes = nil
o.fs.objectMetaMu.Unlock() o.metaMu.Unlock()
} }
// Stat an Object into info // Stat an Object into info