diff --git a/cmd/cmount/fs.go b/cmd/cmount/fs.go index 32a6b8c21..be41038e4 100644 --- a/cmd/cmount/fs.go +++ b/cmd/cmount/fs.go @@ -5,6 +5,7 @@ package cmount import ( + "context" "io" "os" "path" @@ -18,9 +19,11 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/log" "github.com/rclone/rclone/vfs" + "golang.org/x/time/rate" ) const fhUnset = ^uint64(0) +const tpsBurst = 100 // FS represents the top level filing system type FS struct { @@ -29,19 +32,31 @@ type FS struct { ready chan (struct{}) mu sync.Mutex // to protect the below handles []vfs.Handle - destroyed int32 // read/write with sync/atomic + destroyed int32 // read/write with sync/atomic + tps *rate.Limiter // for limiting number of transactions per second } // NewFS makes a new FS -func NewFS(VFS *vfs.VFS) *FS { +func NewFS(VFS *vfs.VFS, opt *mountlib.Options) *FS { fsys := &FS{ VFS: VFS, f: VFS.Fs(), ready: make(chan (struct{})), } + if opt.TPSLimit > 0 { + fsys.tps = rate.NewLimiter(rate.Limit(opt.TPSLimit), tpsBurst) + fs.Infof(nil, "Starting mount transaction limiter: max %g transactions/s with burst %d", opt.TPSLimit, tpsBurst) + } return fsys } +// Limit the number of transactions per second +func (fsys *FS) rateLimit() { + if fsys.tps != nil { + _ = fsys.tps.Wait(context.Background()) + } +} + // Open a handle returning an integer file handle func (fsys *FS) openHandle(handle vfs.Handle) (fh uint64) { fsys.mu.Lock() @@ -195,6 +210,7 @@ func (fsys *FS) Destroy() { // Getattr reads the attributes for path func (fsys *FS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) { defer log.Trace(path, "fh=0x%X", fh)("errc=%v", &errc) + fsys.rateLimit() node, _, errc := fsys.getNode(path, fh) if errc == 0 { errc = fsys.stat(node, stat) @@ -205,6 +221,7 @@ func (fsys *FS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) { // Opendir opens path as a directory func (fsys *FS) Opendir(path string) (errc int, fh uint64) { defer log.Trace(path, "")("errc=%d, fh=0x%X", &errc, &fh) + fsys.rateLimit() handle, err := fsys.VFS.OpenFile(path, os.O_RDONLY, 0777) if err != nil { return translateError(err), fhUnset @@ -219,6 +236,7 @@ func (fsys *FS) Readdir(dirPath string, fh uint64) (errc int) { itemsRead := -1 defer log.Trace(dirPath, "ofst=%d, fh=0x%X", ofst, fh)("items=%d, errc=%d", &itemsRead, &errc) + fsys.rateLimit() dir, errc := fsys.lookupDir(dirPath) if errc != 0 { @@ -270,12 +288,14 @@ func (fsys *FS) Readdir(dirPath string, // Releasedir finished reading the directory func (fsys *FS) Releasedir(path string, fh uint64) (errc int) { defer log.Trace(path, "fh=0x%X", fh)("errc=%d", &errc) + fsys.rateLimit() return fsys.closeHandle(fh) } // Statfs reads overall stats on the filesystem func (fsys *FS) Statfs(path string, stat *fuse.Statfs_t) (errc int) { defer log.Trace(path, "")("stat=%+v, errc=%d", stat, &errc) + fsys.rateLimit() const blockSize = 4096 total, _, free := fsys.VFS.Statfs() stat.Blocks = uint64(total) / blockSize // Total data blocks in file system. @@ -295,6 +315,7 @@ func (fsys *FS) Statfs(path string, stat *fuse.Statfs_t) (errc int) { // OpenEx opens a file func (fsys *FS) OpenEx(path string, fi *fuse.FileInfo_t) (errc int) { defer log.Trace(path, "flags=0x%X", fi.Flags)("errc=%d, fh=0x%X", &errc, &fi.Fh) + fsys.rateLimit() fi.Fh = fhUnset // translate the fuse flags to os flags @@ -325,6 +346,7 @@ func (fsys *FS) Open(path string, flags int) (errc int, fh uint64) { // CreateEx creates and opens a file. func (fsys *FS) CreateEx(filePath string, mode uint32, fi *fuse.FileInfo_t) (errc int) { defer log.Trace(filePath, "flags=0x%X, mode=0%o", fi.Flags, mode)("errc=%d, fh=0x%X", &errc, &fi.Fh) + fsys.rateLimit() fi.Fh = fhUnset leaf, parentDir, errc := fsys.lookupParentDir(filePath) if errc != 0 { @@ -356,6 +378,7 @@ func (fsys *FS) Create(filePath string, flags int, mode uint32) (errc int, fh ui // Truncate truncates a file to size func (fsys *FS) Truncate(path string, size int64, fh uint64) (errc int) { defer log.Trace(path, "size=%d, fh=0x%X", size, fh)("errc=%d", &errc) + fsys.rateLimit() node, handle, errc := fsys.getNode(path, fh) if errc != 0 { return errc @@ -375,6 +398,7 @@ func (fsys *FS) Truncate(path string, size int64, fh uint64) (errc int) { // Read data from file handle func (fsys *FS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { defer log.Trace(path, "ofst=%d, fh=0x%X", ofst, fh)("n=%d", &n) + fsys.rateLimit() handle, errc := fsys.getHandle(fh) if errc != 0 { return errc @@ -390,6 +414,7 @@ func (fsys *FS) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { // Write data to file handle func (fsys *FS) Write(path string, buff []byte, ofst int64, fh uint64) (n int) { defer log.Trace(path, "ofst=%d, fh=0x%X", ofst, fh)("n=%d", &n) + fsys.rateLimit() handle, errc := fsys.getHandle(fh) if errc != 0 { return errc @@ -404,6 +429,7 @@ func (fsys *FS) Write(path string, buff []byte, ofst int64, fh uint64) (n int) { // Flush flushes an open file descriptor or path func (fsys *FS) Flush(path string, fh uint64) (errc int) { defer log.Trace(path, "fh=0x%X", fh)("errc=%d", &errc) + fsys.rateLimit() handle, errc := fsys.getHandle(fh) if errc != 0 { return errc @@ -414,6 +440,7 @@ func (fsys *FS) Flush(path string, fh uint64) (errc int) { // Release closes the file if still open func (fsys *FS) Release(path string, fh uint64) (errc int) { defer log.Trace(path, "fh=0x%X", fh)("errc=%d", &errc) + fsys.rateLimit() handle, errc := fsys.getHandle(fh) if errc != 0 { return errc @@ -425,6 +452,7 @@ func (fsys *FS) Release(path string, fh uint64) (errc int) { // Unlink removes a file. func (fsys *FS) Unlink(filePath string) (errc int) { defer log.Trace(filePath, "")("errc=%d", &errc) + fsys.rateLimit() leaf, parentDir, errc := fsys.lookupParentDir(filePath) if errc != 0 { return errc @@ -435,6 +463,7 @@ func (fsys *FS) Unlink(filePath string) (errc int) { // Mkdir creates a directory. func (fsys *FS) Mkdir(dirPath string, mode uint32) (errc int) { defer log.Trace(dirPath, "mode=0%o", mode)("errc=%d", &errc) + fsys.rateLimit() leaf, parentDir, errc := fsys.lookupParentDir(dirPath) if errc != 0 { return errc @@ -446,6 +475,7 @@ func (fsys *FS) Mkdir(dirPath string, mode uint32) (errc int) { // Rmdir removes a directory func (fsys *FS) Rmdir(dirPath string) (errc int) { defer log.Trace(dirPath, "")("errc=%d", &errc) + fsys.rateLimit() leaf, parentDir, errc := fsys.lookupParentDir(dirPath) if errc != 0 { return errc @@ -456,6 +486,7 @@ func (fsys *FS) Rmdir(dirPath string) (errc int) { // Rename renames a file. func (fsys *FS) Rename(oldPath string, newPath string) (errc int) { defer log.Trace(oldPath, "newPath=%q", newPath)("errc=%d", &errc) + fsys.rateLimit() return translateError(fsys.VFS.Rename(oldPath, newPath)) } @@ -467,6 +498,7 @@ var invalidDateCutoff = time.Date(1601, 1, 2, 0, 0, 0, 0, time.UTC) // Utimens changes the access and modification times of a file. func (fsys *FS) Utimens(path string, tmsp []fuse.Timespec) (errc int) { defer log.Trace(path, "tmsp=%+v", tmsp)("errc=%d", &errc) + fsys.rateLimit() node, errc := fsys.lookupNode(path) if errc != 0 { return errc @@ -487,12 +519,14 @@ func (fsys *FS) Utimens(path string, tmsp []fuse.Timespec) (errc int) { // Mknod creates a file node. func (fsys *FS) Mknod(path string, mode uint32, dev uint64) (errc int) { defer log.Trace(path, "mode=0x%X, dev=0x%X", mode, dev)("errc=%d", &errc) + fsys.rateLimit() return -fuse.ENOSYS } // Fsync synchronizes file contents. func (fsys *FS) Fsync(path string, datasync bool, fh uint64) (errc int) { defer log.Trace(path, "datasync=%v, fh=0x%X", datasync, fh)("errc=%d", &errc) + fsys.rateLimit() // This is a no-op for rclone return 0 } @@ -500,24 +534,28 @@ func (fsys *FS) Fsync(path string, datasync bool, fh uint64) (errc int) { // Link creates a hard link to a file. func (fsys *FS) Link(oldpath string, newpath string) (errc int) { defer log.Trace(oldpath, "newpath=%q", newpath)("errc=%d", &errc) + fsys.rateLimit() return -fuse.ENOSYS } // Symlink creates a symbolic link. func (fsys *FS) Symlink(target string, newpath string) (errc int) { defer log.Trace(target, "newpath=%q", newpath)("errc=%d", &errc) + fsys.rateLimit() return -fuse.ENOSYS } // Readlink reads the target of a symbolic link. func (fsys *FS) Readlink(path string) (errc int, linkPath string) { defer log.Trace(path, "")("linkPath=%q, errc=%d", &linkPath, &errc) + fsys.rateLimit() return -fuse.ENOSYS, "" } // Chmod changes the permission bits of a file. func (fsys *FS) Chmod(path string, mode uint32) (errc int) { defer log.Trace(path, "mode=0%o", mode)("errc=%d", &errc) + fsys.rateLimit() // This is a no-op for rclone return 0 } @@ -525,6 +563,7 @@ func (fsys *FS) Chmod(path string, mode uint32) (errc int) { // Chown changes the owner and group of a file. func (fsys *FS) Chown(path string, uid uint32, gid uint32) (errc int) { defer log.Trace(path, "uid=%d, gid=%d", uid, gid)("errc=%d", &errc) + fsys.rateLimit() // This is a no-op for rclone return 0 } @@ -532,6 +571,7 @@ func (fsys *FS) Chown(path string, uid uint32, gid uint32) (errc int) { // Access checks file access permissions. func (fsys *FS) Access(path string, mask uint32) (errc int) { defer log.Trace(path, "mask=0%o", mask)("errc=%d", &errc) + fsys.rateLimit() // This is a no-op for rclone return 0 } @@ -539,27 +579,32 @@ func (fsys *FS) Access(path string, mask uint32) (errc int) { // Fsyncdir synchronizes directory contents. func (fsys *FS) Fsyncdir(path string, datasync bool, fh uint64) (errc int) { defer log.Trace(path, "datasync=%v, fh=0x%X", datasync, fh)("errc=%d", &errc) + fsys.rateLimit() // This is a no-op for rclone return 0 } // Setxattr sets extended attributes. func (fsys *FS) Setxattr(path string, name string, value []byte, flags int) (errc int) { + fsys.rateLimit() return -fuse.ENOSYS } // Getxattr gets extended attributes. func (fsys *FS) Getxattr(path string, name string) (errc int, value []byte) { + fsys.rateLimit() return -fuse.ENOSYS, nil } // Removexattr removes extended attributes. func (fsys *FS) Removexattr(path string, name string) (errc int) { + fsys.rateLimit() return -fuse.ENOSYS } // Listxattr lists extended attributes. func (fsys *FS) Listxattr(path string, fill func(name string) bool) (errc int) { + fsys.rateLimit() return -fuse.ENOSYS } diff --git a/cmd/cmount/mount.go b/cmd/cmount/mount.go index de5691399..6657d7165 100644 --- a/cmd/cmount/mount.go +++ b/cmd/cmount/mount.go @@ -138,7 +138,7 @@ func mount(VFS *vfs.VFS, mountPath string, opt *mountlib.Options) (<-chan error, // Create underlying FS f := VFS.Fs() - fsys := NewFS(VFS) + fsys := NewFS(VFS, opt) host := fuse.NewFileSystemHost(fsys) host.SetCapReaddirPlus(true) // only works on Windows host.SetCapCaseInsensitive(f.Features().CaseInsensitive)