Fix blobovnicza cache #669

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:fix/open_cache_capacity into master 2024-09-04 19:51:03 +00:00
17 changed files with 692 additions and 476 deletions

View file

@@ -509,4 +509,5 @@ const (
 	RuntimeSoftMemoryLimitUpdated          = "soft runtime memory limit value updated"
 	RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
 	FailedToCountWritecacheItems           = "failed to count writecache items"
+	AttemtToCloseAlreadyClosedBlobovnicza  = "attempt to close an already closed blobovnicza"
 )

View file

@@ -3,7 +3,6 @@ package blobovnicza
 import (
 	"context"
 	"crypto/rand"
-	"errors"
 	"os"
 	"testing"
@@ -98,9 +97,9 @@ func TestBlobovnicza(t *testing.T) {
 		testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
 	}

-	// from now objects should not be saved
+	// blobovnizca accepts object event if full

Now the check for the fullness of the database is performed at the blobovnicza tree level.
 	testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
-		return errors.Is(err, ErrFull)
+		return err == nil
 	}, nil)

 	require.NoError(t, blz.Close())

View file

@@ -23,10 +23,6 @@ type PutPrm struct {
 type PutRes struct {
 }

-// ErrFull is returned when trying to save an
-// object to a filled blobovnicza.
-var ErrFull = logicerr.New("blobovnicza is full")
-
 // SetAddress sets the address of the saving object.
 func (p *PutPrm) SetAddress(addr oid.Address) {
 	p.addr = addr
@@ -65,10 +61,6 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
 	key := addressKey(prm.addr)

 	err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
-		if b.full() {
-			return ErrFull
-		}
 		buck := tx.Bucket(bucketName)
 		if buck == nil {
 			// expected to happen:

View file

@@ -55,6 +55,6 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) {
 	b.metrics.SubOpenBlobovniczaItems(1)
 }

-func (b *Blobovnicza) full() bool {
+func (b *Blobovnicza) IsFull() bool {
 	return b.dataSize.Load() >= b.fullSizeLimit
 }
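
With `ErrFull` gone, `Put` no longer refuses writes; callers are expected to consult `IsFull` and move on to another database. A minimal sketch of that call pattern (a hypothetical helper built on this PR's `sharedDB` type, not code from the diff):

```go
package blobovniczatree

import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"

// Sketch: fullness is now the caller's concern rather than Put's.
func usableForPut(shDB *sharedDB) (*blobovnicza.Blobovnicza, bool, error) {
	blz, err := shDB.Open() // take a reference to the shared instance
	if err != nil {
		return nil, false, err
	}
	if blz.IsFull() {
		shDB.Close() // release: this DB cannot accept new objects
		return nil, false, nil
	}
	return blz, true, nil // the caller must call shDB.Close() after its Put
}
```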

View file

@@ -0,0 +1,213 @@
package blobovniczatree
import (
"path/filepath"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
)
type activeDB struct {
blz *blobovnicza.Blobovnicza
shDB *sharedDB
}
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
return db.blz
}
func (db *activeDB) Close() {
db.shDB.Close()
}
func (db *activeDB) Path() string {
return db.shDB.Path()
}
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
//
// Uses dbManager for opening/closing sharedDB instances.
// Stores a reference to an open active sharedDB, so dbManager does not close it.
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
type activeDBManager struct {
levelToActiveDBGuard *sync.RWMutex
levelToActiveDB map[string]*sharedDB
levelLock *utilSync.KeyLocker[string]
closed bool
dbManager *dbManager
leafWidth uint64
}
fyrchik marked this conversation as resolved (Outdated)

`levelToActive` or `levelToShared`?

Active DB of type `sharedDB`.

Ok, for some reason I've thought shared/active is a pair of mutually exclusive roles.
func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager {
return &activeDBManager{
levelToActiveDBGuard: &sync.RWMutex{},
levelToActiveDB: make(map[string]*sharedDB),
levelLock: utilSync.NewKeyLocker[string](),
dbManager: dbManager,
leafWidth: leafWidth,
}
}
// GetOpenedActiveDBForLevel returns active DB for level.
// DB must be closed after use.
func (m *activeDBManager) GetOpenedActiveDBForLevel(lvlPath string) (*activeDB, error) {
activeDB, err := m.getCurrentActiveIfOk(lvlPath)
if err != nil {
return nil, err
}
if activeDB != nil {
return activeDB, nil
}
return m.updateAndGetActive(lvlPath)
}
func (m *activeDBManager) Open() {
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
m.closed = false
}
func (m *activeDBManager) Close() {

Could you leave a comment about error handling here? We may find ourselves in a situation where the DB was not closed and later would not be opened because an open FD leaked.

It is existing behaviour: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go#L111 When closing the DB we just log the error: see `sharedDB.Close()`.

I don't argue, I think it is just something worth thinking about if you touch the code or investigate problems.
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
for _, db := range m.levelToActiveDB {
db.Close()
}
m.levelToActiveDB = make(map[string]*sharedDB)
m.closed = true
}
func (m *activeDBManager) getCurrentActiveIfOk(lvlPath string) (*activeDB, error) {
m.levelToActiveDBGuard.RLock()
defer m.levelToActiveDBGuard.RUnlock()
if m.closed {
return nil, errClosed
}
db, ok := m.levelToActiveDB[lvlPath]

db 0 is active (refCount = 1)
goroutine 1: got and opened db 0 (refCount = 2)
goroutine 2: got db 0
goroutine 1: saved object to db 0, db 0 is full, closed (refCount = 1)
goroutine 3: replaced active db from db 0 to db 1, so db 0 was physically closed (refCount = 0)
goroutine 2: physically opens db 0 (refCount = 1)

To prevent this situation, the active DB manager returns the active DB opened, so the user has to close it.
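
A minimal sketch of the ownership rule this scenario motivates (a hypothetical helper using the PR's types, not part of the diff): the manager hands the DB back with a reference already taken, and the caller releases exactly that one reference.

```go
package blobovniczatree

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
)

// Sketch: the caller holds its own reference for the whole operation, so a
// concurrent switch of the active DB cannot close the file underneath it.
func putViaActive(ctx context.Context, m *activeDBManager, lvlPath string, prm blobovnicza.PutPrm) error {
	active, err := m.GetOpenedActiveDBForLevel(lvlPath) // reference already taken
	if err != nil || active == nil {
		return err // nil active means all candidate DBs on the level are full
	}
	defer active.Close() // release exactly the caller's one reference
	_, err = active.Blobovnicza().Put(ctx, prm)
	return err
}
```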
if !ok {
return nil, nil
}
blz, err := db.Open() //open db for usage, will be closed on activeDB.Close()
if err != nil {
return nil, err
}
if blz.IsFull() {
db.Close()
return nil, nil
}
return &activeDB{
blz: blz,
shDB: db,
}, nil
}
func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error) {
m.levelLock.Lock(lvlPath)
defer m.levelLock.Unlock(lvlPath)
current, err := m.getCurrentActiveIfOk(lvlPath)
if err != nil {
return nil, err
}
if current != nil {
return current, nil
}
nextShDB, err := m.getNextSharedDB(lvlPath)
if err != nil {
return nil, err
}
if nextShDB == nil {
return nil, nil
}
blz, err := nextShDB.Open() // open db for client, client must call Close() after usage
if err != nil {
return nil, err
}
return &activeDB{
blz: blz,
shDB: nextShDB,
}, nil
}
func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
var idx uint64
var iterCount uint64
hasActive, currentIdx := m.hasActiveDB(lvlPath)
if hasActive {
idx = (currentIdx + 1) % m.leafWidth
}
var next *sharedDB
for iterCount < m.leafWidth {
path := filepath.Join(lvlPath, u64ToHexString(idx))
shDB := m.dbManager.GetByPath(path)
db, err := shDB.Open() //open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
if err != nil {
return nil, err
}
if db.IsFull() {
shDB.Close()
} else {
next = shDB
break
}
idx = (idx + 1) % m.leafWidth
iterCount++
}
previous, updated := m.replace(lvlPath, next)
if !updated && next != nil {
next.Close() // manager is closed, so don't hold active DB open
}
if updated && previous != nil {
previous.Close()
}
return next, nil
}
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
m.levelToActiveDBGuard.RLock()
defer m.levelToActiveDBGuard.RUnlock()
if m.closed {
return false, 0
}
db, ok := m.levelToActiveDB[lvlPath]
if !ok {
return false, 0
}
return true, u64FromHexString(filepath.Base(db.Path()))
}
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
if m.closed {
return nil, false
}
previous := m.levelToActiveDB[lvlPath]
if shDB == nil {
delete(m.levelToActiveDB, lvlPath)
} else {
m.levelToActiveDB[lvlPath] = shDB
}
return previous, true
}
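
For orientation, a small sketch of the leaf addressing that `getNextSharedDB` iterates over (a hypothetical helper; `u64ToHexString` is the tree's existing naming helper): candidate databases of a level live at `lvlPath/<hex index>`, and the search wraps around modulo the leaf width.

```go
package blobovniczatree

import "path/filepath"

// Sketch: the path of the next candidate DB on a level wraps around the leaf
// width, exactly as getNextSharedDB does above.
func nextCandidatePath(lvlPath string, currentIdx, leafWidth uint64) string {
	idx := (currentIdx + 1) % leafWidth
	return filepath.Join(lvlPath, u64ToHexString(idx))
}
```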

View file

@@ -3,19 +3,12 @@ package blobovniczatree
 import (
 	"errors"
 	"fmt"
-	"path/filepath"
 	"strconv"
-	"sync"

-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/hrw"
-	"github.com/hashicorp/golang-lru/v2/simplelru"
-	"go.uber.org/zap"
 )

 // Blobovniczas represents the storage of the "small" objects.
@@ -61,28 +54,9 @@ import (
 type Blobovniczas struct {
 	cfg

-	// cache of opened filled Blobovniczas
-	opened *simplelru.LRU[string, *blobovnicza.Blobovnicza]
-	// lruMtx protects opened cache.
-	// It isn't RWMutex because `Get` calls must
-	// lock this mutex on write, as LRU info is updated.
-	// It must be taken after activeMtx in case when eviction is possible
-	// i.e. `Add`, `Purge` and `Remove` calls.
-	lruMtx sync.Mutex
-	// mutex to exclude parallel bbolt.Open() calls
-	// bbolt.Open() deadlocks if it tries to open already opened file
-	openMtx sync.Mutex
-	// list of active (opened, non-filled) Blobovniczas
-	activeMtx sync.RWMutex
-	active    map[string]blobovniczaWithIndex
-}
-
-type blobovniczaWithIndex struct {
-	ind uint64
-	blz *blobovnicza.Blobovnicza
+	commondbManager *dbManager
+	activeDBManager *activeDBManager
+	dbCache         *dbCache
 }

 var _ common.Storage = (*Blobovniczas)(nil)
@@ -102,120 +76,13 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
 		blz.blzLeafWidth = blz.blzShallowWidth
 	}

-	cache, err := simplelru.NewLRU[string, *blobovnicza.Blobovnicza](blz.openedCacheSize, func(p string, value *blobovnicza.Blobovnicza) {
-		lvlPath := filepath.Dir(p)
-		if b, ok := blz.active[lvlPath]; ok && b.ind == u64FromHexString(filepath.Base(p)) {
-			// This branch is taken if we have recently updated active blobovnicza and remove
-			// it from opened cache.
-			return
-		} else if err := value.Close(); err != nil {
-			blz.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
-				zap.String("id", p),
-				zap.String("error", err.Error()),
-			)
-		} else {
-			blz.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyClosedOnEvict,
-				zap.String("id", p),
-			)
-		}
-	})
-	if err != nil {
-		// occurs only if the size is not positive
-		panic(fmt.Errorf("could not create LRU cache of size %d: %w", blz.openedCacheSize, err))
-	}
-
-	activeMapCapacity := uint64(1)
-	for i := uint64(0); i < blz.blzShallowDepth; i++ {
-		if i+1 == blz.blzShallowDepth {
-			activeMapCapacity *= blz.blzLeafWidth
-		} else {
-			activeMapCapacity *= blz.blzShallowWidth
-		}
-	}
-
-	blz.opened = cache
-	blz.active = make(map[string]blobovniczaWithIndex, activeMapCapacity)
+	blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
+	blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
+	blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)

 	return blz
 }

-// activates and returns activated blobovnicza of p-level (dir).
-//
-// returns error if blobvnicza could not be activated.
-func (b *Blobovniczas) getActivated(lvlPath string) (blobovniczaWithIndex, error) {
-	return b.updateAndGet(lvlPath, nil)
-}
-
-// updates active blobovnicza of p-level (dir).
-//
-// if current active blobovnicza's index is not old, it remains unchanged.
-func (b *Blobovniczas) updateActive(lvlPath string, old *uint64) error {
-	b.log.Debug(logs.BlobovniczatreeUpdatingActiveBlobovnicza, zap.String("path", lvlPath))
-
-	_, err := b.updateAndGet(lvlPath, old)
-
-	b.log.Debug(logs.BlobovniczatreeActiveBlobovniczaSuccessfullyUpdated, zap.String("path", lvlPath))
-	return err
-}
-
-// updates and returns active blobovnicza of p-level (dir).
-//
-// if current active blobovnicza's index is not old, it is returned unchanged.
-func (b *Blobovniczas) updateAndGet(lvlPath string, old *uint64) (blobovniczaWithIndex, error) {
-	b.activeMtx.RLock()
-	active, ok := b.active[lvlPath]
-	b.activeMtx.RUnlock()
-
-	if ok {
-		if old != nil {
-			if active.ind == b.blzLeafWidth-1 {
-				return active, logicerr.New("no more Blobovniczas")

In general, this is an invalid statement. Imagine the situation: the width of the leaf level is 3, the current active database has index = 2, but all objects were deleted from the database with index = 0. So the database with index = 0 is not full and can be the next active database.
-			} else if active.ind != *old {
-				// sort of CAS in order to control concurrent
-				// updateActive calls
-				return active, nil
-			}
-		} else {
-			return active, nil
-		}
-
-		active.ind++
-	}
-
-	var err error
-	if active.blz, err = b.openBlobovnicza(filepath.Join(lvlPath, u64ToHexString(active.ind))); err != nil {
-		return active, err
-	}
-
-	b.activeMtx.Lock()
-	defer b.activeMtx.Unlock()
-
-	// check 2nd time to find out if it blobovnicza was activated while thread was locked
-	tryActive, ok := b.active[lvlPath]
-	if ok && tryActive.blz == active.blz {
-		return tryActive, nil
-	}
-
-	// Remove from opened cache (active blobovnicza should always be opened).
-	// Because `onEvict` callback is called in `Remove`, we need to update
-	// active map beforehand.
-	b.active[lvlPath] = active
-
-	activePath := filepath.Join(lvlPath, u64ToHexString(active.ind))
-	b.lruMtx.Lock()
-	b.opened.Remove(activePath)
-	if ok {
-		b.opened.Add(filepath.Join(lvlPath, u64ToHexString(tryActive.ind)), tryActive.blz)
-	}
-	b.lruMtx.Unlock()
-
-	b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyActivated,
-		zap.String("path", activePath))
-
-	return active, nil
-}
-
 // returns hash of the object address.
 func addressHash(addr *oid.Address, path string) uint64 {
 	var a string

View file

@@ -0,0 +1,103 @@
package blobovniczatree
import (
"fmt"
"sync"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
"github.com/hashicorp/golang-lru/v2/simplelru"
)
// dbCache caches sharedDB instances that are NOT open for Put.
//
// Uses dbManager for opening/closing sharedDB instances.
// Stores a reference to a cached sharedDB, so dbManager does not close it.
type dbCache struct {
cacheGuard *sync.RWMutex
cache simplelru.LRUCache[string, *sharedDB]
pathLock *utilSync.KeyLocker[string]
closed bool
dbManager *dbManager
}
func newDBCache(size int, dbManager *dbManager) *dbCache {
cache, err := simplelru.NewLRU[string, *sharedDB](size, func(_ string, evictedDB *sharedDB) {
evictedDB.Close()
})
if err != nil {
// occurs only if the size is not positive
panic(fmt.Errorf("could not create LRU cache of size %d: %w", size, err))
}
return &dbCache{
cacheGuard: &sync.RWMutex{},
cache: cache,
dbManager: dbManager,
pathLock: utilSync.NewKeyLocker[string](),
}
}
func (c *dbCache) Open() {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
c.closed = false
}
func (c *dbCache) Close() {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
c.cache.Purge()
c.closed = true
}
func (c *dbCache) GetOrCreate(path string) *sharedDB {
value := c.getExisted(path)
if value != nil {
return value
}
return c.create(path)
}

`cache.Get` can rebuild cache, so exclusive lock.
func (c *dbCache) getExisted(path string) *sharedDB {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
if value, ok := c.cache.Get(path); ok {
return value
}
return nil
}
func (c *dbCache) create(path string) *sharedDB {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
value := c.getExisted(path)
if value != nil {
return value
}
value = c.dbManager.GetByPath(path)
_, err := value.Open() //open db to hold reference, closed by evictedDB.Close() or if cache closed
if err != nil {
return value
}
if added := c.put(path, value); !added {
value.Close()
}
return value
}
func (c *dbCache) put(path string, db *sharedDB) bool {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
if !c.closed {
c.cache.Add(path, db)
return true
}
return false
}
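
A short usage sketch (a hypothetical caller, not from the diff): the cache owns one reference per cached `sharedDB` — taken in `create`, released on eviction or `Close` — while every reader still takes and releases its own.

```go
package blobovniczatree

// Sketch: the reader's reference is independent of the one the cache holds.
func readThroughCache(c *dbCache, path string) error {
	shDB := c.GetOrCreate(path) // the cache keeps its own reference until eviction
	blz, err := shDB.Open()     // the reader takes an additional reference
	if err != nil {
		return err
	}
	defer shDB.Close() // releases the reader's reference, not the cache's
	_ = blz            // ... perform reads on blz here ...
	return nil
}
```

Note that `getExisted` takes the exclusive lock even for lookups because, as the review comment above says, `simplelru`'s `Get` mutates recency state.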

View file

@@ -0,0 +1,59 @@
package blobovniczatree
import (
"context"
"sync"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"github.com/stretchr/testify/require"
)
func TestBlobovniczaTree_Concurrency(t *testing.T) {

Test from task.
t.Parallel()
const n = 1000
st := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(1024),
WithBlobovniczaShallowWidth(10),
WithBlobovniczaShallowDepth(1),
WithRootPath(t.TempDir()))
require.NoError(t, st.Open(false))
require.NoError(t, st.Init())
t.Cleanup(func() {
require.NoError(t, st.Close())
})
objGen := &testutil.SeqObjGenerator{ObjSize: 1}
var cnt atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for cnt.Add(1) <= n {
obj := objGen.Next()
addr := testutil.AddressFromObject(t, obj)
raw, err := obj.Marshal()
require.NoError(t, err)
_, err = st.Put(context.Background(), common.PutPrm{
Address: addr,
RawData: raw,
})
require.NoError(t, err)
_, err = st.Get(context.Background(), common.GetPrm{Address: addr})
require.NoError(t, err)
fyrchik marked this conversation as resolved (Outdated)

Still fails?

Oops, comment removed
}
}()
}
wg.Wait()
}

View file

@@ -2,11 +2,8 @@ package blobovniczatree
 import (
 	"context"
-	"fmt"
-	"path/filepath"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"go.uber.org/zap"
 )

@@ -14,6 +11,7 @@ func (b *Blobovniczas) Open(readOnly bool) error {
 	b.readOnly = readOnly
 	b.metrics.SetMode(readOnly)
+	b.openManagers()

 	return nil
 }

@@ -29,115 +27,40 @@ func (b *Blobovniczas) Init() error {
 	}

 	return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
-		blz, err := b.openBlobovniczaNoCache(p)
+		shBlz := b.getBlobovniczaWithoutCaching(p)
+		_, err := shBlz.Open()
 		if err != nil {
 			return true, err
 		}
-		defer blz.Close()
-
-		if err := blz.Init(); err != nil {
-			return true, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
-		}
+		defer shBlz.Close()

 		b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
 		return false, nil
 	})
 }

+func (b *Blobovniczas) openManagers() {
+	b.commondbManager.Open() //order important
+	b.activeDBManager.Open()
+	b.dbCache.Open()
+}
+
 // Close implements common.Storage.
 func (b *Blobovniczas) Close() error {
-	b.activeMtx.Lock()
-	b.lruMtx.Lock()
-
-	for p, v := range b.active {
-		if err := v.blz.Close(); err != nil {
-			b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
-				zap.String("path", p),
-				zap.String("error", err.Error()),
-			)
-		}
-		b.opened.Remove(p)
-	}
-	for _, k := range b.opened.Keys() {
-		blz, _ := b.opened.Get(k)
-		if err := blz.Close(); err != nil {
-			b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
-				zap.String("path", k),
-				zap.String("error", err.Error()),
-			)
-		}
-		b.opened.Remove(k)
-	}
-	b.active = make(map[string]blobovniczaWithIndex)
-	b.metrics.Close()
-	b.lruMtx.Unlock()
-	b.activeMtx.Unlock()
+	b.dbCache.Close() //order important
+	b.activeDBManager.Close()
+	b.commondbManager.Close()

 	return nil
 }

-// opens and returns blobovnicza with path p.
+// returns blobovnicza with path p
 //
-// If blobovnicza is already opened and cached, instance from cache is returned w/o changes.
+// If blobovnicza is already cached, instance from cache is returned w/o changes.
-func (b *Blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) {
+func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
-	b.lruMtx.Lock()
-	v, ok := b.opened.Get(p)
-	b.lruMtx.Unlock()
-
-	if ok {
-		// blobovnicza should be opened in cache
-		return v, nil
-	}
-
-	lvlPath := filepath.Dir(p)
-	curIndex := u64FromHexString(filepath.Base(p))
-
-	b.activeMtx.RLock()
-	defer b.activeMtx.RUnlock()
-
-	active, ok := b.active[lvlPath]
-	if ok && active.ind == curIndex {
-		return active.blz, nil
-	}
-
-	b.lruMtx.Lock()
-	defer b.lruMtx.Unlock()
-
-	v, ok = b.opened.Get(p)
-	if ok {
-		return v, nil
-	}
-
-	blz, err := b.openBlobovniczaNoCache(p)
-	if err != nil {
-		return nil, err
-	}
-
-	b.opened.Add(p, blz)
-	return blz, nil
+	return b.dbCache.GetOrCreate(p)
 }

Is the name and the comment still valid?

Yes, they are still valid.

Then why do we call `Open()` on the result at the callsites?

Ok, then just return. Fixed.
-func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicza, error) {
+func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
-	b.openMtx.Lock()
-	defer b.openMtx.Unlock()
-
-	path := filepath.Join(b.rootPath, p)
-	blz := blobovnicza.New(append(b.blzOpts,
-		blobovnicza.WithReadOnly(b.readOnly),
-		blobovnicza.WithPath(path),
-		blobovnicza.WithMetrics(b.metrics.Blobovnicza()),
-	)...)
-
-	if err := blz.Open(); err != nil {
-		return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
-	}
-	if err := blz.Init(); err != nil {
-		return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err)
-	}
-
-	return blz, nil
+	return b.commondbManager.GetByPath(p)
 }

View file

@@ -3,7 +3,6 @@ package blobovniczatree
 import (
 	"context"
 	"encoding/hex"
-	"path/filepath"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -48,10 +47,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
-		blz, err := b.openBlobovnicza(id.String())
+		shBlz := b.getBlobovnicza(id.String())
+		blz, err := shBlz.Open()
 		if err != nil {
 			return res, err
 		}
+		defer shBlz.Close()

 		if res, err = b.deleteObject(ctx, blz, bPrm); err == nil {
 			success = true
@@ -59,16 +60,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 		return res, err
 	}

-	activeCache := make(map[string]struct{})

It was a protection in case the active database was changed during the iteration. Now changing the active database does not affect the opening of the database.
 	objectFound := false

 	err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
-		dirPath := filepath.Dir(p)
-
-		// don't process active blobovnicza of the level twice
-		_, ok := activeCache[dirPath]
-
-		res, err = b.deleteObjectFromLevel(ctx, bPrm, p, !ok)
+		res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
 		if err != nil {
 			if !client.IsErrObjectNotFound(err) {
 				b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
@@ -78,8 +73,6 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 			}
 		}

-		activeCache[dirPath] = struct{}{}
-
 		if err == nil {
 			objectFound = true
 		}
@@ -100,57 +93,13 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
 // tries to delete object from particular blobovnicza.
 //
 // returns no error if object was removed from some blobovnicza of the same level.
-func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string, tryActive bool) (common.DeleteRes, error) {
+func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string) (common.DeleteRes, error) {
-	lvlPath := filepath.Dir(blzPath)
-
-	// try to remove from blobovnicza if it is opened
-	b.lruMtx.Lock()
-	v, ok := b.opened.Get(blzPath)
-	b.lruMtx.Unlock()
-	if ok {
-		if res, err := b.deleteObject(ctx, v, prm); err == nil {
-			return res, err
-		} else if !client.IsErrObjectNotFound(err) {
-			b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromOpenedBlobovnicza,
-				zap.String("path", blzPath),
-				zap.String("error", err.Error()),
-			)
-		}
-	}
-
-	// therefore the object is possibly placed in a lighter blobovnicza
-
-	// next we check in the active level blobobnicza:
-	//  * the active blobovnicza is always opened.
-	b.activeMtx.RLock()
-	active, ok := b.active[lvlPath]
-	b.activeMtx.RUnlock()
-	if ok && tryActive {
-		if res, err := b.deleteObject(ctx, active.blz, prm); err == nil {
-			return res, err
-		} else if !client.IsErrObjectNotFound(err) {
-			b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromActiveBlobovnicza,
-				zap.String("path", blzPath),
-				zap.String("error", err.Error()),
-			)
-		}
-	}
-
-	// then object is possibly placed in closed blobovnicza
-
-	// check if it makes sense to try to open the blob
-	// (Blobovniczas "after" the active one are empty anyway,
-	// and it's pointless to open them).
-	if u64FromHexString(filepath.Base(blzPath)) > active.ind {
-		return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
-	}
-
-	// open blobovnicza (cached inside)
-	blz, err := b.openBlobovnicza(blzPath)
+	shBlz := b.getBlobovnicza(blzPath)
+	blz, err := shBlz.Open()
 	if err != nil {
 		return common.DeleteRes{}, err
 	}
+	defer shBlz.Close()

 	return b.deleteObject(ctx, blz, prm)
 }

View file

@@ -7,6 +7,8 @@ import (
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 )

+var errClosed = logicerr.New("blobvnicza is closed")
+

The error is logical so that it is not counted as a shard error.
 func isErrOutOfRange(err error) bool {
 	var target *apistatus.ObjectOutOfRange
 	return errors.As(err, &target)
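
For context, a sketch of how the logical/non-logical distinction is typically checked in this package (hedged: this assumes `logicerr.Logical` is the wrapper type behind `logicerr.New`, matching the `isLogical` helper referenced from the put path):

```go
package blobovniczatree

import (
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
)

// Sketch: errors built via logicerr.New unwrap to logicerr.Logical, which lets
// shard error counters skip them.
func isLogicalSketch(err error) bool {
	var errLogical logicerr.Logical
	return errors.As(err, &errLogical)
}
```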

View file

@@ -3,7 +3,6 @@ package blobovniczatree
 import (
 	"context"
 	"encoding/hex"
-	"path/filepath"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -37,26 +36,22 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
-		blz, err := b.openBlobovnicza(id.String())
+		shBlz := b.getBlobovnicza(id.String())
+		blz, err := shBlz.Open()
 		if err != nil {
 			return common.ExistsRes{}, err
 		}
+		defer shBlz.Close()

 		exists, err := blz.Exists(ctx, prm.Address)
 		return common.ExistsRes{Exists: exists}, err
 	}

-	activeCache := make(map[string]struct{})
-
 	var gPrm blobovnicza.GetPrm
 	gPrm.SetAddress(prm.Address)

 	err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
-		dirPath := filepath.Dir(p)
-
-		_, ok := activeCache[dirPath]
-		_, err := b.getObjectFromLevel(ctx, gPrm, p, !ok)
+		_, err := b.getObjectFromLevel(ctx, gPrm, p)
 		if err != nil {
 			if !client.IsErrObjectNotFound(err) {
 				b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
@@ -65,7 +60,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
 			}
 		}

-		activeCache[dirPath] = struct{}{}
 		found = err == nil
 		return found, nil
 	})

View file

@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
-	"path/filepath"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -48,10 +47,12 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
-		blz, err := b.openBlobovnicza(id.String())
+		shBlz := b.getBlobovnicza(id.String())
+		blz, err := shBlz.Open()
 		if err != nil {
 			return res, err
 		}
+		defer shBlz.Close()

 		res, err = b.getObject(ctx, blz, bPrm)
 		if err == nil {
@@ -61,14 +62,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
 		return res, err
 	}

-	activeCache := make(map[string]struct{})
-
 	err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
-		dirPath := filepath.Dir(p)
-
-		_, ok := activeCache[dirPath]
-		res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok)
+		res, err = b.getObjectFromLevel(ctx, bPrm, p)
 		if err != nil {
 			if !client.IsErrObjectNotFound(err) {
 				b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
@@ -78,8 +73,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
 			}
 		}

-		activeCache[dirPath] = struct{}{}
-
 		// abort iterator if found, otherwise process all Blobovniczas
 		return err == nil, nil
 	})
@@ -98,58 +91,14 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
 // tries to read object from particular blobovnicza.
 //
 // returns error if object could not be read from any blobovnicza of the same level.
-func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
+func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
-	lvlPath := filepath.Dir(blzPath)
-
-	// try to read from blobovnicza if it is opened
-	b.lruMtx.Lock()
-	v, ok := b.opened.Get(blzPath)
-	b.lruMtx.Unlock()
-	if ok {
-		if res, err := b.getObject(ctx, v, prm); err == nil {
-			return res, err
-		} else if !client.IsErrObjectNotFound(err) {
-			b.log.Debug(logs.BlobovniczatreeCouldNotReadObjectFromOpenedBlobovnicza,
-				zap.String("path", blzPath),
-				zap.String("error", err.Error()),
-			)
-		}
-	}
-
-	// therefore the object is possibly placed in a lighter blobovnicza
-
-	// next we check in the active level blobobnicza:
-	//  * the freshest objects are probably the most demanded;
-	//  * the active blobovnicza is always opened.
-	b.activeMtx.RLock()
-	active, ok := b.active[lvlPath]
-	b.activeMtx.RUnlock()
-	if ok && tryActive {
-		if res, err := b.getObject(ctx, active.blz, prm); err == nil {
-			return res, err
-		} else if !client.IsErrObjectNotFound(err) {
-			b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromActiveBlobovnicza,
-				zap.String("path", blzPath),
-				zap.String("error", err.Error()),
-			)
-		}
-	}
-
-	// then object is possibly placed in closed blobovnicza
-
-	// check if it makes sense to try to open the blob
-	// (Blobovniczas "after" the active one are empty anyway,
-	// and it's pointless to open them).
-	if u64FromHexString(filepath.Base(blzPath)) > active.ind {
-		return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
-	}

 	// open blobovnicza (cached inside)
-	blz, err := b.openBlobovnicza(blzPath)
+	shBlz := b.getBlobovnicza(blzPath)
+	blz, err := shBlz.Open()
 	if err != nil {
 		return common.GetRes{}, err
 	}
+	defer shBlz.Close()

 	return b.getObject(ctx, blz, prm)
 }

View file

@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
-	"path/filepath"
 	"strconv"
 	"time"
@@ -47,10 +46,12 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
 	if prm.StorageID != nil {
 		id := blobovnicza.NewIDFromBytes(prm.StorageID)
-		blz, err := b.openBlobovnicza(id.String())
+		shBlz := b.getBlobovnicza(id.String())
+		blz, err := shBlz.Open()
 		if err != nil {
 			return common.GetRangeRes{}, err
 		}
+		defer shBlz.Close()

 		res, err := b.getObjectRange(ctx, blz, prm)
 		if err == nil {
@@ -60,15 +61,10 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
 		return res, err
 	}

-	activeCache := make(map[string]struct{})
 	objectFound := false

 	err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
-		dirPath := filepath.Dir(p)
-
-		_, ok := activeCache[dirPath]
-		res, err = b.getRangeFromLevel(ctx, prm, p, !ok)
+		res, err = b.getRangeFromLevel(ctx, prm, p)
 		if err != nil {
 			outOfBounds := isErrOutOfRange(err)
 			if !outOfBounds && !client.IsErrObjectNotFound(err) {
@@ -82,8 +78,6 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
 			}
 		}

-		activeCache[dirPath] = struct{}{}
-
 		objectFound = err == nil

 		// abort iterator if found, otherwise process all Blobovniczas
@@ -106,68 +100,14 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
 // tries to read range of object payload data from particular blobovnicza.
 //
 // returns error if object could not be read from any blobovnicza of the same level.
-func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
+func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string) (common.GetRangeRes, error) {
-	lvlPath := filepath.Dir(blzPath)
-
-	// try to read from blobovnicza if it is opened
-	b.lruMtx.Lock()
-	v, ok := b.opened.Get(blzPath)
-	b.lruMtx.Unlock()
-	if ok {
-		res, err := b.getObjectRange(ctx, v, prm)
-		switch {
-		case err == nil,
-			isErrOutOfRange(err):
-			return res, err
-		default:
-			if !client.IsErrObjectNotFound(err) {
-				b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza,
-					zap.String("path", blzPath),
-					zap.String("error", err.Error()),
-				)
-			}
-		}
-	}
-
-	// therefore the object is possibly placed in a lighter blobovnicza
-
-	// next we check in the active level blobobnicza:
-	//  * the freshest objects are probably the most demanded;
-	//  * the active blobovnicza is always opened.
-	b.activeMtx.RLock()
-	active, ok := b.active[lvlPath]
-	b.activeMtx.RUnlock()
-	if ok && tryActive {
-		res, err := b.getObjectRange(ctx, active.blz, prm)
-		switch {
-		case err == nil,
-			isErrOutOfRange(err):
-			return res, err
-		default:
-			if !client.IsErrObjectNotFound(err) {
-				b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza,
-					zap.String("path", blzPath),
-					zap.String("error", err.Error()),
-				)
-			}
-		}
-	}
-
-	// then object is possibly placed in closed blobovnicza
-
-	// check if it makes sense to try to open the blob
-	// (Blobovniczas "after" the active one are empty anyway,
-	// and it's pointless to open them).
-	if u64FromHexString(filepath.Base(blzPath)) > active.ind {
-		return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
-	}

 	// open blobovnicza (cached inside)
-	blz, err := b.openBlobovnicza(blzPath)
+	shBlz := b.getBlobovnicza(blzPath)
+	blz, err := shBlz.Open()
 	if err != nil {
 		return common.GetRangeRes{}, err
 	}
+	defer shBlz.Close()

 	return b.getObjectRange(ctx, blz, prm)
 }

View file

@@ -68,13 +68,15 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
 // iterator over all Blobovniczas in unsorted order. Break on f's error return.
 func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
 	return b.iterateLeaves(ctx, func(p string) (bool, error) {
-		blz, err := b.openBlobovnicza(p)
+		shBlz := b.getBlobovnicza(p)
+		blz, err := shBlz.Open()
 		if err != nil {
 			if ignoreErrors {
 				return false, nil
 			}
 			return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
 		}
+		defer shBlz.Close()

 		err = f(p, blz)

View file

@@ -0,0 +1,242 @@
package blobovniczatree
import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// sharedDB is responsible for opening and closing a file of single blobovnicza.
type sharedDB struct {
guard *sync.RWMutex
blcza *blobovnicza.Blobovnicza
refCount uint32
openDBCounter *openDBCounter
closedFlag *atomic.Bool
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
log *logger.Logger
}
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
return &sharedDB{
guard: &sync.RWMutex{},
options: options,
path: path,
readOnly: readOnly,
metrics: metrics,
closedFlag: closedFlag,
log: log,
openDBCounter: openDBCounter,
}
}
func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
if b.closedFlag.Load() {
return nil, errClosed
}
b.guard.Lock()
defer b.guard.Unlock()
if b.refCount > 0 {
b.refCount++
return b.blcza, nil
}
blz := blobovnicza.New(append(b.options,
blobovnicza.WithReadOnly(b.readOnly),
blobovnicza.WithPath(b.path),
blobovnicza.WithMetrics(b.metrics),
)...)
if err := blz.Open(); err != nil {
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
}
if err := blz.Init(); err != nil {
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
}
b.refCount++
b.blcza = blz
b.openDBCounter.Inc()
return blz, nil
}
func (b *sharedDB) Close() {
b.guard.Lock()
defer b.guard.Unlock()
fyrchik marked this conversation as resolved (Outdated)

What is the expected scenario for this to happen?

Developer mistake.

It's worth some high-level logs then (I would panic, but it is not obvious why it couldn't happen)

Added error log
if b.refCount == 0 {
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
return
}
if b.refCount == 1 {
b.refCount = 0
if err := b.blcza.Close(); err != nil {
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
zap.String("id", b.path),
zap.String("error", err.Error()),
)
}
b.blcza = nil
b.openDBCounter.Dec()
return
}
b.refCount--
}
func (b *sharedDB) Path() string {
return b.path
}
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
type levelDbManager struct {
databases []*sharedDB
}
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger) *levelDbManager {
result := &levelDbManager{
fyrchik marked this conversation as resolved (Outdated)

It is not used outside the loop, why not go with `idx := 0`?

Fixed
databases: make([]*sharedDB, width),
}
for idx := uint64(0); idx < width; idx++ {
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
}
return result
}
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
return m.databases[idx]
}
// dbManager manages the opening and closing of blobovnicza instances.
//
// The blobovnicza opens at the first request, closes after the last request.
type dbManager struct {
levelToManager map[string]*levelDbManager
levelToManagerGuard *sync.RWMutex
closedFlag *atomic.Bool
dbCounter *openDBCounter
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
leafWidth uint64
log *logger.Logger
}
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
return &dbManager{
rootPath: rootPath,
options: options,
readOnly: readOnly,
metrics: metrics,
leafWidth: leafWidth,
levelToManager: make(map[string]*levelDbManager),
levelToManagerGuard: &sync.RWMutex{},
log: log,
closedFlag: &atomic.Bool{},
dbCounter: newOpenDBCounter(),
}
}
func (m *dbManager) GetByPath(path string) *sharedDB {
lvlPath := filepath.Dir(path)
curIndex := u64FromHexString(filepath.Base(path))
levelManager := m.getLevelManager(lvlPath)
return levelManager.GetByIndex(curIndex)
}
func (m *dbManager) Open() {
m.closedFlag.Store(false)
}
func (m *dbManager) Close() {
m.closedFlag.Store(true)
m.dbCounter.WaitUntilAllClosed()
}
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
result := m.getLevelManagerIfExists(lvlPath)
if result != nil {
return result
}
return m.getOrCreateLevelManager(lvlPath)
}
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
m.levelToManagerGuard.RLock()
defer m.levelToManagerGuard.RUnlock()
return m.levelToManager[lvlPath]
}
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
m.levelToManagerGuard.Lock()
defer m.levelToManagerGuard.Unlock()
if result, ok := m.levelToManager[lvlPath]; ok {
return result
}
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
m.levelToManager[lvlPath] = result

Yes, it's not the Go way. But it looks simpler than a mutex-protected channel.
return result
}
type openDBCounter struct {
cond *sync.Cond
count uint64
}
func newOpenDBCounter() *openDBCounter {
return &openDBCounter{
cond: &sync.Cond{
L: &sync.Mutex{},
},
}
}
func (c *openDBCounter) Inc() {
c.cond.L.Lock()
defer c.cond.L.Unlock()
c.count++
}
fyrchik marked this conversation as resolved (Outdated)

Is there an expected situation when `c.count == 0` here?

Only if there is a developer error.
func (c *openDBCounter) Dec() {
c.cond.L.Lock()
defer c.cond.L.Unlock()
if c.count > 0 {
c.count--
}
if c.count == 0 {
c.cond.Broadcast()
}
}
func (c *openDBCounter) WaitUntilAllClosed() {
c.cond.L.Lock()
for c.count > 0 {
c.cond.Wait()
}
c.cond.L.Unlock()
}
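
To see why `sync.Cond` is enough here, a standalone toy version of the counter (independent of this repo, runnable as-is): `Close()` on the manager must block until every outstanding `sharedDB` reference has been released, which is exactly a wait-for-zero condition.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Toy re-implementation of openDBCounter's idea: waitUntilAllClosed blocks
// until every outstanding reference has been released.
type counter struct {
	cond  *sync.Cond
	count int
}

func (c *counter) inc() { c.cond.L.Lock(); c.count++; c.cond.L.Unlock() }

func (c *counter) dec() {
	c.cond.L.Lock()
	c.count--
	if c.count == 0 {
		c.cond.Broadcast() // wake up all waiters in waitUntilAllClosed
	}
	c.cond.L.Unlock()
}

func (c *counter) waitUntilAllClosed() {
	c.cond.L.Lock()
	for c.count > 0 {
		c.cond.Wait() // releases the lock while sleeping
	}
	c.cond.L.Unlock()
}

func main() {
	c := &counter{cond: sync.NewCond(&sync.Mutex{})}
	for i := 0; i < 3; i++ {
		c.inc() // take the reference before the goroutine starts
		go func(i int) {
			time.Sleep(time.Duration(i) * 10 * time.Millisecond)
			c.dec()
		}(i)
	}
	c.waitUntilAllClosed()
	fmt.Println("all references released")
}
```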

View file

@@ -2,7 +2,6 @@ package blobovniczatree
 import (
 	"context"
-	"errors"
 	"path/filepath"
 	"time"

@@ -10,7 +9,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
-	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
@@ -76,8 +74,8 @@ type putIterator struct {
 	PutPrm blobovnicza.PutPrm
 }

-func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
-	active, err := i.B.getActivated(path)
+func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) {
+	active, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
 	if err != nil {
 		if !isLogical(err) {
 			i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
@@ -89,46 +87,29 @@ func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
 		return false, nil
 	}

-	if _, err := active.blz.Put(ctx, i.PutPrm); err != nil {
-		// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
-		// or update active blobovnicza in other thread. In the latter case the database will be closed
-		// and `updateActive` takes care of not updating the active blobovnicza twice.
-		if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
-			if isFull {
-				i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed,
-					zap.String("path", filepath.Join(path, u64ToHexString(active.ind))))
-			}
-
-			if err := i.B.updateActive(path, &active.ind); err != nil {
-				if !isLogical(err) {
-					i.B.reportError(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza, err)
-				} else {
-					i.B.log.Debug(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza,
-						zap.String("level", path),
-						zap.String("error", err.Error()))
-				}
-
-				return false, nil
-			}
-
-			return i.iterate(ctx, path)
-		}
-
-		i.AllFull = false
+	if active == nil {
+		i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
+		return false, nil
+	}
+	defer active.Close()
+
+	i.AllFull = false
+
+	_, err = active.Blobovnicza().Put(ctx, i.PutPrm)
+	if err != nil {
 		if !isLogical(err) {
 			i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
 		} else {
 			i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
-				zap.String("path", filepath.Join(path, u64ToHexString(active.ind))),
+				zap.String("path", active.Path()),
 				zap.String("error", err.Error()))
 		}

 		return false, nil
 	}

-	path = filepath.Join(path, u64ToHexString(active.ind))
-
-	i.ID = blobovnicza.NewIDFromBytes([]byte(path))
+	idx := u64FromHexString(filepath.Base(active.Path()))
+	i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))

 	return true, nil
 }