Compare commits
3 commits
tcl/master
...
feat/local
Author | SHA1 | Date | |
---|---|---|---|
|
bb45b6bb51 | ||
|
0b35ff3893 | ||
|
050c7159de |
1 changed files with 149 additions and 120 deletions
|
@ -28,13 +28,13 @@ import (
|
|||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/file"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/text/unicode/norm"
|
||||
)
|
||||
|
||||
// Constants
|
||||
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
|
||||
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
|
||||
const useReadDir = (runtime.GOOS == "windows" || runtime.GOOS == "plan9") // these OSes read FileInfos directly
|
||||
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
|
||||
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
|
||||
|
||||
// Register with Fs
|
||||
func init() {
|
||||
|
@ -200,9 +200,9 @@ cause disk fragmentation and can be slow to work with.`,
|
|||
Help: `Disable setting modtime.
|
||||
|
||||
Normally rclone updates modification time of files after they are done
|
||||
uploading. This can cause permissions issues on Linux platforms when
|
||||
uploading. This can cause permissions issues on Linux platforms when
|
||||
the user rclone is running as does not own the file uploaded, such as
|
||||
when copying to a CIFS mount owned by another user. If this option is
|
||||
when copying to a CIFS mount owned by another user. If this option is
|
||||
enabled, rclone will no longer update the modtime after copying a file.`,
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
|
@ -247,16 +247,16 @@ type Fs struct {
|
|||
xattrSupported atomic.Int32 // whether xattrs are supported
|
||||
|
||||
// do os.Lstat or os.Stat
|
||||
lstat func(name string) (os.FileInfo, error)
|
||||
objectMetaMu sync.RWMutex // global lock for Object metadata
|
||||
lstat func(name string) (os.FileInfo, error)
|
||||
}
|
||||
|
||||
// Object represents a local filesystem object
|
||||
type Object struct {
|
||||
fs *Fs // The Fs this object is part of
|
||||
remote string // The remote path (encoded path)
|
||||
path string // The local path (OS path)
|
||||
// When using these items the fs.objectMetaMu must be held
|
||||
fs *Fs // The Fs this object is part of
|
||||
remote string // The remote path (encoded path)
|
||||
path string // The local path (OS path)
|
||||
metaMu sync.RWMutex // lock for metadata
|
||||
// When using these items the metaMu must be held
|
||||
size int64 // file metadata - always present
|
||||
mode os.FileMode
|
||||
modTime time.Time
|
||||
|
@ -485,102 +485,131 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
}()
|
||||
|
||||
for {
|
||||
var fis []os.FileInfo
|
||||
if useReadDir {
|
||||
// Windows and Plan9 read the directory entries with the stat information in which
|
||||
// shouldn't fail because of unreadable entries.
|
||||
fis, err = fd.Readdir(1024)
|
||||
if err == io.EOF && len(fis) == 0 {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
// For other OSes we read the names only (which shouldn't fail) then stat the
|
||||
// individual ourselves so we can log errors but not fail the directory read.
|
||||
var names []string
|
||||
names, err = fd.Readdirnames(1024)
|
||||
if err == io.EOF && len(names) == 0 {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
for _, name := range names {
|
||||
namepath := filepath.Join(fsDirPath, name)
|
||||
fi, fierr := os.Lstat(namepath)
|
||||
if os.IsNotExist(fierr) {
|
||||
// skip entry removed by a concurrent goroutine
|
||||
continue
|
||||
}
|
||||
if fierr != nil {
|
||||
// Don't report errors on any file names that are excluded
|
||||
if useFilter {
|
||||
newRemote := f.cleanRemote(dir, name)
|
||||
if !filter.IncludeRemote(newRemote) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
|
||||
fs.Errorf(dir, "%v", fierr)
|
||||
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
|
||||
continue
|
||||
}
|
||||
fis = append(fis, fi)
|
||||
}
|
||||
}
|
||||
var des []os.DirEntry
|
||||
|
||||
des, err = fd.ReadDir(1024)
|
||||
if err == io.EOF && len(des) == 0 {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read directory entry: %w", err)
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
name := fi.Name()
|
||||
mode := fi.Mode()
|
||||
newRemote := f.cleanRemote(dir, name)
|
||||
// Follow symlinks if required
|
||||
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
|
||||
localPath := filepath.Join(fsDirPath, name)
|
||||
fi, err = os.Stat(localPath)
|
||||
// Quietly skip errors on excluded files and directories
|
||||
if err != nil && useFilter && !filter.IncludeRemote(newRemote) {
|
||||
continue
|
||||
loopEntries := make(fs.DirEntries, len(des))
|
||||
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
g.SetLimit(fs.GetConfig(ctx).Checkers)
|
||||
for i, de := range des {
|
||||
i, de := i, de // https://golang.org/doc/faq#closures_and_goroutines
|
||||
g.Go(func() error {
|
||||
// No point in continuing if context has been cancelled
|
||||
if gCtx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
if os.IsNotExist(err) || isCircularSymlinkError(err) {
|
||||
// Skip bad symlinks and circular symlinks
|
||||
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
|
||||
fs.Errorf(newRemote, "Listing error: %v", err)
|
||||
err = accounting.Stats(ctx).Error(err)
|
||||
continue
|
||||
|
||||
var (
|
||||
err error
|
||||
fi os.FileInfo
|
||||
)
|
||||
name := de.Name()
|
||||
mode := de.Type()
|
||||
namepath := filepath.Join(fsDirPath, name)
|
||||
newRemote := f.cleanRemote(dir, name)
|
||||
|
||||
if de.IsDir() {
|
||||
// Ignore directories which are symlinks. These are junction points under windows which
|
||||
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
|
||||
if (mode & os.ModeSymlink) != 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
fi, err = de.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if os.IsNotExist(err) {
|
||||
// skip entry removed by a concurrent goroutine
|
||||
return nil
|
||||
}
|
||||
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||
return nil
|
||||
}
|
||||
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, err)
|
||||
fs.Errorf(dir, "%v", err)
|
||||
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
|
||||
return nil
|
||||
}
|
||||
|
||||
name = fi.Name()
|
||||
mode = fi.Mode()
|
||||
namepath = filepath.Join(fsDirPath, name)
|
||||
newRemote = f.cleanRemote(dir, name)
|
||||
|
||||
// Follow symlinks if required
|
||||
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
|
||||
fi, err = os.Stat(namepath)
|
||||
if err != nil {
|
||||
// Quietly skip errors on excluded files and directories
|
||||
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||
return nil
|
||||
}
|
||||
if os.IsNotExist(err) || isCircularSymlinkError(err) {
|
||||
// Skip bad symlinks and circular symlinks
|
||||
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
|
||||
fs.Errorf(newRemote, "Listing error: %v", err)
|
||||
_ = accounting.Stats(gCtx).Error(err)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
mode = fi.Mode()
|
||||
}
|
||||
|
||||
// No point in continuing if context has been cancelled
|
||||
if gCtx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if fi.IsDir() {
|
||||
if f.dev == readDevice(fi, f.opt.OneFileSystem) {
|
||||
d := fs.NewDir(newRemote, fi.ModTime())
|
||||
loopEntries[i] = d
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
// Check whether this link should be translated
|
||||
if f.opt.TranslateSymlinks && mode&os.ModeSymlink != 0 {
|
||||
newRemote += linkSuffix
|
||||
}
|
||||
// Don't include non directory if not included
|
||||
// we leave directory filtering to the layer above
|
||||
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||
return nil
|
||||
}
|
||||
fso, err := f.newObjectWithInfo(newRemote, fi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fso.Storable() {
|
||||
loopEntries[i] = fso
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, entry := range loopEntries {
|
||||
if entry == nil {
|
||||
continue
|
||||
}
|
||||
if fi.IsDir() {
|
||||
// Ignore directories which are symlinks. These are junction points under windows which
|
||||
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
|
||||
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
|
||||
d := fs.NewDir(newRemote, fi.ModTime())
|
||||
entries = append(entries, d)
|
||||
}
|
||||
} else {
|
||||
// Check whether this link should be translated
|
||||
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
|
||||
newRemote += linkSuffix
|
||||
}
|
||||
// Don't include non directory if not included
|
||||
// we leave directory filtering to the layer above
|
||||
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||
continue
|
||||
}
|
||||
fso, err := f.newObjectWithInfo(newRemote, fi)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if fso.Storable() {
|
||||
entries = append(entries, fso)
|
||||
}
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
|
@ -748,9 +777,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||
|
||||
// Temporary Object under construction
|
||||
dstObj := f.newObject(remote)
|
||||
dstObj.fs.objectMetaMu.RLock()
|
||||
dstObj.metaMu.RLock()
|
||||
dstObjMode := dstObj.mode
|
||||
dstObj.fs.objectMetaMu.RUnlock()
|
||||
dstObj.metaMu.RUnlock()
|
||||
|
||||
// Check it is a file if it exists
|
||||
err := dstObj.lstat()
|
||||
|
@ -912,10 +941,10 @@ func (o *Object) Remote() string {
|
|||
// Hash returns the requested hash of a file as a lowercase hex string
|
||||
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
||||
// Check that the underlying file hasn't changed
|
||||
o.fs.objectMetaMu.RLock()
|
||||
o.metaMu.RLock()
|
||||
oldtime := o.modTime
|
||||
oldsize := o.size
|
||||
o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RUnlock()
|
||||
err := o.lstat()
|
||||
var changed bool
|
||||
if err != nil {
|
||||
|
@ -927,14 +956,14 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
|||
return "", fmt.Errorf("hash: failed to stat: %w", err)
|
||||
}
|
||||
} else {
|
||||
o.fs.objectMetaMu.RLock()
|
||||
o.metaMu.RLock()
|
||||
changed = !o.modTime.Equal(oldtime) || oldsize != o.size
|
||||
o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RUnlock()
|
||||
}
|
||||
|
||||
o.fs.objectMetaMu.RLock()
|
||||
o.metaMu.RLock()
|
||||
hashValue, hashFound := o.hashes[r]
|
||||
o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RUnlock()
|
||||
|
||||
if changed || !hashFound {
|
||||
var in io.ReadCloser
|
||||
|
@ -965,28 +994,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
|||
return "", fmt.Errorf("hash: failed to close: %w", closeErr)
|
||||
}
|
||||
hashValue = hashes[r]
|
||||
o.fs.objectMetaMu.Lock()
|
||||
o.metaMu.Lock()
|
||||
if o.hashes == nil {
|
||||
o.hashes = hashes
|
||||
} else {
|
||||
o.hashes[r] = hashValue
|
||||
}
|
||||
o.fs.objectMetaMu.Unlock()
|
||||
o.metaMu.Unlock()
|
||||
}
|
||||
return hashValue, nil
|
||||
}
|
||||
|
||||
// Size returns the size of an object in bytes
|
||||
func (o *Object) Size() int64 {
|
||||
o.fs.objectMetaMu.RLock()
|
||||
defer o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RLock()
|
||||
defer o.metaMu.RUnlock()
|
||||
return o.size
|
||||
}
|
||||
|
||||
// ModTime returns the modification time of the object
|
||||
func (o *Object) ModTime(ctx context.Context) time.Time {
|
||||
o.fs.objectMetaMu.RLock()
|
||||
defer o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RLock()
|
||||
defer o.metaMu.RUnlock()
|
||||
return o.modTime
|
||||
}
|
||||
|
||||
|
@ -1015,9 +1044,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|||
|
||||
// Storable returns a boolean showing if this object is storable
|
||||
func (o *Object) Storable() bool {
|
||||
o.fs.objectMetaMu.RLock()
|
||||
o.metaMu.RLock()
|
||||
mode := o.mode
|
||||
o.fs.objectMetaMu.RUnlock()
|
||||
o.metaMu.RUnlock()
|
||||
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
|
||||
if !o.fs.opt.SkipSymlinks {
|
||||
fs.Logf(o, "Can't follow symlink without -L/--copy-links")
|
||||
|
@ -1050,10 +1079,10 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
|
|||
if err != nil {
|
||||
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
|
||||
}
|
||||
file.o.fs.objectMetaMu.RLock()
|
||||
file.o.metaMu.RLock()
|
||||
oldtime := file.o.modTime
|
||||
oldsize := file.o.size
|
||||
file.o.fs.objectMetaMu.RUnlock()
|
||||
file.o.metaMu.RUnlock()
|
||||
if oldsize != fi.Size() {
|
||||
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
|
||||
}
|
||||
|
@ -1075,9 +1104,9 @@ func (file *localOpenFile) Close() (err error) {
|
|||
err = file.in.Close()
|
||||
if err == nil {
|
||||
if file.hash.Size() == file.o.Size() {
|
||||
file.o.fs.objectMetaMu.Lock()
|
||||
file.o.metaMu.Lock()
|
||||
file.o.hashes = file.hash.Sums()
|
||||
file.o.fs.objectMetaMu.Unlock()
|
||||
file.o.metaMu.Unlock()
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
@ -1269,9 +1298,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
|
||||
// All successful so update the hashes
|
||||
if hasher != nil {
|
||||
o.fs.objectMetaMu.Lock()
|
||||
o.metaMu.Lock()
|
||||
o.hashes = hasher.Sums()
|
||||
o.fs.objectMetaMu.Unlock()
|
||||
o.metaMu.Unlock()
|
||||
}
|
||||
|
||||
// Set the mtime
|
||||
|
@ -1345,11 +1374,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
|
|||
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
|
||||
return
|
||||
}
|
||||
o.fs.objectMetaMu.Lock()
|
||||
o.metaMu.Lock()
|
||||
o.size = info.Size()
|
||||
o.modTime = info.ModTime()
|
||||
o.mode = info.Mode()
|
||||
o.fs.objectMetaMu.Unlock()
|
||||
o.metaMu.Unlock()
|
||||
// Read the size of the link.
|
||||
//
|
||||
// The value in info.Size() is not always correct
|
||||
|
@ -1368,9 +1397,9 @@ func (o *Object) setMetadata(info os.FileInfo) {
|
|||
|
||||
// clearHashCache wipes any cached hashes for the object
|
||||
func (o *Object) clearHashCache() {
|
||||
o.fs.objectMetaMu.Lock()
|
||||
o.metaMu.Lock()
|
||||
o.hashes = nil
|
||||
o.fs.objectMetaMu.Unlock()
|
||||
o.metaMu.Unlock()
|
||||
}
|
||||
|
||||
// Stat an Object into info
|
||||
|
|
Loading…
Add table
Reference in a new issue