frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/active.go

210 lines
4.8 KiB
Go
Raw Normal View History

package blobovniczatree
import (
"context"
"path/filepath"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
)
type activeDB struct {
blz *blobovnicza.Blobovnicza
shDB *sharedDB
}
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
return db.blz
}
func (db *activeDB) Close(ctx context.Context) {
db.shDB.Close(ctx)
}
func (db *activeDB) SystemPath() string {
return db.shDB.SystemPath()
}
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
//
// Uses dbManager for opening/closing sharedDB instances.
// Stores a reference to an open active sharedDB, so dbManager does not close it.
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
type activeDBManager struct {
levelToActiveDBGuard *sync.RWMutex
levelToActiveDB map[string]*sharedDB
levelLock *utilSync.KeyLocker[string]
closed bool
dbManager *dbManager
rootPath string
}
func newActiveDBManager(dbManager *dbManager, rootPath string) *activeDBManager {
return &activeDBManager{
levelToActiveDBGuard: &sync.RWMutex{},
levelToActiveDB: make(map[string]*sharedDB),
levelLock: utilSync.NewKeyLocker[string](),
dbManager: dbManager,
rootPath: rootPath,
}
}
// GetOpenedActiveDBForLevel returns active DB for level.
// DB must be closed after use.
func (m *activeDBManager) GetOpenedActiveDBForLevel(ctx context.Context, lvlPath string) (*activeDB, error) {
activeDB, err := m.getCurrentActiveIfOk(ctx, lvlPath)
if err != nil {
return nil, err
}
if activeDB != nil {
return activeDB, nil
}
return m.updateAndGetActive(ctx, lvlPath)
}
func (m *activeDBManager) Open() {
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
m.closed = false
}
func (m *activeDBManager) Close(ctx context.Context) {
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
for _, db := range m.levelToActiveDB {
db.Close(ctx)
}
m.levelToActiveDB = make(map[string]*sharedDB)
m.closed = true
}
func (m *activeDBManager) getCurrentActiveIfOk(ctx context.Context, lvlPath string) (*activeDB, error) {
m.levelToActiveDBGuard.RLock()
defer m.levelToActiveDBGuard.RUnlock()
if m.closed {
return nil, errClosed
}
db, ok := m.levelToActiveDB[lvlPath]
if !ok {
return nil, nil
}
blz, err := db.Open(ctx) // open db for usage, will be closed on activeDB.Close()
if err != nil {
return nil, err
}
if blz.IsFull() {
db.Close(ctx)
return nil, nil
}
return &activeDB{
blz: blz,
shDB: db,
}, nil
}
func (m *activeDBManager) updateAndGetActive(ctx context.Context, lvlPath string) (*activeDB, error) {
m.levelLock.Lock(lvlPath)
defer m.levelLock.Unlock(lvlPath)
current, err := m.getCurrentActiveIfOk(ctx, lvlPath)
if err != nil {
return nil, err
}
if current != nil {
return current, nil
}
nextShDB, err := m.getNextSharedDB(ctx, lvlPath)
if err != nil {
return nil, err
}
if nextShDB == nil {
return nil, nil
}
blz, err := nextShDB.Open(ctx) // open db for client, client must call Close() after usage
if err != nil {
return nil, err
}
return &activeDB{
blz: blz,
shDB: nextShDB,
}, nil
}
func (m *activeDBManager) getNextSharedDB(ctx context.Context, lvlPath string) (*sharedDB, error) {
var nextActiveDBIdx uint64
hasActive, currentIdx := m.hasActiveDB(lvlPath)
if hasActive {
nextActiveDBIdx = currentIdx + 1
} else {
hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(m.rootPath, lvlPath))
if err != nil {
return nil, err
}
if hasDBs {
nextActiveDBIdx = maxIdx
}
}
path := filepath.Join(lvlPath, u64ToHexStringExt(nextActiveDBIdx))
next := m.dbManager.GetByPath(path)
_, err := next.Open(ctx) // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
if err != nil {
return nil, err
}
previous, updated := m.replace(lvlPath, next)
if !updated && next != nil {
next.Close(ctx) // manager is closed, so don't hold active DB open
}
if updated && previous != nil {
previous.Close(ctx)
}
return next, nil
}
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
m.levelToActiveDBGuard.RLock()
defer m.levelToActiveDBGuard.RUnlock()
if m.closed {
return false, 0
}
db, ok := m.levelToActiveDB[lvlPath]
if !ok {
return false, 0
}
return true, u64FromHexString(filepath.Base(db.SystemPath()))
}
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
m.levelToActiveDBGuard.Lock()
defer m.levelToActiveDBGuard.Unlock()
if m.closed {
return nil, false
}
previous := m.levelToActiveDB[lvlPath]
if shDB == nil {
delete(m.levelToActiveDB, lvlPath)
} else {
m.levelToActiveDB[lvlPath] = shDB
}
return previous, true
}