forked from TrueCloudLab/frostfs-node
[#536] blobovnicza: Add blobovniczatree DB cache
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c672f59ab8
commit
7456c8556a
10 changed files with 124 additions and 20 deletions
|
@ -56,6 +56,7 @@ type Blobovniczas struct {
|
|||
|
||||
commondbManager *dbManager
|
||||
activeDBManager *activeDBManager
|
||||
dbCache *dbCache
|
||||
}
|
||||
|
||||
var _ common.Storage = (*Blobovniczas)(nil)
|
||||
|
@ -77,6 +78,7 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
|||
|
||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||
|
||||
return blz
|
||||
}
|
||||
|
|
103
pkg/local_object_storage/blobstor/blobovniczatree/cache.go
Normal file
103
pkg/local_object_storage/blobstor/blobovniczatree/cache.go
Normal file
|
@ -0,0 +1,103 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
)
|
||||
|
||||
// dbCache caches sharedDB instances that are NOT open for Put.
|
||||
//
|
||||
// Uses dbManager for opening/closing sharedDB instances.
|
||||
// Stores a reference to an cached sharedDB, so dbManager does not close it.
|
||||
type dbCache struct {
|
||||
cacheGuard *sync.RWMutex
|
||||
cache simplelru.LRUCache[string, *sharedDB]
|
||||
pathLock *utilSync.KeyLocker[string]
|
||||
closed bool
|
||||
|
||||
dbManager *dbManager
|
||||
}
|
||||
|
||||
func newDBCache(size int, dbManager *dbManager) *dbCache {
|
||||
cache, err := simplelru.NewLRU[string, *sharedDB](size, func(_ string, evictedDB *sharedDB) {
|
||||
evictedDB.Close()
|
||||
})
|
||||
if err != nil {
|
||||
// occurs only if the size is not positive
|
||||
panic(fmt.Errorf("could not create LRU cache of size %d: %w", size, err))
|
||||
}
|
||||
return &dbCache{
|
||||
cacheGuard: &sync.RWMutex{},
|
||||
cache: cache,
|
||||
dbManager: dbManager,
|
||||
pathLock: utilSync.NewKeyLocker[string](),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *dbCache) Open() {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
c.closed = false
|
||||
}
|
||||
|
||||
func (c *dbCache) Close() {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
c.cache.Purge()
|
||||
c.closed = true
|
||||
}
|
||||
|
||||
func (c *dbCache) GetOrCreate(path string) *sharedDB {
|
||||
value := c.getExisted(path)
|
||||
if value != nil {
|
||||
return value
|
||||
}
|
||||
return c.create(path)
|
||||
}
|
||||
|
||||
func (c *dbCache) getExisted(path string) *sharedDB {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
if value, ok := c.cache.Get(path); ok {
|
||||
return value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *dbCache) create(path string) *sharedDB {
|
||||
c.pathLock.Lock(path)
|
||||
defer c.pathLock.Unlock(path)
|
||||
|
||||
value := c.getExisted(path)
|
||||
if value != nil {
|
||||
return value
|
||||
}
|
||||
|
||||
value = c.dbManager.GetByPath(path)
|
||||
|
||||
_, err := value.Open() //open db to hold reference, closed by evictedDB.Close() or if cache closed
|
||||
if err != nil {
|
||||
return value
|
||||
}
|
||||
if added := c.put(path, value); !added {
|
||||
value.Close()
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func (c *dbCache) put(path string, db *sharedDB) bool {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
if !c.closed {
|
||||
c.cache.Add(path, db)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
|
@ -50,7 +50,7 @@ func TestBlobovniczaTree_Concurrency(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
_, err = st.Get(context.Background(), common.GetPrm{Address: addr})
|
||||
require.NoError(t, err) // fails very often, correlated to how many goroutines are started
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -21,15 +21,13 @@ func (b *Blobovniczas) Open(readOnly bool) error {
|
|||
func (b *Blobovniczas) Init() error {
|
||||
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
|
||||
|
||||
b.openManagers()
|
||||
|
||||
if b.readOnly {
|
||||
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
|
||||
shBlz := b.openBlobovniczaNoCache(p)
|
||||
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||
_, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
|
@ -44,23 +42,25 @@ func (b *Blobovniczas) Init() error {
|
|||
func (b *Blobovniczas) openManagers() {
|
||||
b.commondbManager.Open() //order important
|
||||
b.activeDBManager.Open()
|
||||
b.dbCache.Open()
|
||||
}
|
||||
|
||||
// Close implements common.Storage.
|
||||
func (b *Blobovniczas) Close() error {
|
||||
b.activeDBManager.Close() //order important
|
||||
b.dbCache.Close() //order important
|
||||
b.activeDBManager.Close()
|
||||
b.commondbManager.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// opens and returns blobovnicza with path p.
|
||||
// returns blobovnicza with path p
|
||||
//
|
||||
// If blobovnicza is already opened and cached, instance from cache is returned w/o changes.
|
||||
func (b *Blobovniczas) openBlobovnicza(p string) *sharedDB {
|
||||
return b.openBlobovniczaNoCache(p)
|
||||
// If blobovnicza is already cached, instance from cache is returned w/o changes.
|
||||
func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
|
||||
return b.dbCache.GetOrCreate(p)
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) openBlobovniczaNoCache(p string) *sharedDB {
|
||||
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
|
||||
return b.commondbManager.GetByPath(p)
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
shBlz := b.getBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return res, err
|
||||
|
@ -94,7 +94,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
//
|
||||
// returns no error if object was removed from some blobovnicza of the same level.
|
||||
func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string) (common.DeleteRes, error) {
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
shBlz := b.getBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
|
|
|
@ -36,7 +36,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
shBlz := b.getBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.ExistsRes{}, err
|
||||
|
|
|
@ -47,7 +47,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
shBlz := b.getBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return res, err
|
||||
|
@ -93,7 +93,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
|
||||
// open blobovnicza (cached inside)
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
shBlz := b.getBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
|
|
|
@ -46,7 +46,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
shBlz := b.getBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
|
@ -102,7 +102,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string) (common.GetRangeRes, error) {
|
||||
// open blobovnicza (cached inside)
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
shBlz := b.getBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
|
|
|
@ -68,7 +68,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
|||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||
return b.iterateLeaves(ctx, func(p string) (bool, error) {
|
||||
shBlz := b.openBlobovnicza(p)
|
||||
shBlz := b.getBlobovnicza(p)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
|
|
|
@ -114,8 +114,7 @@ func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath stri
|
|||
result := &levelDbManager{
|
||||
databases: make([]*sharedDB, width),
|
||||
}
|
||||
var idx uint64
|
||||
for idx = 0; idx < width; idx++ {
|
||||
for idx := uint64(0); idx < width; idx++ {
|
||||
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
|
||||
}
|
||||
return result
|
||||
|
|
Loading…
Reference in a new issue