cache: refactor entries caching pattern for #1904 (#1924)

This commit is contained in:
remusb 2017-12-18 14:55:37 +02:00 committed by GitHub
parent 29d34426bc
commit 6b5989712f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 559 additions and 310 deletions

View file

@ -130,7 +130,6 @@ func (b *Persistent) Connect() error {
})
b.db = db
return nil
}
@ -159,36 +158,6 @@ func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *b
return bucket
}
// updateRootTs is a convenience method to update an object timestamp to mark that it was used recently
func (b *Persistent) updateRootTs(tx *bolt.Tx, path string, t time.Duration) {
tsBucket := tx.Bucket([]byte(RootTsBucket))
ts := time.Now().Add(t)
found := false
// delete previous timestamps for the same object
c := tsBucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.Equal(v, []byte(path)) {
if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found {
found = true
continue
}
err := c.Delete()
if err != nil {
fs.Debugf(path, "failed to clean object: %v", err)
}
}
}
if found {
return
}
err := tsBucket.Put(itob(ts.UnixNano()), []byte(path))
if err != nil {
fs.Debugf(path, "failed to timestamp chunk: %v", err)
}
}
// AddDir will update a CachedDirectory metadata and all its entries
func (b *Persistent) AddDir(cachedDir *Directory) error {
return b.db.Update(func(tx *bolt.Tx) error {
@ -205,8 +174,6 @@ func (b *Persistent) AddDir(cachedDir *Directory) error {
if err != nil {
return err
}
b.updateRootTs(tx, cachedDir.abs(), cachedDir.CacheFs.fileAge)
return nil
})
}
@ -225,7 +192,7 @@ func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error)
if val != nil {
err := json.Unmarshal(val, cachedDir)
if err != nil {
fs.Debugf(cachedDir.abs(), "error during unmarshalling obj: %v", err)
return errors.Errorf("error during unmarshalling obj: %v", err)
}
} else {
return errors.Errorf("missing cached dir: %v", cachedDir)
@ -276,7 +243,32 @@ func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error)
// RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it
func (b *Persistent) RemoveDir(fp string) error {
err := b.ExpireDir(fp)
var err error
parentDir, dirName := path.Split(fp)
if fp == "" {
err = b.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket([]byte(RootBucket))
if err != nil {
fs.Debugf(fp, "couldn't delete from cache: %v", err)
return err
}
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
return nil
})
} else {
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := b.getBucket(cleanPath(parentDir), false, tx)
if bucket == nil {
return errors.Errorf("couldn't open bucket (%v)", fp)
}
// delete the cached dir
err := bucket.DeleteBucket([]byte(cleanPath(dirName)))
if err != nil {
fs.Debugf(fp, "couldn't delete from cache: %v", err)
}
return nil
})
}
// delete chunks on disk
// safe to ignore as the files might not have been open
@ -289,35 +281,33 @@ func (b *Persistent) RemoveDir(fp string) error {
// ExpireDir will flush a CachedDirectory and all its objects from the objects
// chunks will remain as they are
func (b *Persistent) ExpireDir(fp string) error {
parentDir, dirName := path.Split(fp)
if fp == "" {
return b.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket([]byte(RootBucket))
if err != nil {
fs.Debugf(fp, "couldn't delete from cache: %v", err)
return err
}
err = tx.DeleteBucket([]byte(RootTsBucket))
if err != nil {
fs.Debugf(fp, "couldn't delete from cache: %v", err)
return err
}
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
_, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket))
return nil
})
}
func (b *Persistent) ExpireDir(cd *Directory) error {
t := time.Now().Add(cd.CacheFs.fileAge * -1)
cd.CacheTs = &t
// expire all parents
return b.db.Update(func(tx *bolt.Tx) error {
bucket := b.getBucket(cleanPath(parentDir), false, tx)
if bucket == nil {
return errors.Errorf("couldn't open bucket (%v)", fp)
}
// delete the cached dir
err := bucket.DeleteBucket([]byte(cleanPath(dirName)))
if err != nil {
fs.Debugf(fp, "couldn't delete from cache: %v", err)
// expire all the parents
currentDir := cd.abs()
for { // until we get to the root
bucket := b.getBucket(currentDir, false, tx)
if bucket != nil {
val := bucket.Get([]byte("."))
if val != nil {
cd2 := &Directory{CacheFs: cd.CacheFs}
err := json.Unmarshal(val, cd2)
if err == nil {
fs.Debugf(cd, "cache: expired %v", currentDir)
cd2.CacheTs = &t
enc2, _ := json.Marshal(cd2)
_ = bucket.Put([]byte("."), enc2)
}
}
}
if currentDir == "" {
break
}
currentDir = cleanPath(path.Dir(currentDir))
}
return nil
})
@ -354,7 +344,6 @@ func (b *Persistent) AddObject(cachedObject *Object) error {
if err != nil {
return errors.Errorf("couldn't cache object (%v) info: %v", cachedObject, err)
}
b.updateRootTs(tx, cachedObject.abs(), cachedObject.CacheFs.fileAge)
return nil
})
}
@ -543,54 +532,6 @@ func (b *Persistent) CleanChunksBySize(maxSize int64) {
}
}
// CleanEntriesByAge will cleanup on a cron basis
func (b *Persistent) CleanEntriesByAge(entryAge time.Duration) {
b.cleanupMux.Lock()
defer b.cleanupMux.Unlock()
var cntEntries int
err := b.db.Update(func(tx *bolt.Tx) error {
min := itob(0)
max := itob(time.Now().UnixNano())
rootTsBucket := tx.Bucket([]byte(RootTsBucket))
if rootTsBucket == nil {
return errors.Errorf("Couldn't open (%v) bucket", rootTsBucket)
}
// iterate through ts
c := rootTsBucket.Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
if v == nil {
continue
}
// get the path
absPath := string(v)
absDir, absName := path.Split(absPath)
// delete this ts entry
err := c.Delete()
if err != nil {
fs.Errorf(absPath, "failed deleting object during cleanup: %v", err)
continue
}
// search for the entry in the root bucket, skip it if it's not found
parentBucket := b.getBucket(cleanPath(absDir), false, tx)
if parentBucket == nil {
continue
}
_ = parentBucket.Delete([]byte(cleanPath(absName)))
cntEntries = cntEntries + 1
}
fs.Infof("cache", "deleted (%v) entries", cntEntries)
return nil
})
if err != nil {
fs.Errorf("cache", "cleanup failed: %v", err)
}
}
// Stats returns a go map with the stats key values
func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
r := make(map[string]map[string]interface{})
@ -670,6 +611,9 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
// Purge will flush the entire cache
func (b *Persistent) Purge() {
b.cleanupMux.Lock()
defer b.cleanupMux.Unlock()
_ = b.db.Update(func(tx *bolt.Tx) error {
_ = tx.DeleteBucket([]byte(RootBucket))
_ = tx.DeleteBucket([]byte(RootTsBucket))
@ -716,25 +660,6 @@ func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) {
return t, err
}
// GetRootTs retrieves the current timestamp of an object or dir
func (b *Persistent) GetRootTs(path string) (time.Time, error) {
var t time.Time
err := b.db.View(func(tx *bolt.Tx) error {
tsBucket := tx.Bucket([]byte(RootTsBucket))
c := tsBucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.Equal(v, []byte(path)) {
t = time.Unix(0, btoi(k))
return nil
}
}
return errors.Errorf("not found %v", path)
})
return t, err
}
func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string), kvFn func(key string, val []byte)) error {
err := b.db.View(func(tx *bolt.Tx) error {
var c *bolt.Cursor
@ -766,6 +691,9 @@ func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string)
// Close should be called when the program ends gracefully
func (b *Persistent) Close() {
b.cleanupMux.Lock()
defer b.cleanupMux.Unlock()
err := b.db.Close()
if err != nil {
fs.Errorf(b, "closing handle: %v", err)