fs/cache: make sure we call the Shutdown method on backends
This change ensures we call the Shutdown method on backends when they drop out of the fs/cache and at program exit. Some backends implement the optional fs.Shutdowner interface. Until now, Shutdown is only checked and called, when a backend is wrapped (e.g. crypt, compress, ...). To have a general way to perform operations at the end of the backend lifecycle with proper error handling, we can call Shutdown at cache clear time. We add a finalize hook to the cache which will be called when values drop out of the cache. Previous discussion: https://forum.rclone.org/t/31336
This commit is contained in:
parent
326c43ab3f
commit
5de9278650
4 changed files with 74 additions and 7 deletions
|
@ -321,6 +321,12 @@ func Run(Retry bool, showStats bool, cmd *cobra.Command, f func() error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear cache and shutdown backends
|
||||||
|
cache.Clear()
|
||||||
|
if lastErr := accounting.GlobalStats().GetLastError(); cmdErr == nil {
|
||||||
|
cmdErr = lastErr
|
||||||
|
}
|
||||||
|
|
||||||
// Log the final error message and exit
|
// Log the final error message and exit
|
||||||
if cmdErr != nil {
|
if cmdErr != nil {
|
||||||
nerrs := accounting.GlobalStats().GetErrors()
|
nerrs := accounting.GlobalStats().GetErrors()
|
||||||
|
|
5
fs/cache/cache.go
vendored
5
fs/cache/cache.go
vendored
|
@ -25,6 +25,11 @@ func createOnFirstUse() {
|
||||||
c = cache.New()
|
c = cache.New()
|
||||||
c.SetExpireDuration(ci.FsCacheExpireDuration)
|
c.SetExpireDuration(ci.FsCacheExpireDuration)
|
||||||
c.SetExpireInterval(ci.FsCacheExpireInterval)
|
c.SetExpireInterval(ci.FsCacheExpireInterval)
|
||||||
|
c.SetFinalizer(func(value interface{}) {
|
||||||
|
if s, ok := value.(fs.Shutdowner); ok {
|
||||||
|
_ = fs.CountError(s.Shutdown(context.Background()))
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
36
lib/cache/cache.go
vendored
36
lib/cache/cache.go
vendored
|
@ -16,6 +16,7 @@ type Cache struct {
|
||||||
expireRunning bool
|
expireRunning bool
|
||||||
expireDuration time.Duration // expire the cache entry when it is older than this
|
expireDuration time.Duration // expire the cache entry when it is older than this
|
||||||
expireInterval time.Duration // interval to run the cache expire
|
expireInterval time.Duration // interval to run the cache expire
|
||||||
|
finalize func(value interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new cache with the default expire duration and interval
|
// New creates a new cache with the default expire duration and interval
|
||||||
|
@ -25,6 +26,7 @@ func New() *Cache {
|
||||||
expireRunning: false,
|
expireRunning: false,
|
||||||
expireDuration: 300 * time.Second,
|
expireDuration: 300 * time.Second,
|
||||||
expireInterval: 60 * time.Second,
|
expireInterval: 60 * time.Second,
|
||||||
|
finalize: func(_ interface{}) {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +156,10 @@ func (c *Cache) GetMaybe(key string) (value interface{}, found bool) {
|
||||||
// Returns true if the entry was found
|
// Returns true if the entry was found
|
||||||
func (c *Cache) Delete(key string) bool {
|
func (c *Cache) Delete(key string) bool {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
_, found := c.cache[key]
|
entry, found := c.cache[key]
|
||||||
|
if found {
|
||||||
|
c.finalize(entry.value)
|
||||||
|
}
|
||||||
delete(c.cache, key)
|
delete(c.cache, key)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return found
|
return found
|
||||||
|
@ -165,11 +170,13 @@ func (c *Cache) Delete(key string) bool {
|
||||||
// Returns number of entries deleted
|
// Returns number of entries deleted
|
||||||
func (c *Cache) DeletePrefix(prefix string) (deleted int) {
|
func (c *Cache) DeletePrefix(prefix string) (deleted int) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
for k := range c.cache {
|
for key, entry := range c.cache {
|
||||||
if strings.HasPrefix(k, prefix) {
|
if !strings.HasPrefix(key, prefix) {
|
||||||
delete(c.cache, k)
|
continue
|
||||||
deleted++
|
|
||||||
}
|
}
|
||||||
|
c.finalize(entry.value)
|
||||||
|
delete(c.cache, key)
|
||||||
|
deleted++
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return deleted
|
return deleted
|
||||||
|
@ -183,12 +190,17 @@ func (c *Cache) Rename(oldKey, newKey string) (value interface{}, found bool) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if newEntry, newFound := c.cache[newKey]; newFound {
|
if newEntry, newFound := c.cache[newKey]; newFound {
|
||||||
// If new entry is found use that
|
// If new entry is found use that
|
||||||
|
if _, oldFound := c.cache[oldKey]; oldFound {
|
||||||
|
// If there's an old entry, we drop it and also try shutdown.
|
||||||
|
c.finalize(c.cache[oldKey].value)
|
||||||
|
}
|
||||||
delete(c.cache, oldKey)
|
delete(c.cache, oldKey)
|
||||||
value, found = newEntry.value, newFound
|
value, found = newEntry.value, newFound
|
||||||
c.used(newEntry)
|
c.used(newEntry)
|
||||||
} else if oldEntry, oldFound := c.cache[oldKey]; oldFound {
|
} else if oldEntry, oldFound := c.cache[oldKey]; oldFound {
|
||||||
// If old entry is found rename it to new and use that
|
// If old entry is found rename it to new and use that
|
||||||
c.cache[newKey] = oldEntry
|
c.cache[newKey] = oldEntry
|
||||||
|
// No need to shutdown here, as value lives on under newKey
|
||||||
delete(c.cache, oldKey)
|
delete(c.cache, oldKey)
|
||||||
c.used(oldEntry)
|
c.used(oldEntry)
|
||||||
value, found = oldEntry.value, oldFound
|
value, found = oldEntry.value, oldFound
|
||||||
|
@ -204,6 +216,7 @@ func (c *Cache) cacheExpire() {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for key, entry := range c.cache {
|
for key, entry := range c.cache {
|
||||||
if entry.pinCount <= 0 && now.Sub(entry.lastUsed) > c.expireDuration {
|
if entry.pinCount <= 0 && now.Sub(entry.lastUsed) > c.expireDuration {
|
||||||
|
c.finalize(entry.value)
|
||||||
delete(c.cache, key)
|
delete(c.cache, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,10 +231,12 @@ func (c *Cache) cacheExpire() {
|
||||||
// Clear removes everything from the cache
|
// Clear removes everything from the cache
|
||||||
func (c *Cache) Clear() {
|
func (c *Cache) Clear() {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
for k := range c.cache {
|
for key, entry := range c.cache {
|
||||||
delete(c.cache, k)
|
c.finalize(entry.value)
|
||||||
|
delete(c.cache, key)
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entries returns the number of entries in the cache
|
// Entries returns the number of entries in the cache
|
||||||
|
@ -231,3 +246,10 @@ func (c *Cache) Entries() int {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
return entries
|
return entries
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetFinalizer sets a function to be called when a value drops out of the cache
|
||||||
|
func (c *Cache) SetFinalizer(finalize func(interface{})) {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.finalize = finalize
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
34
lib/cache/cache_test.go
vendored
34
lib/cache/cache_test.go
vendored
|
@ -333,3 +333,37 @@ func TestCacheRename(t *testing.T) {
|
||||||
|
|
||||||
assert.Equal(t, 1, c.Entries())
|
assert.Equal(t, 1, c.Entries())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacheFinalize(t *testing.T) {
|
||||||
|
c := New()
|
||||||
|
numCalled := 0
|
||||||
|
c.SetFinalizer(func(v interface{}) {
|
||||||
|
numCalled++
|
||||||
|
})
|
||||||
|
create := func(path string) (interface{}, bool, error) {
|
||||||
|
return path, true, nil
|
||||||
|
}
|
||||||
|
_, _ = c.Get("ok", create)
|
||||||
|
assert.Equal(t, 0, numCalled)
|
||||||
|
c.Clear()
|
||||||
|
assert.Equal(t, 1, numCalled)
|
||||||
|
|
||||||
|
_, _ = c.Get("ok", create)
|
||||||
|
c.Delete("ok")
|
||||||
|
assert.Equal(t, 2, numCalled)
|
||||||
|
|
||||||
|
_, _ = c.Get("ok", create)
|
||||||
|
c.DeletePrefix("ok")
|
||||||
|
assert.Equal(t, 3, numCalled)
|
||||||
|
|
||||||
|
_, _ = c.Get("old", create)
|
||||||
|
_, _ = c.Get("new", create)
|
||||||
|
c.Rename("old", "new")
|
||||||
|
assert.Equal(t, 4, numCalled)
|
||||||
|
|
||||||
|
c.expireDuration = 1 * time.Millisecond
|
||||||
|
_, _ = c.Get("ok", create)
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
c.cacheExpire() // "ok" and "new" fall out of cache
|
||||||
|
assert.Equal(t, 6, numCalled)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue