Compare commits
3 commits
tcl/master
...
feat/local
Author | SHA1 | Date | |
---|---|---|---|
|
bb45b6bb51 | ||
|
0b35ff3893 | ||
|
050c7159de |
1 changed files with 149 additions and 120 deletions
|
@ -28,13 +28,13 @@ import (
|
||||||
"github.com/rclone/rclone/lib/encoder"
|
"github.com/rclone/rclone/lib/encoder"
|
||||||
"github.com/rclone/rclone/lib/file"
|
"github.com/rclone/rclone/lib/file"
|
||||||
"github.com/rclone/rclone/lib/readers"
|
"github.com/rclone/rclone/lib/readers"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/text/unicode/norm"
|
"golang.org/x/text/unicode/norm"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Constants
|
// Constants
|
||||||
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
|
const devUnset = 0xdeadbeefcafebabe // a device id meaning it is unset
|
||||||
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
|
const linkSuffix = ".rclonelink" // The suffix added to a translated symbolic link
|
||||||
const useReadDir = (runtime.GOOS == "windows" || runtime.GOOS == "plan9") // these OSes read FileInfos directly
|
|
||||||
|
|
||||||
// Register with Fs
|
// Register with Fs
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -248,7 +248,6 @@ type Fs struct {
|
||||||
|
|
||||||
// do os.Lstat or os.Stat
|
// do os.Lstat or os.Stat
|
||||||
lstat func(name string) (os.FileInfo, error)
|
lstat func(name string) (os.FileInfo, error)
|
||||||
objectMetaMu sync.RWMutex // global lock for Object metadata
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object represents a local filesystem object
|
// Object represents a local filesystem object
|
||||||
|
@ -256,7 +255,8 @@ type Object struct {
|
||||||
fs *Fs // The Fs this object is part of
|
fs *Fs // The Fs this object is part of
|
||||||
remote string // The remote path (encoded path)
|
remote string // The remote path (encoded path)
|
||||||
path string // The local path (OS path)
|
path string // The local path (OS path)
|
||||||
// When using these items the fs.objectMetaMu must be held
|
metaMu sync.RWMutex // lock for metadata
|
||||||
|
// When using these items the metaMu must be held
|
||||||
size int64 // file metadata - always present
|
size int64 // file metadata - always present
|
||||||
mode os.FileMode
|
mode os.FileMode
|
||||||
modTime time.Time
|
modTime time.Time
|
||||||
|
@ -485,102 +485,131 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var fis []os.FileInfo
|
var des []os.DirEntry
|
||||||
if useReadDir {
|
|
||||||
// Windows and Plan9 read the directory entries with the stat information in which
|
des, err = fd.ReadDir(1024)
|
||||||
// shouldn't fail because of unreadable entries.
|
if err == io.EOF && len(des) == 0 {
|
||||||
fis, err = fd.Readdir(1024)
|
|
||||||
if err == io.EOF && len(fis) == 0 {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// For other OSes we read the names only (which shouldn't fail) then stat the
|
|
||||||
// individual ourselves so we can log errors but not fail the directory read.
|
|
||||||
var names []string
|
|
||||||
names, err = fd.Readdirnames(1024)
|
|
||||||
if err == io.EOF && len(names) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
for _, name := range names {
|
|
||||||
namepath := filepath.Join(fsDirPath, name)
|
|
||||||
fi, fierr := os.Lstat(namepath)
|
|
||||||
if os.IsNotExist(fierr) {
|
|
||||||
// skip entry removed by a concurrent goroutine
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if fierr != nil {
|
|
||||||
// Don't report errors on any file names that are excluded
|
|
||||||
if useFilter {
|
|
||||||
newRemote := f.cleanRemote(dir, name)
|
|
||||||
if !filter.IncludeRemote(newRemote) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fierr = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, fierr)
|
|
||||||
fs.Errorf(dir, "%v", fierr)
|
|
||||||
_ = accounting.Stats(ctx).Error(fserrors.NoRetryError(fierr)) // fail the sync
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fis = append(fis, fi)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read directory entry: %w", err)
|
return nil, fmt.Errorf("failed to read directory entry: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fi := range fis {
|
loopEntries := make(fs.DirEntries, len(des))
|
||||||
name := fi.Name()
|
|
||||||
mode := fi.Mode()
|
g, gCtx := errgroup.WithContext(ctx)
|
||||||
|
g.SetLimit(fs.GetConfig(ctx).Checkers)
|
||||||
|
for i, de := range des {
|
||||||
|
i, de := i, de // https://golang.org/doc/faq#closures_and_goroutines
|
||||||
|
g.Go(func() error {
|
||||||
|
// No point in continuing if context has been cancelled
|
||||||
|
if gCtx.Err() != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
fi os.FileInfo
|
||||||
|
)
|
||||||
|
name := de.Name()
|
||||||
|
mode := de.Type()
|
||||||
|
namepath := filepath.Join(fsDirPath, name)
|
||||||
newRemote := f.cleanRemote(dir, name)
|
newRemote := f.cleanRemote(dir, name)
|
||||||
|
|
||||||
|
if de.IsDir() {
|
||||||
|
// Ignore directories which are symlinks. These are junction points under windows which
|
||||||
|
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
|
||||||
|
if (mode & os.ModeSymlink) != 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fi, err = de.Info()
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// skip entry removed by a concurrent goroutine
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err = fmt.Errorf("failed to get info about directory entry %q: %w", namepath, err)
|
||||||
|
fs.Errorf(dir, "%v", err)
|
||||||
|
_ = accounting.Stats(gCtx).Error(fserrors.NoRetryError(err)) // fail the sync
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
name = fi.Name()
|
||||||
|
mode = fi.Mode()
|
||||||
|
namepath = filepath.Join(fsDirPath, name)
|
||||||
|
newRemote = f.cleanRemote(dir, name)
|
||||||
|
|
||||||
// Follow symlinks if required
|
// Follow symlinks if required
|
||||||
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
|
if f.opt.FollowSymlinks && (mode&os.ModeSymlink) != 0 {
|
||||||
localPath := filepath.Join(fsDirPath, name)
|
fi, err = os.Stat(namepath)
|
||||||
fi, err = os.Stat(localPath)
|
if err != nil {
|
||||||
// Quietly skip errors on excluded files and directories
|
// Quietly skip errors on excluded files and directories
|
||||||
if err != nil && useFilter && !filter.IncludeRemote(newRemote) {
|
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
if os.IsNotExist(err) || isCircularSymlinkError(err) {
|
if os.IsNotExist(err) || isCircularSymlinkError(err) {
|
||||||
// Skip bad symlinks and circular symlinks
|
// Skip bad symlinks and circular symlinks
|
||||||
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
|
err = fserrors.NoRetryError(fmt.Errorf("symlink: %w", err))
|
||||||
fs.Errorf(newRemote, "Listing error: %v", err)
|
fs.Errorf(newRemote, "Listing error: %v", err)
|
||||||
err = accounting.Stats(ctx).Error(err)
|
_ = accounting.Stats(gCtx).Error(err)
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
return err
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
mode = fi.Mode()
|
mode = fi.Mode()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No point in continuing if context has been cancelled
|
||||||
|
if gCtx.Err() != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if fi.IsDir() {
|
if fi.IsDir() {
|
||||||
// Ignore directories which are symlinks. These are junction points under windows which
|
if f.dev == readDevice(fi, f.opt.OneFileSystem) {
|
||||||
// are kind of a souped up symlink. Unix doesn't have directories which are symlinks.
|
|
||||||
if (mode&os.ModeSymlink) == 0 && f.dev == readDevice(fi, f.opt.OneFileSystem) {
|
|
||||||
d := fs.NewDir(newRemote, fi.ModTime())
|
d := fs.NewDir(newRemote, fi.ModTime())
|
||||||
entries = append(entries, d)
|
loopEntries[i] = d
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Check whether this link should be translated
|
// Check whether this link should be translated
|
||||||
if f.opt.TranslateSymlinks && fi.Mode()&os.ModeSymlink != 0 {
|
if f.opt.TranslateSymlinks && mode&os.ModeSymlink != 0 {
|
||||||
newRemote += linkSuffix
|
newRemote += linkSuffix
|
||||||
}
|
}
|
||||||
// Don't include non directory if not included
|
// Don't include non directory if not included
|
||||||
// we leave directory filtering to the layer above
|
// we leave directory filtering to the layer above
|
||||||
if useFilter && !filter.IncludeRemote(newRemote) {
|
if useFilter && !filter.IncludeRemote(newRemote) {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
fso, err := f.newObjectWithInfo(newRemote, fi)
|
fso, err := f.newObjectWithInfo(newRemote, fi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
if fso.Storable() {
|
if fso.Storable() {
|
||||||
entries = append(entries, fso)
|
loopEntries[i] = fso
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range loopEntries {
|
||||||
|
if entry == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
entries = append(entries, entry)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -748,9 +777,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
|
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
dstObj := f.newObject(remote)
|
dstObj := f.newObject(remote)
|
||||||
dstObj.fs.objectMetaMu.RLock()
|
dstObj.metaMu.RLock()
|
||||||
dstObjMode := dstObj.mode
|
dstObjMode := dstObj.mode
|
||||||
dstObj.fs.objectMetaMu.RUnlock()
|
dstObj.metaMu.RUnlock()
|
||||||
|
|
||||||
// Check it is a file if it exists
|
// Check it is a file if it exists
|
||||||
err := dstObj.lstat()
|
err := dstObj.lstat()
|
||||||
|
@ -912,10 +941,10 @@ func (o *Object) Remote() string {
|
||||||
// Hash returns the requested hash of a file as a lowercase hex string
|
// Hash returns the requested hash of a file as a lowercase hex string
|
||||||
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
||||||
// Check that the underlying file hasn't changed
|
// Check that the underlying file hasn't changed
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
oldtime := o.modTime
|
oldtime := o.modTime
|
||||||
oldsize := o.size
|
oldsize := o.size
|
||||||
o.fs.objectMetaMu.RUnlock()
|
o.metaMu.RUnlock()
|
||||||
err := o.lstat()
|
err := o.lstat()
|
||||||
var changed bool
|
var changed bool
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -927,14 +956,14 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
||||||
return "", fmt.Errorf("hash: failed to stat: %w", err)
|
return "", fmt.Errorf("hash: failed to stat: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
changed = !o.modTime.Equal(oldtime) || oldsize != o.size
|
changed = !o.modTime.Equal(oldtime) || oldsize != o.size
|
||||||
o.fs.objectMetaMu.RUnlock()
|
o.metaMu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
hashValue, hashFound := o.hashes[r]
|
hashValue, hashFound := o.hashes[r]
|
||||||
o.fs.objectMetaMu.RUnlock()
|
o.metaMu.RUnlock()
|
||||||
|
|
||||||
if changed || !hashFound {
|
if changed || !hashFound {
|
||||||
var in io.ReadCloser
|
var in io.ReadCloser
|
||||||
|
@ -965,28 +994,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
||||||
return "", fmt.Errorf("hash: failed to close: %w", closeErr)
|
return "", fmt.Errorf("hash: failed to close: %w", closeErr)
|
||||||
}
|
}
|
||||||
hashValue = hashes[r]
|
hashValue = hashes[r]
|
||||||
o.fs.objectMetaMu.Lock()
|
o.metaMu.Lock()
|
||||||
if o.hashes == nil {
|
if o.hashes == nil {
|
||||||
o.hashes = hashes
|
o.hashes = hashes
|
||||||
} else {
|
} else {
|
||||||
o.hashes[r] = hashValue
|
o.hashes[r] = hashValue
|
||||||
}
|
}
|
||||||
o.fs.objectMetaMu.Unlock()
|
o.metaMu.Unlock()
|
||||||
}
|
}
|
||||||
return hashValue, nil
|
return hashValue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of an object in bytes
|
// Size returns the size of an object in bytes
|
||||||
func (o *Object) Size() int64 {
|
func (o *Object) Size() int64 {
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
defer o.fs.objectMetaMu.RUnlock()
|
defer o.metaMu.RUnlock()
|
||||||
return o.size
|
return o.size
|
||||||
}
|
}
|
||||||
|
|
||||||
// ModTime returns the modification time of the object
|
// ModTime returns the modification time of the object
|
||||||
func (o *Object) ModTime(ctx context.Context) time.Time {
|
func (o *Object) ModTime(ctx context.Context) time.Time {
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
defer o.fs.objectMetaMu.RUnlock()
|
defer o.metaMu.RUnlock()
|
||||||
return o.modTime
|
return o.modTime
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1015,9 +1044,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
|
|
||||||
// Storable returns a boolean showing if this object is storable
|
// Storable returns a boolean showing if this object is storable
|
||||||
func (o *Object) Storable() bool {
|
func (o *Object) Storable() bool {
|
||||||
o.fs.objectMetaMu.RLock()
|
o.metaMu.RLock()
|
||||||
mode := o.mode
|
mode := o.mode
|
||||||
o.fs.objectMetaMu.RUnlock()
|
o.metaMu.RUnlock()
|
||||||
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
|
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
|
||||||
if !o.fs.opt.SkipSymlinks {
|
if !o.fs.opt.SkipSymlinks {
|
||||||
fs.Logf(o, "Can't follow symlink without -L/--copy-links")
|
fs.Logf(o, "Can't follow symlink without -L/--copy-links")
|
||||||
|
@ -1050,10 +1079,10 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
|
return 0, fmt.Errorf("can't read status of source file while transferring: %w", err)
|
||||||
}
|
}
|
||||||
file.o.fs.objectMetaMu.RLock()
|
file.o.metaMu.RLock()
|
||||||
oldtime := file.o.modTime
|
oldtime := file.o.modTime
|
||||||
oldsize := file.o.size
|
oldsize := file.o.size
|
||||||
file.o.fs.objectMetaMu.RUnlock()
|
file.o.metaMu.RUnlock()
|
||||||
if oldsize != fi.Size() {
|
if oldsize != fi.Size() {
|
||||||
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
|
return 0, fserrors.NoLowLevelRetryError(fmt.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
|
||||||
}
|
}
|
||||||
|
@ -1075,9 +1104,9 @@ func (file *localOpenFile) Close() (err error) {
|
||||||
err = file.in.Close()
|
err = file.in.Close()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if file.hash.Size() == file.o.Size() {
|
if file.hash.Size() == file.o.Size() {
|
||||||
file.o.fs.objectMetaMu.Lock()
|
file.o.metaMu.Lock()
|
||||||
file.o.hashes = file.hash.Sums()
|
file.o.hashes = file.hash.Sums()
|
||||||
file.o.fs.objectMetaMu.Unlock()
|
file.o.metaMu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -1269,9 +1298,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
|
|
||||||
// All successful so update the hashes
|
// All successful so update the hashes
|
||||||
if hasher != nil {
|
if hasher != nil {
|
||||||
o.fs.objectMetaMu.Lock()
|
o.metaMu.Lock()
|
||||||
o.hashes = hasher.Sums()
|
o.hashes = hasher.Sums()
|
||||||
o.fs.objectMetaMu.Unlock()
|
o.metaMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the mtime
|
// Set the mtime
|
||||||
|
@ -1345,11 +1374,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
|
||||||
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
|
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
o.fs.objectMetaMu.Lock()
|
o.metaMu.Lock()
|
||||||
o.size = info.Size()
|
o.size = info.Size()
|
||||||
o.modTime = info.ModTime()
|
o.modTime = info.ModTime()
|
||||||
o.mode = info.Mode()
|
o.mode = info.Mode()
|
||||||
o.fs.objectMetaMu.Unlock()
|
o.metaMu.Unlock()
|
||||||
// Read the size of the link.
|
// Read the size of the link.
|
||||||
//
|
//
|
||||||
// The value in info.Size() is not always correct
|
// The value in info.Size() is not always correct
|
||||||
|
@ -1368,9 +1397,9 @@ func (o *Object) setMetadata(info os.FileInfo) {
|
||||||
|
|
||||||
// clearHashCache wipes any cached hashes for the object
|
// clearHashCache wipes any cached hashes for the object
|
||||||
func (o *Object) clearHashCache() {
|
func (o *Object) clearHashCache() {
|
||||||
o.fs.objectMetaMu.Lock()
|
o.metaMu.Lock()
|
||||||
o.hashes = nil
|
o.hashes = nil
|
||||||
o.fs.objectMetaMu.Unlock()
|
o.metaMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat an Object into info
|
// Stat an Object into info
|
||||||
|
|
Loading…
Add table
Reference in a new issue