forked from TrueCloudLab/rclone
fs: Pin created backends until parents are finalized
This attempts to solve the backend lifecycle problem by - Pinning backends mentioned on the command line into the cache indefinitely - Unpinning backends when the containing structure (VFS, wrapping backend) is destroyed See: https://forum.rclone.org/t/rclone-rc-backend-command-not-working-as-expected/18834
This commit is contained in:
parent
0d066bdf46
commit
70c8566cb8
7 changed files with 23 additions and 5 deletions
1
backend/cache/cache.go
vendored
1
backend/cache/cache.go
vendored
|
@ -385,6 +385,7 @@ func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
cleanupChan: make(chan bool, 1),
|
cleanupChan: make(chan bool, 1),
|
||||||
notifiedRemotes: make(map[string]bool),
|
notifiedRemotes: make(map[string]bool),
|
||||||
}
|
}
|
||||||
|
cache.PinUntilFinalized(f.Fs, f)
|
||||||
f.rateLimiter = rate.NewLimiter(rate.Limit(float64(opt.Rps)), opt.TotalWorkers)
|
f.rateLimiter = rate.NewLimiter(rate.Limit(float64(opt.Rps)), opt.TotalWorkers)
|
||||||
|
|
||||||
f.plexConnector = &plexConnector{}
|
f.plexConnector = &plexConnector{}
|
||||||
|
|
|
@ -262,6 +262,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
root: rpath,
|
root: rpath,
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
}
|
}
|
||||||
|
cache.PinUntilFinalized(f.base, f)
|
||||||
f.dirSort = true // processEntries requires that meta Objects prerun data chunks atm.
|
f.dirSort = true // processEntries requires that meta Objects prerun data chunks atm.
|
||||||
|
|
||||||
if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType); err != nil {
|
if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType); err != nil {
|
||||||
|
|
|
@ -186,6 +186,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
cipher: cipher,
|
cipher: cipher,
|
||||||
}
|
}
|
||||||
|
cache.PinUntilFinalized(f.Fs, f)
|
||||||
// the features here are ones we could support, and they are
|
// the features here are ones we could support, and they are
|
||||||
// ANDed with the ones from wrappedFs
|
// ANDed with the ones from wrappedFs
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
|
|
|
@ -97,6 +97,7 @@ func New(remote, root string, cacheTime time.Duration) (*Fs, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f.Fs = myFs
|
f.Fs = myFs
|
||||||
|
cache.PinUntilFinalized(f.Fs, f)
|
||||||
return f, err
|
return f, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -89,8 +89,10 @@ func NewFsFile(remote string) (fs.Fs, string) {
|
||||||
f, err := cache.Get(remote)
|
f, err := cache.Get(remote)
|
||||||
switch err {
|
switch err {
|
||||||
case fs.ErrorIsFile:
|
case fs.ErrorIsFile:
|
||||||
|
cache.Pin(f) // pin indefinitely since it was on the CLI
|
||||||
return f, path.Base(fsPath)
|
return f, path.Base(fsPath)
|
||||||
case nil:
|
case nil:
|
||||||
|
cache.Pin(f) // pin indefinitely since it was on the CLI
|
||||||
return f, ""
|
return f, ""
|
||||||
default:
|
default:
|
||||||
err = fs.CountError(err)
|
err = fs.CountError(err)
|
||||||
|
@ -139,6 +141,7 @@ func newFsDir(remote string) fs.Fs {
|
||||||
err = fs.CountError(err)
|
err = fs.CountError(err)
|
||||||
log.Fatalf("Failed to create file system for %q: %v", remote, err)
|
log.Fatalf("Failed to create file system for %q: %v", remote, err)
|
||||||
}
|
}
|
||||||
|
cache.Pin(f) // pin indefinitely since it was on the CLI
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,6 +200,7 @@ func NewFsSrcDstFiles(args []string) (fsrc fs.Fs, srcFileName string, fdst fs.Fs
|
||||||
_ = fs.CountError(err)
|
_ = fs.CountError(err)
|
||||||
log.Fatalf("Failed to create file system for destination %q: %v", dstRemote, err)
|
log.Fatalf("Failed to create file system for destination %q: %v", dstRemote, err)
|
||||||
}
|
}
|
||||||
|
cache.Pin(fdst) // pin indefinitely since it was on the CLI
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
13
fs/cache/cache.go
vendored
13
fs/cache/cache.go
vendored
|
@ -2,6 +2,7 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
@ -80,6 +81,18 @@ func Pin(f fs.Fs) {
|
||||||
c.Pin(fs.ConfigString(f))
|
c.Pin(fs.ConfigString(f))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PinUntilFinalized pins f into the cache until x is garbage collected
|
||||||
|
//
|
||||||
|
// This calls runtime.SetFinalizer on x so it shouldn't have a
|
||||||
|
// finalizer already.
|
||||||
|
func PinUntilFinalized(f fs.Fs, x interface{}) {
|
||||||
|
Pin(f)
|
||||||
|
runtime.SetFinalizer(x, func(_ interface{}) {
|
||||||
|
Unpin(f)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Unpin f from the cache
|
// Unpin f from the cache
|
||||||
func Unpin(f fs.Fs) {
|
func Unpin(f fs.Fs) {
|
||||||
c.Pin(fs.ConfigString(f))
|
c.Pin(fs.ConfigString(f))
|
||||||
|
|
|
@ -234,8 +234,8 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
||||||
|
|
||||||
// Pin the Fs into the cache so that when we use cache.NewFs
|
// Pin the Fs into the cache so that when we use cache.NewFs
|
||||||
// with the same remote string we get this one. The Pin is
|
// with the same remote string we get this one. The Pin is
|
||||||
// removed by Shutdown
|
// removed when the vfs is finalized
|
||||||
cache.Pin(f)
|
cache.PinUntilFinalized(f, vfs)
|
||||||
|
|
||||||
return vfs
|
return vfs
|
||||||
}
|
}
|
||||||
|
@ -293,9 +293,6 @@ func (vfs *VFS) Shutdown() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unpin the Fs from the cache
|
|
||||||
cache.Unpin(vfs.f)
|
|
||||||
|
|
||||||
// Remove from active cache
|
// Remove from active cache
|
||||||
activeMu.Lock()
|
activeMu.Lock()
|
||||||
configName := fs.ConfigString(vfs.f)
|
configName := fs.ConfigString(vfs.f)
|
||||||
|
|
Loading…
Reference in a new issue