// Source: frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/active.go (214 lines, 4.6 KiB, Go)

package blobovniczatree
import (
"path/filepath"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
)
// activeDB is an opened blobovnicza paired with the sharedDB it was opened from.
//
// Instances are handed out by activeDBManager; Close must be called after use.
type activeDB struct {
// blz is the opened blobovnicza instance returned by sharedDB.Open.
blz *blobovnicza.Blobovnicza
// shDB is the sharedDB that produced blz; Close and SystemPath delegate to it.
shDB *sharedDB
}
// Blobovnicza returns the opened blobovnicza instance held by db.
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
return db.blz
}
// Close calls Close on the underlying sharedDB.
//
// It must be called once a DB obtained from activeDBManager is no longer needed.
func (db *activeDB) Close() {
db.shDB.Close()
}
// SystemPath returns the path reported by the underlying sharedDB.
func (db *activeDB) SystemPath() string {
return db.shDB.SystemPath()
}
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
//
// Uses dbManager for opening/closing sharedDB instances.
// Stores a reference to an open active sharedDB, so dbManager does not close it.
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
type activeDBManager struct {
// levelToActiveDBGuard protects levelToActiveDB and closed.
levelToActiveDBGuard *sync.RWMutex
// levelToActiveDB maps a level path to the sharedDB currently active for that level.
levelToActiveDB map[string]*sharedDB
// levelLock is a lock keyed by level path; it is held while the active DB of that level is being switched.
levelLock *utilSync.KeyLocker[string]
// closed is set by Close and cleared by Open; while set, lookups fail with errClosed
// and replace refuses to register a new active DB.
closed bool
// dbManager provides sharedDB instances by path.
dbManager *dbManager
// leafWidth is the number of DB indexes per level that are cycled through when searching for a non-full DB.
leafWidth uint64
}
// newActiveDBManager creates an activeDBManager that obtains sharedDB instances
// from dbManager and cycles through leafWidth DB indexes per level.
//
// The returned manager starts with no active DBs and with the closed flag unset.
func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager {
	m := new(activeDBManager)

	// Collaborators and configuration supplied by the caller.
	m.dbManager = dbManager
	m.leafWidth = leafWidth

	// Fresh synchronization primitives and an empty level -> active DB table.
	m.levelToActiveDBGuard = &sync.RWMutex{}
	m.levelLock = utilSync.NewKeyLocker[string]()
	m.levelToActiveDB = make(map[string]*sharedDB)

	return m
}
// GetOpenedActiveDBForLevel returns the opened active DB for the level at lvlPath.
//
// The DB currently registered for the level is used when it exists and is not
// full; otherwise the manager switches the level to the next suitable DB.
// The result may be nil with a nil error when no non-full DB was found.
// A non-nil result must be closed after use.
func (m *activeDBManager) GetOpenedActiveDBForLevel(lvlPath string) (*activeDB, error) {
	current, err := m.getCurrentActiveIfOk(lvlPath)
	switch {
	case err != nil:
		return nil, err
	case current == nil:
		// Nothing usable is registered for this level: pick (and register) the next DB.
		return m.updateAndGetActive(lvlPath)
	default:
		return current, nil
	}
}
// Open clears the closed flag so the manager accepts requests again.
func (m *activeDBManager) Open() {
	guard := m.levelToActiveDBGuard

	guard.Lock()
	m.closed = false
	guard.Unlock()
}
// Close closes every sharedDB registered as active, forgets all of them and
// marks the manager as closed.
func (m *activeDBManager) Close() {
	m.levelToActiveDBGuard.Lock()
	defer m.levelToActiveDBGuard.Unlock()

	// Release the reference the manager holds on each active DB.
	held := m.levelToActiveDB
	for lvlPath := range held {
		held[lvlPath].Close()
	}

	m.closed = true
	m.levelToActiveDB = make(map[string]*sharedDB)
}
// getCurrentActiveIfOk returns the DB currently registered as active for
// lvlPath, opened for the caller, provided it is not full.
//
// It returns errClosed when the manager is closed, the error from opening the
// DB when that fails, and (nil, nil) when no DB is registered for the level or
// the registered one is full.
func (m *activeDBManager) getCurrentActiveIfOk(lvlPath string) (*activeDB, error) {
	m.levelToActiveDBGuard.RLock()
	defer m.levelToActiveDBGuard.RUnlock()

	if m.closed {
		return nil, errClosed
	}

	shDB, found := m.levelToActiveDB[lvlPath]
	if !found {
		return nil, nil
	}

	// Open on behalf of the caller; the matching Close happens in activeDB.Close().
	blz, err := shDB.Open()
	if err != nil {
		return nil, err
	}

	if !blz.IsFull() {
		return &activeDB{shDB: shDB, blz: blz}, nil
	}

	// The DB is full and will not be handed out, so undo the Open above.
	shDB.Close()
	return nil, nil
}
// updateAndGetActive switches the level at lvlPath to the next suitable DB and
// returns it opened for the caller.
//
// The level lock for lvlPath is held for the whole call. The current active DB
// is checked again after the lock is acquired and returned if it is usable.
// Returns (nil, nil) when no next DB could be selected.
func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error) {
	m.levelLock.Lock(lvlPath)
	defer m.levelLock.Unlock(lvlPath)

	// Re-check under the level lock before switching.
	switch existing, err := m.getCurrentActiveIfOk(lvlPath); {
	case err != nil:
		return nil, err
	case existing != nil:
		return existing, nil
	}

	shDB, err := m.getNextSharedDB(lvlPath)
	switch {
	case err != nil:
		return nil, err
	case shDB == nil:
		return nil, nil
	}

	// Open for the client; the client must call Close() after usage.
	blz, err := shDB.Open()
	if err != nil {
		return nil, err
	}
	return &activeDB{shDB: shDB, blz: blz}, nil
}
// getNextSharedDB selects the next non-full sharedDB for the level at lvlPath
// and registers it as the level's active DB.
//
// The search starts at the index following the current active DB (wrapping at
// leafWidth), or at index 0 when the level has no active DB, and inspects at
// most leafWidth DBs. Each inspected DB is opened; full ones are closed again,
// while the selected one stays open so that the manager keeps it alive.
//
// Returns the selected sharedDB, nil when every inspected DB is full, or the
// error from opening a DB.
func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
	var idx uint64
	if active, currentIdx := m.hasActiveDB(lvlPath); active {
		idx = (currentIdx + 1) % m.leafWidth
	}

	var candidate *sharedDB
	for tried := uint64(0); tried < m.leafWidth; tried++ {
		shDB := m.dbManager.GetByPath(filepath.Join(lvlPath, u64ToHexStringExt(idx)))

		// Open to hold the active DB open; closed right below if the DB is full,
		// after m.replace, or by activeDBManager.Close().
		blz, err := shDB.Open()
		if err != nil {
			return nil, err
		}
		if !blz.IsFull() {
			candidate = shDB
			break
		}

		shDB.Close()
		idx = (idx + 1) % m.leafWidth
	}

	previous, updated := m.replace(lvlPath, candidate)
	switch {
	case !updated:
		// Manager is closed, so don't hold the active DB open.
		if candidate != nil {
			candidate.Close()
		}
	case previous != nil:
		// The level was switched: drop the reference held on the old active DB.
		previous.Close()
	}
	return candidate, nil
}
// hasActiveDB reports whether an active DB is registered for lvlPath and, if
// so, returns its index parsed from the base name of its system path.
//
// Returns (false, 0) when the manager is closed or the level has no active DB.
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
	m.levelToActiveDBGuard.RLock()
	defer m.levelToActiveDBGuard.RUnlock()

	if !m.closed {
		if shDB, found := m.levelToActiveDB[lvlPath]; found {
			name := filepath.Base(shDB.SystemPath())
			return true, u64FromHexString(name)
		}
	}
	return false, 0
}
// replace registers shDB as the active DB for lvlPath, or removes the level's
// entry when shDB is nil.
//
// It returns the previously registered sharedDB (nil if there was none) and
// true. When the manager is closed nothing is changed and (nil, false) is
// returned.
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
	m.levelToActiveDBGuard.Lock()
	defer m.levelToActiveDBGuard.Unlock()

	if m.closed {
		return nil, false
	}

	old := m.levelToActiveDB[lvlPath]
	if shDB != nil {
		m.levelToActiveDB[lvlPath] = shDB
	} else {
		delete(m.levelToActiveDB, lvlPath)
	}
	return old, true
}