209 lines
4.8 KiB
Go
209 lines
4.8 KiB
Go
package blobovniczatree
|
|
|
|
import (
|
|
"context"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
|
)
|
|
|
|
type activeDB struct {
|
|
blz *blobovnicza.Blobovnicza
|
|
shDB *sharedDB
|
|
}
|
|
|
|
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
|
|
return db.blz
|
|
}
|
|
|
|
func (db *activeDB) Close(ctx context.Context) {
|
|
db.shDB.Close(ctx)
|
|
}
|
|
|
|
func (db *activeDB) SystemPath() string {
|
|
return db.shDB.SystemPath()
|
|
}
|
|
|
|
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
|
//
|
|
// Uses dbManager for opening/closing sharedDB instances.
|
|
// Stores a reference to an open active sharedDB, so dbManager does not close it.
|
|
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
|
|
type activeDBManager struct {
|
|
levelToActiveDBGuard *sync.RWMutex
|
|
levelToActiveDB map[string]*sharedDB
|
|
levelLock *utilSync.KeyLocker[string]
|
|
closed bool
|
|
|
|
dbManager *dbManager
|
|
rootPath string
|
|
}
|
|
|
|
func newActiveDBManager(dbManager *dbManager, rootPath string) *activeDBManager {
|
|
return &activeDBManager{
|
|
levelToActiveDBGuard: &sync.RWMutex{},
|
|
levelToActiveDB: make(map[string]*sharedDB),
|
|
levelLock: utilSync.NewKeyLocker[string](),
|
|
|
|
dbManager: dbManager,
|
|
rootPath: rootPath,
|
|
}
|
|
}
|
|
|
|
// GetOpenedActiveDBForLevel returns active DB for level.
|
|
// DB must be closed after use.
|
|
func (m *activeDBManager) GetOpenedActiveDBForLevel(ctx context.Context, lvlPath string) (*activeDB, error) {
|
|
activeDB, err := m.getCurrentActiveIfOk(ctx, lvlPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if activeDB != nil {
|
|
return activeDB, nil
|
|
}
|
|
|
|
return m.updateAndGetActive(ctx, lvlPath)
|
|
}
|
|
|
|
func (m *activeDBManager) Open() {
|
|
m.levelToActiveDBGuard.Lock()
|
|
defer m.levelToActiveDBGuard.Unlock()
|
|
|
|
m.closed = false
|
|
}
|
|
|
|
func (m *activeDBManager) Close(ctx context.Context) {
|
|
m.levelToActiveDBGuard.Lock()
|
|
defer m.levelToActiveDBGuard.Unlock()
|
|
|
|
for _, db := range m.levelToActiveDB {
|
|
db.Close(ctx)
|
|
}
|
|
m.levelToActiveDB = make(map[string]*sharedDB)
|
|
m.closed = true
|
|
}
|
|
|
|
func (m *activeDBManager) getCurrentActiveIfOk(ctx context.Context, lvlPath string) (*activeDB, error) {
|
|
m.levelToActiveDBGuard.RLock()
|
|
defer m.levelToActiveDBGuard.RUnlock()
|
|
|
|
if m.closed {
|
|
return nil, errClosed
|
|
}
|
|
|
|
db, ok := m.levelToActiveDB[lvlPath]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
|
|
blz, err := db.Open(ctx) // open db for usage, will be closed on activeDB.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if blz.IsFull() {
|
|
db.Close(ctx)
|
|
return nil, nil
|
|
}
|
|
|
|
return &activeDB{
|
|
blz: blz,
|
|
shDB: db,
|
|
}, nil
|
|
}
|
|
|
|
func (m *activeDBManager) updateAndGetActive(ctx context.Context, lvlPath string) (*activeDB, error) {
|
|
m.levelLock.Lock(lvlPath)
|
|
defer m.levelLock.Unlock(lvlPath)
|
|
|
|
current, err := m.getCurrentActiveIfOk(ctx, lvlPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if current != nil {
|
|
return current, nil
|
|
}
|
|
|
|
nextShDB, err := m.getNextSharedDB(ctx, lvlPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if nextShDB == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
blz, err := nextShDB.Open(ctx) // open db for client, client must call Close() after usage
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &activeDB{
|
|
blz: blz,
|
|
shDB: nextShDB,
|
|
}, nil
|
|
}
|
|
|
|
func (m *activeDBManager) getNextSharedDB(ctx context.Context, lvlPath string) (*sharedDB, error) {
|
|
var nextActiveDBIdx uint64
|
|
hasActive, currentIdx := m.hasActiveDB(lvlPath)
|
|
if hasActive {
|
|
nextActiveDBIdx = currentIdx + 1
|
|
} else {
|
|
hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(m.rootPath, lvlPath))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if hasDBs {
|
|
nextActiveDBIdx = maxIdx
|
|
}
|
|
}
|
|
|
|
path := filepath.Join(lvlPath, u64ToHexStringExt(nextActiveDBIdx))
|
|
next := m.dbManager.GetByPath(path)
|
|
_, err := next.Open(ctx) // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
previous, updated := m.replace(lvlPath, next)
|
|
if !updated && next != nil {
|
|
next.Close(ctx) // manager is closed, so don't hold active DB open
|
|
}
|
|
if updated && previous != nil {
|
|
previous.Close(ctx)
|
|
}
|
|
return next, nil
|
|
}
|
|
|
|
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
|
|
m.levelToActiveDBGuard.RLock()
|
|
defer m.levelToActiveDBGuard.RUnlock()
|
|
|
|
if m.closed {
|
|
return false, 0
|
|
}
|
|
|
|
db, ok := m.levelToActiveDB[lvlPath]
|
|
if !ok {
|
|
return false, 0
|
|
}
|
|
return true, u64FromHexString(filepath.Base(db.SystemPath()))
|
|
}
|
|
|
|
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
|
m.levelToActiveDBGuard.Lock()
|
|
defer m.levelToActiveDBGuard.Unlock()
|
|
|
|
if m.closed {
|
|
return nil, false
|
|
}
|
|
|
|
previous := m.levelToActiveDB[lvlPath]
|
|
if shDB == nil {
|
|
delete(m.levelToActiveDB, lvlPath)
|
|
} else {
|
|
m.levelToActiveDB[lvlPath] = shDB
|
|
}
|
|
return previous, true
|
|
}
|