diff --git a/fs/fserrors/enospc_error.go b/fs/fserrors/enospc_error.go
new file mode 100644
index 000000000..a2cb1d496
--- /dev/null
+++ b/fs/fserrors/enospc_error.go
@@ -0,0 +1,23 @@
+// +build !plan9
+
+package fserrors
+
+import (
+	"syscall"
+
+	"github.com/rclone/rclone/lib/errors"
+)
+
+// IsErrNoSpace checks a possibly wrapped error to
+// see if it contains an ENOSPC error
+func IsErrNoSpace(cause error) (isNoSpc bool) {
+	errors.Walk(cause, func(c error) bool {
+		if c == syscall.ENOSPC {
+			isNoSpc = true
+			return true
+		}
+		isNoSpc = false
+		return false
+	})
+	return
+}
diff --git a/fs/fserrors/enospc_error_notsupported.go b/fs/fserrors/enospc_error_notsupported.go
new file mode 100644
index 000000000..c4cd1e940
--- /dev/null
+++ b/fs/fserrors/enospc_error_notsupported.go
@@ -0,0 +1,10 @@
+// +build plan9
+
+package fserrors
+
+// IsErrNoSpace on plan9 always returns false because
+// plan9 does not support the syscall.ENOSPC error.
+func IsErrNoSpace(cause error) (isNoSpc bool) {
+	isNoSpc = false
+	return
+}
diff --git a/go.mod b/go.mod
index e2b2bbe32..a8a257675 100644
--- a/go.mod
+++ b/go.mod
@@ -63,6 +63,7 @@
 	golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666
 	golang.org/x/text v0.3.3
 	golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
+	golang.org/x/tools v0.0.0-20200820180210-c8f393745106 // indirect
 	google.golang.org/api v0.28.0
 	google.golang.org/genproto v0.0.0-20200626011028-ee7919e894b5 // indirect
 	google.golang.org/grpc v1.30.0 // indirect
diff --git a/go.sum b/go.sum
index 89ce6229a..5319ab629 100644
--- a/go.sum
+++ b/go.sum
@@ -427,6 +427,7 @@ github.com/youmark/pkcs8 v0.0.0-20200520070018-fad002e585ce h1:F5MEHq8k6JiE10MNY
 github.com/youmark/pkcs8 v0.0.0-20200520070018-fad002e585ce/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yunify/qingstor-sdk-go/v3 v3.2.0 h1:9sB2WZMgjwSUNZhrgvaNGazVltoFUUfuS9f0uCWtTr8=
 github.com/yunify/qingstor-sdk-go/v3 v3.2.0/go.mod h1:KciFNuMu6F4WLk9nGwwK69sCGKLCdd9f97ac/wfumS4=
 github.com/zeebo/admission/v3 v3.0.1/go.mod h1:BP3isIv9qa2A7ugEratNq1dnl2oZRXaQUGdU7WXKtbw=
@@ -543,6 +544,7 @@ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
 golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -654,6 +656,8 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY
 golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20200622203043-20e05c1c8ffa h1:mMXQKlWCw9mIWgVLLfiycDZjMHMMYqiuakI4E/l2xcA=
 golang.org/x/tools v0.0.0-20200622203043-20e05c1c8ffa/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20200820180210-c8f393745106 h1:42Zs/g7pjhSIE/wiAuKcp8zp20zv7W2diNU6arpshOA=
+golang.org/x/tools v0.0.0-20200820180210-c8f393745106/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
diff --git a/vfs/read_write.go b/vfs/read_write.go
index 790c43e1f..7fa04a7d6 100644
--- a/vfs/read_write.go
+++ b/vfs/read_write.go
@@ -260,7 +260,9 @@ func (fh *RWFileHandle) _readAt(b []byte, off int64, release bool) (n int, err e
 		// Do the writing with fh.mu unlocked
 		fh.mu.Unlock()
 	}
+
 	n, err = fh.item.ReadAt(b, off)
+
 	if release {
 		fh.mu.Lock()
 	}
diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go
index a6cf6eb45..eef969361 100644
--- a/vfs/vfscache/cache.go
+++ b/vfs/vfscache/cache.go
@@ -17,6 +17,7 @@ import (
 	"github.com/rclone/rclone/fs"
 	fscache "github.com/rclone/rclone/fs/cache"
 	"github.com/rclone/rclone/fs/config"
+	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/fs/operations"
 	"github.com/rclone/rclone/lib/file"
@@ -47,9 +48,16 @@ type Cache struct {
 	writeback *writeback.WriteBack // holds Items for writeback
 	avFn      AddVirtualFn         // if set, can be called to add dir entries
 
-	mu   sync.Mutex       // protects the following variables
-	item map[string]*Item // files/directories in the cache
-	used int64            // total size of files in the cache
+	mu            sync.Mutex       // protects the following variables
+	cond          *sync.Cond       // condition variable for synchronous cache cleaning
+	item          map[string]*Item // files/directories in the cache
+	errItems      map[string]error // items in error state
+	used          int64            // total size of files in the cache
+	outOfSpace    bool             // out of space
+	cleanerKicked bool             // some thread kicked the cleaner upon out of space
+	kickerMu      sync.Mutex       // mutex for cleanerKicked
+	kick          chan struct{}    // channel for kicking the cleaner to start
+
 }
 
 // AddVirtualFn if registered by the WithAddVirtual method, can be
@@ -96,6 +104,7 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVir
 		root:       root,
 		metaRoot:   metaRoot,
 		item:       make(map[string]*Item),
+		errItems:   make(map[string]error),
 		hashType:   hashType,
 		hashOption: hashOption,
 		writeback:  writeback.New(ctx, opt),
@@ -117,6 +126,10 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVir
 	// Remove any empty directories
 	c.purgeEmptyDirs()
 
+	// Create a channel for the cleaner to be kicked upon an out of space condition
+	c.kick = make(chan struct{}, 1)
+	c.cond = sync.NewCond(&c.mu)
+
 	go c.cleaner(ctx)
 
 	return c, nil
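The new Cache fields set up a kick-and-wait handshake: an io thread that hits ENOSPC kicks the cleaner through the buffered kick channel and then blocks on the condition variable until outOfSpace is cleared. The following self-contained sketch shows the shape of that handshake with simplified names; it is an illustration of the pattern, not rclone's actual API:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // cache is a stand-in for vfscache.Cache's synchronization fields
    type cache struct {
        mu            sync.Mutex
        cond          *sync.Cond
        outOfSpace    bool
        kickerMu      sync.Mutex
        cleanerKicked bool
        kick          chan struct{}
    }

    func newCache() *cache {
        c := &cache{kick: make(chan struct{}, 1)}
        c.cond = sync.NewCond(&c.mu)
        go c.cleaner()
        return c
    }

    // kickCleaner mirrors Cache.KickCleaner: the first caller kicks the
    // cleaner; every caller then waits until space has been freed
    func (c *cache) kickCleaner() {
        c.kickerMu.Lock()
        if !c.cleanerKicked {
            c.cleanerKicked = true
            c.mu.Lock()
            c.outOfSpace = true
            c.kick <- struct{}{}
            c.mu.Unlock()
        }
        c.kickerMu.Unlock()

        c.mu.Lock()
        for c.outOfSpace {
            c.cond.Wait() // woken by the cleaner's Broadcast
        }
        c.mu.Unlock()
    }

    func (c *cache) cleaner() {
        for range c.kick {
            time.Sleep(10 * time.Millisecond) // pretend to free space
            c.mu.Lock()
            c.outOfSpace = false
            c.cond.Broadcast() // wake every waiting io thread
            c.mu.Unlock()
            c.kickerMu.Lock()
            c.cleanerKicked = false // re-enable kicking
            c.kickerMu.Unlock()
        }
    }

    func main() {
        c := newCache()
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                c.kickCleaner()
                fmt.Println("io thread", n, "resumed")
            }(i)
        }
        wg.Wait()
    }

Taking kickerMu separately from mu is what lets a second io thread fail fast on cleanerKicked instead of queueing behind the cache mutex while the cleaner holds it.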
@@ -401,28 +414,125 @@ func (c *Cache) reload(ctx context.Context) error {
 	return nil
 }
 
-// purgeOld gets rid of any files that are over age
-func (c *Cache) purgeOld(maxAge time.Duration) {
-	c._purgeOld(maxAge, func(item *Item) {
-		item.remove("too old")
-	})
+// KickCleaner kicks the cache cleaner upon an out of space situation
+func (c *Cache) KickCleaner() {
+	/* Use a separate kicker mutex for the kick to go through without waiting for the
+	cache mutex, to avoid letting a thread kick again right after the cleaner has
+	finished cleaning and unlocked the cache mutex.
+	*/
+	fs.Debugf(nil, "vfs cache: at the beginning of KickCleaner")
+	c.kickerMu.Lock()
+	if !c.cleanerKicked {
+		c.cleanerKicked = true
+		fs.Debugf(nil, "vfs cache: in KickCleaner, ready to lock cache mutex")
+		c.mu.Lock()
+		c.outOfSpace = true
+		fs.Logf(nil, "vfs cache: in KickCleaner, ready to kick cleaner")
+		c.kick <- struct{}{}
+		c.mu.Unlock()
+	}
+	c.kickerMu.Unlock()
+
+	c.mu.Lock()
+	for c.outOfSpace {
+		fs.Debugf(nil, "vfs cache: in KickCleaner, looping on c.outOfSpace")
+		c.cond.Wait()
+	}
+	fs.Debugf(nil, "vfs cache: in KickCleaner, leaving c.outOfSpace loop")
+	c.mu.Unlock()
 }
 
-func (c *Cache) _purgeOld(maxAge time.Duration, remove func(item *Item)) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	cutoff := time.Now().Add(-maxAge)
-	for name, item := range c.item {
-		if !item.inUse() {
-			// If not locked and access time too long ago - delete the file
-			dt := item.getATime().Sub(cutoff)
-			// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.info.ATime, cutoff, dt)
-			if dt < 0 {
-				remove(item)
-				// Remove the entry
-				delete(c.item, name)
+// removeNotInUse removes items not in use with a possible maxAge cutoff
+// called with cache mutex locked and up-to-date c.used (as we update it directly here)
+func (c *Cache) removeNotInUse(item *Item, maxAge time.Duration, emptyOnly bool) {
+	removed, spaceFreed := item.RemoveNotInUse(maxAge, emptyOnly)
+	// The item space might be freed even if we get an error after the cache file is removed
+	// The item will not be removed or reset if the cache data is dirty (DataDirty)
+	c.used -= spaceFreed
+	if removed {
+		fs.Infof(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s was removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed)
+		// Remove the entry
+		delete(c.item, item.name)
+	} else {
+		fs.Infof(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s not removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed)
+	}
+}
+
+// retryFailedResets retries failed resets during purgeClean()
+func (c *Cache) retryFailedResets() {
+	// Some items may have failed to reset because there was not enough space
+	// for saving the cache item's metadata. Redo the Reset()s here now that
+	// we may have some available space.
+	if len(c.errItems) != 0 {
+		fs.Debugf(nil, "vfs cache reset: before redoing reset errItems = %v", c.errItems)
+		for itemName := range c.errItems {
+			_, _, err := c.item[itemName].Reset()
+			if err == nil || !fserrors.IsErrNoSpace(err) {
+				// TODO: not trying to handle non-ENOSPC errors yet
+				delete(c.errItems, itemName)
+			}
+		}
+		fs.Debugf(nil, "vfs cache reset: after redoing reset errItems = %v", c.errItems)
+	}
+}
+
+// purgeClean resets clean cache files until the total space is below quota
+func (c *Cache) purgeClean(quota int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var items Items
+
+	if quota <= 0 || c.used < quota {
+		return
+	}
+
+	// Make a slice of clean cache files
+	for _, item := range c.item {
+		if !item.IsDataDirty() {
+			items = append(items, item)
+		}
+	}
+
+	sort.Sort(items)
+
+	// Reset items until the quota is OK
+	for _, item := range items {
+		if c.used < quota {
+			break
+		}
+		resetResult, spaceFreed, err := item.Reset()
+		// The item space might be freed even if we get an error after the cache file is removed
+		// The item will not be removed or reset if the cache data is dirty (DataDirty)
+		c.used -= spaceFreed
+		fs.Infof(nil, "vfs cache purgeClean item.Reset %s: %s, freed %d bytes", item.GetName(), resetResult.String(), spaceFreed)
+		if resetResult == RemovedNotInUse {
+			delete(c.item, item.name)
+		}
+		if err != nil {
+			fs.Errorf(nil, "vfs cache purgeClean item.Reset %s reset failed, err = %v, freed %d bytes", item.GetName(), err, spaceFreed)
+			c.errItems[item.name] = err
+		}
+	}
+
+	// Reset outOfSpace without checking whether we have reduced cache space below the quota.
+	// This allows some files to reduce their pendingAccesses count, allowing them to be reset
+	// in the next iteration of the purge cleaner loop.
+
+	c.outOfSpace = false
+	c.cond.Broadcast()
+}
+
+// purgeOld gets rid of any files that are over age
+func (c *Cache) purgeOld(maxAge time.Duration) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for _, item := range c.item {
+		c.removeNotInUse(item, maxAge, false)
+	}
+	if c.used < int64(c.opt.CacheMaxSize) {
+		c.outOfSpace = false
+		c.cond.Broadcast()
+	}
 }
 
@@ -439,16 +549,8 @@ func (c *Cache) purgeEmptyDirs() {
 	}
 }
 
-// Remove any files that are over quota starting from the
-// oldest first
-func (c *Cache) purgeOverQuota(quota int64) {
-	c._purgeOverQuota(quota, func(item *Item) {
-		item.remove("over quota")
-	})
-}
-
 // updateUsed updates c.used so it is accurate
-func (c *Cache) updateUsed() {
+func (c *Cache) updateUsed() (used int64) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
@@ -457,15 +559,19 @@ func (c *Cache) updateUsed() {
 		newUsed += item.getDiskSize()
 	}
 	c.used = newUsed
+	return newUsed
 }
 
-func (c *Cache) _purgeOverQuota(quota int64, remove func(item *Item)) {
+// purgeOverQuota removes clean cache files that are not open until the total
+// space is reduced below quota, starting from the oldest first
+func (c *Cache) purgeOverQuota(quota int64) {
 	c.updateUsed()
 
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	if quota <= 0 || c.used < quota {
+
 		return
 	}
@@ -482,18 +588,16 @@
 
 	// Remove items until the quota is OK
 	for _, item := range items {
-		if c.used < quota {
-			break
-		}
-		c.used -= item.getDiskSize()
-		remove(item)
-		// Remove the entry
-		delete(c.item, item.name)
+		c.removeNotInUse(item, 0, c.used <= quota)
+	}
+	if c.used < quota {
+		c.outOfSpace = false
+		c.cond.Broadcast()
 	}
 }
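purgeOverQuota now delegates the actual removal to removeNotInUse, flipping to "empty only" mode once usage is back under quota so that zero-byte leftovers are still reaped. A simplified model of that eviction pass over an ATime-sorted slice (hypothetical item type; locking and error handling omitted):

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    type item struct {
        name  string
        atime time.Time
        size  int64
        inUse bool
    }

    // evict walks items oldest-first, removing not-in-use items while over
    // quota, then removing only empty items once usage is below quota
    func evict(items []*item, used, quota int64) int64 {
        sort.Slice(items, func(i, j int) bool {
            return items[i].atime.Before(items[j].atime)
        })
        for _, it := range items {
            emptyOnly := used <= quota
            if it.inUse || (emptyOnly && it.size != 0) {
                continue
            }
            used -= it.size
            fmt.Println("evicted", it.name)
        }
        return used
    }

    func main() {
        now := time.Now()
        items := []*item{
            {"a", now.Add(-3 * time.Hour), 40, false},
            {"b", now.Add(-2 * time.Hour), 40, true}, // open, never evicted
            {"c", now.Add(-1 * time.Hour), 40, false},
        }
        fmt.Println("used after eviction:", evict(items, 120, 100)) // 80
    }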
 
 // clean empties the cache of stuff if it can
-func (c *Cache) clean() {
+func (c *Cache) clean(removeCleanFiles bool) {
 	// Cache may be empty so end
 	_, err := os.Stat(c.root)
 	if os.IsNotExist(err) {
@@ -504,12 +608,37 @@
 	oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
 	c.mu.Unlock()
 
-	// Remove any files that are over age
-	c.purgeOld(c.opt.CacheMaxAge)
+	// loop cleaning the cache until we reach below cache quota
+	for {
+		// Remove any files that are over age
+		c.purgeOld(c.opt.CacheMaxAge)
 
-	// Now remove any files that are over quota starting from the
-	// oldest first
-	c.purgeOverQuota(int64(c.opt.CacheMaxSize))
+		// Now remove files not in use until cache size is below quota starting from the
+		// oldest first
+		c.purgeOverQuota(int64(c.opt.CacheMaxSize))
+
+		// removeCleanFiles indicates that we got an ENOSPC error
+		// We remove cache files that are not dirty if we are still above the max cache size
+		if removeCleanFiles {
+			c.purgeClean(int64(c.opt.CacheMaxSize))
+			c.retryFailedResets()
+		} else {
+			break
+		}
+
+		used := c.updateUsed()
+		if used <= int64(c.opt.CacheMaxSize) && len(c.errItems) == 0 {
+			break
+		}
+	}
+
+	// Was the cleaner kicked because of ENOSPC?
+	if removeCleanFiles {
+		c.kickerMu.Lock() // Make sure this is called with the cache mutex unlocked
+		// Re-enable io threads to kick me
+		c.cleanerKicked = false
+		c.kickerMu.Unlock()
+	}
 
 	// Stats
 	c.mu.Lock()
@@ -526,7 +655,7 @@
 	fs.Infof(nil, "vfs cache: cleaned: objects %d (was %d) in use %d, to upload %d, uploading %d, total size %v (was %v)", newItems, oldItems, totalInUse, uploadsQueued, uploadsInProgress, newUsed, oldUsed)
 }
 
-// cleaner calls clean at regular intervals
+// cleaner calls clean at regular intervals and upon being kicked for an out-of-space condition
 //
 // doesn't return until context is cancelled
 func (c *Cache) cleaner(ctx context.Context) {
@@ -535,14 +664,16 @@ func (c *Cache) cleaner(ctx context.Context) {
 		return
 	}
 	// Start cleaning the cache immediately
-	c.clean()
+	c.clean(false)
 	// Then every interval specified
 	timer := time.NewTicker(c.opt.CachePollInterval)
 	defer timer.Stop()
 	for {
 		select {
+		case <-c.kick: // a thread encountering ENOSPC kicked me
+			c.clean(true) // remove inUse files that are clean (!item.info.Dirty)
 		case <-timer.C:
-			c.clean()
+			c.clean(false) // do not remove inUse files
 		case <-ctx.Done():
 			fs.Debugf(nil, "vfs cache: cleaner exiting")
 			return
diff --git a/vfs/vfscache/cache_test.go b/vfs/vfscache/cache_test.go
index 4418b7993..e2b3f6d4b 100644
--- a/vfs/vfscache/cache_test.go
+++ b/vfs/vfscache/cache_test.go
@@ -33,6 +33,19 @@ func itemAsString(c *Cache) []string {
 	return out
 }
 
+// convert c.item to a string with the space used by each item
+func itemSpaceAsString(c *Cache) []string {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	var out []string
+	for name, item := range c.item {
+		space := item.info.Rs.Size()
+		out = append(out, fmt.Sprintf("name=%q opens=%d size=%d space=%d", filepath.ToSlash(name), item.opens, item.info.Size, space))
+	}
+	sort.Strings(out)
+	return out
+}
+
 // open an item and write to it
 func itemWrite(t *testing.T, item *Item, contents string) {
 	require.NoError(t, item.Open(nil))
@@ -174,7 +187,7 @@ func TestCacheNew(t *testing.T) {
 	assertPathNotExist(t, p)
 
 	// clean - have tested the internals already
-	c.clean()
+	c.clean(false)
 }
 
 func TestCacheOpens(t *testing.T) {
@@ -279,15 +292,7 @@ func TestCachePurgeOld(t *testing.T) {
 	defer cleanup()
 
 	// Test funcs
-	var removed []string
-	removeFile := func(item *Item) {
-		removed = append(removed, item.name)
-		item.remove("TestCachePurgeOld")
-	}
-
-	removed = nil
-	c._purgeOld(-10*time.Second, removeFile)
-	assert.Equal(t, []string(nil), removed)
+	c.purgeOld(-10 * time.Second)
 
 	potato2 := c.Item("sub/dir2/potato2")
 	require.NoError(t, potato2.Open(nil))
@@ -301,17 +306,23 @@ func TestCachePurgeOld(t *testing.T) {
 		`name="sub/dir2/potato2" opens=0 size=0`,
 	}, itemAsString(c))
 
-	removed = nil
-	c._purgeOld(-10*time.Second, removeFile)
+	c.purgeOld(-10 * time.Second)
+
 	assert.Equal(t, []string{
-		"sub/dir2/potato2",
-	}, removed)
+		`name="sub/dir/potato" opens=2 size=0`,
+	}, itemAsString(c))
 
 	require.NoError(t, potato.Close(nil))
 
-	removed = nil
-	c._purgeOld(-10*time.Second, removeFile)
-	assert.Equal(t, []string(nil), removed)
+	assert.Equal(t, []string{
+		`name="sub/dir/potato" opens=1 size=0`,
+	}, itemAsString(c))
+
+	c.purgeOld(-10 * time.Second)
+
+	assert.Equal(t, []string{
+		`name="sub/dir/potato" opens=1 size=0`,
+	}, itemAsString(c))
 
 	require.NoError(t, potato.Close(nil))
 
@@ -319,19 +330,13 @@ func TestCachePurgeOld(t *testing.T) {
 		`name="sub/dir/potato" opens=0 size=0`,
 	}, itemAsString(c))
 
-	removed = nil
-	c._purgeOld(10*time.Second, removeFile)
-	assert.Equal(t, []string(nil), removed)
+	c.purgeOld(10 * time.Second)
 
 	assert.Equal(t, []string{
 		`name="sub/dir/potato" opens=0 size=0`,
 	}, itemAsString(c))
 
-	removed = nil
-	c._purgeOld(-10*time.Second, removeFile)
-	assert.Equal(t, []string{
-		"sub/dir/potato",
-	}, removed)
+	c.purgeOld(-10 * time.Second)
 
 	assert.Equal(t, []string(nil), itemAsString(c))
 }
@@ -341,23 +346,6 @@ func TestCachePurgeOverQuota(t *testing.T) {
 	defer cleanup()
 
 	// Test funcs
-	var removed []string
-	remove := func(item *Item) {
-		removed = append(removed, item.name)
-		item.remove("TestCachePurgeOverQuota")
-	}
-
-	removed = nil
-	c._purgeOverQuota(-1, remove)
-	assert.Equal(t, []string(nil), removed)
-
-	removed = nil
-	c._purgeOverQuota(0, remove)
-	assert.Equal(t, []string(nil), removed)
-
-	removed = nil
-	c._purgeOverQuota(1, remove)
-	assert.Equal(t, []string(nil), removed)
 
 	// Make some test files
 	potato := c.Item("sub/dir/potato")
@@ -372,9 +360,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
 	}, itemAsString(c))
 
 	// Check nothing removed
-	removed = nil
-	c._purgeOverQuota(1, remove)
-	assert.Equal(t, []string(nil), removed)
+	c.purgeOverQuota(1)
 
 	// Close the files
 	require.NoError(t, potato.Close(nil))
@@ -393,11 +379,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
 	potato2.info.ATime = t1
 
 	// Check only potato removed to get below quota
-	removed = nil
-	c._purgeOverQuota(10, remove)
-	assert.Equal(t, []string{
-		"sub/dir/potato",
-	}, removed)
+	c.purgeOverQuota(10)
 	assert.Equal(t, int64(6), c.used)
 
 	assert.Equal(t, []string{
@@ -423,11 +405,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
 	potato.info.ATime = t2
 
 	// Check only potato2 removed to get below quota
-	removed = nil
-	c._purgeOverQuota(10, remove)
-	assert.Equal(t, []string{
-		"sub/dir2/potato2",
-	}, removed)
+	c.purgeOverQuota(10)
 	assert.Equal(t, int64(5), c.used)
 	c.purgeEmptyDirs()
 
@@ -436,22 +414,82 @@ func TestCachePurgeOverQuota(t *testing.T) {
 	}, itemAsString(c))
 
 	// Now purge everything
-	removed = nil
-	c._purgeOverQuota(1, remove)
-	assert.Equal(t, []string{
-		"sub/dir/potato",
-	}, removed)
+	c.purgeOverQuota(1)
 	assert.Equal(t, int64(0), c.used)
 	c.purgeEmptyDirs()
 
 	assert.Equal(t, []string(nil), itemAsString(c))
 
 	// Check nothing left behind
-	c.clean()
+	c.clean(false)
 	assert.Equal(t, int64(0), c.used)
 	assert.Equal(t, []string(nil), itemAsString(c))
 }
 
+// test resetting clean files
+func TestCachePurgeClean(t *testing.T) {
+	r, c, cleanup := newItemTestCache(t)
+	defer cleanup()
+	contents, obj, potato1 := newFile(t, r, c, "existing")
+	_ = contents
+
+	// Open the object to create metadata for it
+	require.NoError(t, potato1.Open(obj))
+	require.NoError(t, potato1.Open(obj))
+
+	size, err := potato1.GetSize()
+	require.NoError(t, err)
+	assert.Equal(t, int64(100), size)
+
+	// Read something to instantiate the cache file
+	buf := make([]byte, 10)
+	_, err = potato1.ReadAt(buf, 10)
+	require.NoError(t, err)
+
+	// Test cache file present
+	_, err = os.Stat(potato1.c.toOSPath(potato1.name))
+	require.NoError(t, err)
+
+	// Add some potatoes
+	potato2 := c.Item("sub/dir/potato2")
+	require.NoError(t, potato2.Open(nil))
+	require.NoError(t, potato2.Truncate(5))
+
+	potato3 := c.Item("sub/dir/potato3")
+	require.NoError(t, potato3.Open(nil))
+	require.NoError(t, potato3.Truncate(6))
+
+	c.updateUsed()
+	c.purgeClean(1)
+	assert.Equal(t, []string{
+		`name="existing" opens=2 size=100 space=0`,
+		`name="sub/dir/potato2" opens=1 size=5 space=5`,
+		`name="sub/dir/potato3" opens=1 size=6 space=6`,
+	}, itemSpaceAsString(c))
+	assert.Equal(t, int64(11), c.used)
+
+	require.NoError(t, potato2.Close(nil))
+	c.purgeClean(1)
+	assert.Equal(t, []string{
+		`name="existing" opens=2 size=100 space=0`,
+		`name="sub/dir/potato3" opens=1 size=6 space=6`,
+	}, itemSpaceAsString(c))
+	assert.Equal(t, int64(6), c.used)
+
+	require.NoError(t, potato1.Close(nil))
+	require.NoError(t, potato1.Close(nil))
+	require.NoError(t, potato3.Close(nil))
+
+	// Remove all files now. They are all not in use.
+	// purgeClean does not remove empty cache files. purgeOverQuota does.
+	// So we use purgeOverQuota here for the cleanup.
+	c.purgeOverQuota(1)
+
+	c.purgeEmptyDirs()
+
+	assert.Equal(t, []string(nil), itemAsString(c))
+}
+
 func TestCacheInUse(t *testing.T) {
 	_, c, cleanup := newTestCache(t)
 	defer cleanup()
diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go
index f2c22ee2c..b7cde1d9a 100644
--- a/vfs/vfscache/downloaders/downloaders.go
+++ b/vfs/vfscache/downloaders/downloaders.go
@@ -10,6 +10,7 @@ import (
 	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/asyncreader"
 	"github.com/rclone/rclone/fs/chunkedreader"
+	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/lib/ranges"
 	"github.com/rclone/rclone/vfs/vfscommon"
 )
@@ -404,6 +407,11 @@ func (dls *Downloaders) kickWaiters() (err error) {
 			fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err)
 		}
 	}
+	if fserrors.IsErrNoSpace(dls.lastErr) {
+		fs.Errorf(dls.src, "vfs cache: cache is out of space %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
+		dls._closeWaiters(dls.lastErr)
+		return dls.lastErr
+	}
 
 	if dls.errorCount > maxErrorCount {
 		fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
@@ -600,6 +608,7 @@ func (dl *downloader) download() (n int64, err error) {
 	if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
 		return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
 	}
+
 	return n, nil
 }
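kickWaiters can only fail everything over to the caller if the out-of-space condition is detected through an arbitrarily wrapped error chain, which is what fserrors.IsErrNoSpace does via lib/errors.Walk. A rough stand-alone equivalent using github.com/pkg/errors (already a dependency here) rather than rclone's Walk helper:

    package main

    import (
        "fmt"
        "syscall"

        "github.com/pkg/errors"
    )

    // isErrNoSpace unwraps to the root cause and compares it with ENOSPC;
    // rclone's errors.Walk additionally inspects every intermediate error
    func isErrNoSpace(err error) bool {
        return errors.Cause(err) == syscall.ENOSPC
    }

    func main() {
        err := errors.Wrap(errors.Wrap(syscall.ENOSPC, "write chunk"), "vfs cache downloader")
        fmt.Println(isErrNoSpace(err))                 // true
        fmt.Println(isErrNoSpace(errors.New("other"))) // false
    }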
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/lib/file" "github.com/rclone/rclone/lib/ranges" @@ -47,19 +48,20 @@ import ( // The Info field is written to the backing store to store status type Item struct { // read only - c *Cache // cache this is part of - - mu sync.Mutex // protect the variables - name string // name in the VFS - opens int // number of times file is open - downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil - o fs.Object // object we are caching - may be nil - fd *os.File // handle we are using to read and write to the file - metaDirty bool // set if the info needs writeback - modified bool // set if the file has been modified since the last Open - info Info // info about the file to persist to backing store - writeBackID writeback.Handle // id of any writebacks in progress - + c *Cache // cache this is part of + mu sync.Mutex // protect the variables + cond *sync.Cond // synchronize with cache cleaner + name string // name in the VFS + opens int // number of times file is open + downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil + o fs.Object // object we are caching - may be nil + fd *os.File // handle we are using to read and write to the file + metaDirty bool // set if the info needs writeback + modified bool // set if the file has been modified since the last Open + info Info // info about the file to persist to backing store + writeBackID writeback.Handle // id of any writebacks in progress + pendingAccesses int // number of threads - cache reset not allowed if not zero + beingReset bool // cache cleaner is resetting the cache file, access not allowed } // Info is persisted to backing store @@ -75,6 +77,24 @@ type Info struct { // Items are a slice of *Item ordered by ATime type Items []*Item +// ResetResult reports the actual action taken in the Reset function and reason +type ResetResult int + +// Constants used to report actual action taken in the Reset function and reason +const ( + SkippedDirty ResetResult = iota // Dirty item cannot be reset + SkippedPendingAccess // Reset pending access can lead to deadlock + SkippedEmpty // Reset empty item does not save space + RemovedNotInUse // Item not used. 
+	ResetFailed                             // Reset failed with an error
+	ResetComplete                           // Reset completed successfully
+)
+
+func (rr ResetResult) String() string {
+	return [...]string{"Dirty item skipped", "In-access item skipped", "Empty item skipped",
+		"Not-in-use item removed", "Item reset failed", "Item reset completed"}[rr]
+}
+
 func (v Items) Len() int      { return len(v) }
 func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
 func (v Items) Less(i, j int) bool {
@@ -112,7 +132,7 @@ func newItem(c *Cache, name string) (item *Item) {
 			ATime: now,
 		},
 	}
-
+	item.cond = sync.NewCond(&item.mu)
 	// check the cache file exists
 	osPath := c.toOSPath(name)
 	fi, statErr := os.Stat(osPath)
@@ -340,6 +360,13 @@ func (item *Item) _getSize() (size int64, err error) {
 	return size, err
 }
 
+// GetName gets the vfs name of the item
+func (item *Item) GetName() (name string) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	return item.name
+}
+
 // GetSize gets the current size of the item
 func (item *Item) GetSize() (size int64, err error) {
 	item.mu.Lock()
@@ -399,34 +426,20 @@ func (item *Item) IsDirty() bool {
 	return item.metaDirty || item.info.Dirty
 }
 
-// Open the local file from the object passed in (which may be nil)
-// which implies we are about to create the file
-func (item *Item) Open(o fs.Object) (err error) {
-	// defer log.Trace(o, "item=%p", item)("err=%v", &err)
+// IsDataDirty returns true if the item's data is dirty
+func (item *Item) IsDataDirty() bool {
 	item.mu.Lock()
 	defer item.mu.Unlock()
+	return item.info.Dirty
+}
 
-	item.info.ATime = time.Now()
-	item.opens++
-
-	osPath, err := item.c.mkdir(item.name) // No locking in Cache
-	if err != nil {
-		return errors.Wrap(err, "vfs cache item: open mkdir failed")
-	}
-
-	err = item._checkObject(o)
-	if err != nil {
-		return errors.Wrap(err, "vfs cache item: check object failed")
-	}
-
-	if item.opens != 1 {
-		return nil
-	}
+// _createFile creates the cache file and stores the metadata on disk
+// Called with item.mu locked
+func (item *Item) _createFile(osPath string) (err error) {
 	if item.fd != nil {
 		return errors.New("vfs cache item: internal error: didn't Close file")
 	}
 	item.modified = false
-
 	fd, err := file.OpenFile(osPath, os.O_RDWR, 0600)
 	if err != nil {
 		return errors.Wrap(err, "vfs cache item: open failed")
@@ -439,9 +452,67 @@
 	err = item._save()
 	if err != nil {
-		return err
+		closeErr := item.fd.Close()
+		if closeErr != nil {
+			fs.Errorf(item.name, "vfs cache: item.fd.Close: closeErr: %v", closeErr)
+		}
+		item.fd = nil
+		return errors.Wrap(err, "vfs cache item: _save failed")
+	}
+	return err
+}
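_createFile is shared by open and by Reset later in this file, so both paths fail the same way when the disk fills while writing metadata. The Open and ReadAt wrappers that follow retry around that failure; the control flow condenses to something like this sketch, with the stdlib errors.Is standing in for fserrors.IsErrNoSpace and a constant standing in for fs.Config.LowLevelRetries:

    package main

    import (
        "errors"
        "fmt"
        "syscall"
    )

    const lowLevelRetries = 3 // stand-in for fs.Config.LowLevelRetries

    // withSpaceRetry runs op, and on an out-of-space error kicks the
    // cleaner (which blocks until space is freed) before retrying
    func withSpaceRetry(op func() error, kickCleaner func()) (err error) {
        for retries := 0; retries < lowLevelRetries; retries++ {
            err = op()
            if err == nil {
                return nil
            }
            if !errors.Is(err, syscall.ENOSPC) {
                return err // only out-of-space errors are worth retrying
            }
            kickCleaner()
        }
        return err
    }

    func main() {
        calls := 0
        op := func() error {
            calls++
            if calls < 3 {
                return syscall.ENOSPC // fail twice, then succeed
            }
            return nil
        }
        err := withSpaceRetry(op, func() { fmt.Println("cleaner kicked") })
        fmt.Println("calls:", calls, "err:", err)
    }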
+
+// Open opens the local file from the object passed in. Wraps open()
+// to provide recovery from out of space errors.
+func (item *Item) Open(o fs.Object) (err error) {
+	for retries := 0; retries < fs.Config.LowLevelRetries; retries++ {
+		item.preAccess()
+		err = item.open(o)
+		item.postAccess()
+		if err == nil {
+			break
+		}
+		fs.Errorf(item.name, "vfs cache: failed to open item: %v", err)
+		if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
+			fs.Errorf(item.name, "vfs cache: non-out-of-space error encountered during open")
+			break
+		}
+		item.c.KickCleaner()
+	}
+	return err
+}
+
+// open opens the local file from the object passed in (which may be nil)
+// which implies we are about to create the file
+func (item *Item) open(o fs.Object) (err error) {
+	// defer log.Trace(o, "item=%p", item)("err=%v", &err)
+	item.mu.Lock()
+	defer item.mu.Unlock()
+
+	item.info.ATime = time.Now()
+
+	osPath, err := item.c.mkdir(item.name) // No locking in Cache
+	if err != nil {
+		return errors.Wrap(err, "vfs cache item: open mkdir failed")
+	}
+
+	err = item._checkObject(o)
+	if err != nil {
+		return errors.Wrap(err, "vfs cache item: check object failed")
+	}
+
+	item.opens++
+	if item.opens != 1 {
+		return nil
+	}
+
+	err = item._createFile(osPath)
+	if err != nil {
+		item._remove("item.open failed on _createFile, remove cache data/metadata files")
+		item.fd = nil
+		item.opens--
+		return errors.Wrap(err, "vfs cache item: create cache file failed")
+	}
 
 	// Unlock the Item.mu so we can call some methods which take Cache.mu
 	item.mu.Unlock()
@@ -767,6 +838,197 @@ func (item *Item) remove(reason string) (wasWriting bool) {
 	return item._remove(reason)
 }
 
+// RemoveNotInUse is called to remove a cache file that has not been accessed recently.
+// It may also be called to remove empty cache files when the quota has already been reached.
+func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed bool, spaceFreed int64) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+
+	spaceFreed = 0
+	removed = false
+
+	if item.opens != 0 || item.metaDirty || item.info.Dirty {
+		return
+	}
+
+	removeIt := false
+	if maxAge == 0 {
+		removeIt = true // quota-driven removal
+	}
+	if maxAge != 0 {
+		cutoff := time.Now().Add(-maxAge)
+		// If not locked and access time too long ago - delete the file
+		accessTime := item.info.ATime
+		if accessTime.Sub(cutoff) <= 0 {
+			removeIt = true
+		}
+	}
+	if removeIt {
+		spaceUsed := item.info.Rs.Size()
+		if !emptyOnly || spaceUsed == 0 {
+			spaceFreed = spaceUsed
+			removed = true
+			if item._remove("Removing old cache file not in use") {
+				fs.Errorf(item.name, "item removed when it was writing/uploaded")
+			}
+		}
+	}
+	return
+}
+
+// Reset is called by the cache purge functions only to reset (empty the contents of) cache files
+// that are not dirty. It is used when cache space runs out and we get an ENOSPC error.
+func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+
+	// The item is not being used now. Just remove it instead of resetting it.
+	if item.opens == 0 && !item.metaDirty && !item.info.Dirty {
+		spaceFreed = item.info.Rs.Size()
+		if item._remove("Removing old cache file not in use") {
+			fs.Errorf(item.name, "item removed when it was writing/uploaded")
+		}
+		return RemovedNotInUse, spaceFreed, nil
+	}
+
+	// do not reset a dirty file
+	if item.info.Dirty {
+		return SkippedDirty, 0, nil
+	}
+
+	/* A wait for pendingAccesses to become 0 can lead to deadlock when an item.Open bumps
+	   up the pendingAccesses count, calls item.open, which calls cache.put. The cache.put
+	   operation needs the cache mutex, which is held here. We skip this file for now. The
+	   caller (the cache cleaner thread) may retry resetting this item if the cache size does
+	   not reduce below quota. */
+	if item.pendingAccesses > 0 {
+		return SkippedPendingAccess, 0, nil
+	}
+
+	/* Do not need to reset an empty cache file unless it was being reset and the reset failed.
+	   Some thread(s) may be waiting on the reset's successful completion in that case. */
+	if item.info.Rs.Size() == 0 && !item.beingReset {
+		return SkippedEmpty, 0, nil
+	}
+
+	item.beingReset = true
+
+	/* Error handling from this point on (setting item.fd and item.beingReset):
+	   Since Reset is called by the cache cleaner thread, there is no direct way to return
+	   the error to the io threads. Set item.fd to nil upon internal errors, so that the
+	   io threads will return internal errors seeing a nil fd. In the case when the error
+	   is ENOSPC, keep the item in the beingReset state and that will keep item.ReadAt
+	   waiting at its beginning. The cache purge loop will try to redo the reset after cache
+	   space is made available again. This recovery design should allow most io threads to
+	   eventually go through, unless large files are written/overwritten concurrently and
+	   the total size of these files exceeds the cache storage limit. */
+
+	// Close the downloaders
+	// Accumulate and log errors
+	checkErr := func(e error) {
+		if e != nil {
+			fs.Errorf(item.o, "vfs cache: item reset failed: %v", e)
+			if err == nil {
+				err = e
+			}
+		}
+	}
+
+	if downloaders := item.downloaders; downloaders != nil {
+		item.downloaders = nil
+		// FIXME need to unlock to kill downloader - should we
+		// re-arrange locking so this isn't necessary? maybe
+		// downloader should use the item mutex for locking? or put a
+		// finer lock on Rs?
+		//
+		// downloader.Write calls ensure which needs the lock
+		// close downloader with mutex unlocked
+		item.mu.Unlock()
+		checkErr(downloaders.Close(nil))
+		item.mu.Lock()
+	}
+
+	// close the file handle
+	// fd can be nil if we tried Reset and failed before because of ENOSPC during reset
+	if item.fd != nil {
+		checkErr(item.fd.Close())
+		if err != nil {
+			// Could not close the cache file
+			item.beingReset = false
+			item.cond.Broadcast()
+			return ResetFailed, 0, err
+		}
+		item.fd = nil
+	}
+
+	spaceFreed = item.info.Rs.Size()
+
+	// This should not be possible. We get here only if cache data is not dirty.
+	if item._remove("cache out of space, item is clean") {
+		fs.Errorf(item.o, "vfs cache item removed when it was writing/uploaded")
+	}
+
+	// can we have an item with no dirty data (so that we can get here) and nil item.o at the same time?
+	fso := item.o
+	checkErr(item._checkObject(fso))
+	if err != nil {
+		item.beingReset = false
+		item.cond.Broadcast()
+		return ResetFailed, spaceFreed, err
+	}
+
+	osPath := item.c.toOSPath(item.name)
+	checkErr(item._createFile(osPath))
+	if err != nil {
+		item._remove("cache reset failed on _createFile, removed cache data file")
+		item.fd = nil // This allows a new Reset redo to have a clean state to deal with
+		if !fserrors.IsErrNoSpace(err) {
+			item.beingReset = false
+			item.cond.Broadcast()
+		}
+		return ResetFailed, spaceFreed, err
+	}
+
+	// Create the downloaders
+	if item.o != nil {
+		item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
+	}
+
+	/* The item will stay in the beingReset state if we get an error that prevents us from
+	   reaching this point. The cache purge loop will redo the failed Reset.
+	*/
+	item.beingReset = false
+	item.cond.Broadcast()
+
+	return ResetComplete, spaceFreed, err
+}
+
+// preAccess waits for an ongoing cache reset to finish and then increases pendingAccesses
+// to protect against a cache reset on this item while the thread potentially uses the cache file.
+// The cache cleaner waits until pendingAccesses is zero before resetting the cache file.
+func (item *Item) preAccess() {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+
+	for item.beingReset {
+		item.cond.Wait()
+	}
+	item.pendingAccesses++
+}
+
+// postAccess reduces the pendingAccesses count, enabling a cache reset upon ENOSPC
+func (item *Item) postAccess() {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+
+	item.pendingAccesses--
+	item.cond.Broadcast()
+}
+
 // _present returns true if the whole file has been downloaded
 //
 // call with the lock held
@@ -811,6 +1073,10 @@ func (item *Item) _ensure(offset, size int64) (err error) {
 	}
 	r := ranges.Range{Pos: offset, Size: size}
 	present := item.info.Rs.Present(r)
+	/* This statement simulates a cache space error for test purposes */
+	/* if present != true && item.info.Rs.Size() > 32*1024*1024 {
+		return errors.New("no space left on device")
+	} */
 	fs.Debugf(nil, "vfs cache: looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
 	item.mu.Unlock()
 	defer item.mu.Lock()
@@ -887,6 +1153,27 @@ func (item *Item) setModTime(modTime time.Time) {
 
 // ReadAt bytes from the file at off
 func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
+	n = 0
+	for retries := 0; retries < fs.Config.LowLevelRetries; retries++ {
+		item.preAccess()
+		n, err = item.readAt(b, off)
+		item.postAccess()
+		if err == nil {
+			break
+		}
+		fs.Errorf(item.name, "vfs cache: failed to read: %v", err)
+		if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
+			fs.Debugf(item.name, "vfs cache: read error is not an out of space error: %v", err)
+			break
+		}
+		item.c.KickCleaner()
+	}
+
+	return n, err
+}
+
+// readAt reads bytes from the file at off
+func (item *Item) readAt(b []byte, off int64) (n int, err error) {
 	item.mu.Lock()
 	if item.fd == nil {
 		item.mu.Unlock()
@@ -896,15 +1183,17 @@ func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
 		item.mu.Unlock()
 		return 0, io.EOF
 	}
+	defer item.mu.Unlock()
+
 	err = item._ensure(off, int64(len(b)))
 	if err != nil {
-		item.mu.Unlock()
-		return n, err
+		return 0, err
 	}
+
 	item.info.ATime = time.Now()
-	item.mu.Unlock()
-	// Do the reading with Item.mu unlocked
-	return item.fd.ReadAt(b, off)
+	// Do the reading with Item.mu held - the file is protected from reset by preAccess/postAccess
+	n, err = item.fd.ReadAt(b, off)
+	return n, err
 }
 
 // WriteAt bytes to the file at off
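The preAccess/postAccess pair and Reset's beingReset flag form a small reader/resetter protocol around each cache file: readers are counted while they might touch the file, and a reset is skipped while any reader is registered. A self-contained sketch of just that protocol, without the cache bookkeeping (simplified names, not rclone's actual API):

    package main

    import (
        "fmt"
        "sync"
    )

    // gate models Item's beingReset/pendingAccesses fields
    type gate struct {
        mu              sync.Mutex
        cond            *sync.Cond
        beingReset      bool
        pendingAccesses int
    }

    func newGate() *gate {
        g := &gate{}
        g.cond = sync.NewCond(&g.mu)
        return g
    }

    // preAccess waits out any reset in progress, then registers a reader
    func (g *gate) preAccess() {
        g.mu.Lock()
        defer g.mu.Unlock()
        for g.beingReset {
            g.cond.Wait()
        }
        g.pendingAccesses++
    }

    // postAccess deregisters the reader and wakes any waiting resetter
    func (g *gate) postAccess() {
        g.mu.Lock()
        defer g.mu.Unlock()
        g.pendingAccesses--
        g.cond.Broadcast()
    }

    // tryReset mirrors Reset's SkippedPendingAccess check: never empty the
    // cache file while a reader holds the gate
    func (g *gate) tryReset() bool {
        g.mu.Lock()
        defer g.mu.Unlock()
        if g.pendingAccesses > 0 {
            return false // skipped; the purge loop may retry later
        }
        g.beingReset = true
        // ... empty the cache file here ...
        g.beingReset = false
        g.cond.Broadcast()
        return true
    }

    func main() {
        g := newGate()
        g.preAccess()
        fmt.Println("reset while reading:", g.tryReset()) // false
        g.postAccess()
        fmt.Println("reset after reading:", g.tryReset()) // true
    }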