package blobovniczatree import ( "context" "path/filepath" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" ) type activeDB struct { blz *blobovnicza.Blobovnicza shDB *sharedDB } func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza { return db.blz } func (db *activeDB) Close(ctx context.Context) { db.shDB.Close(ctx) } func (db *activeDB) SystemPath() string { return db.shDB.SystemPath() } // activeDBManager manages active blobovnicza instances (that is, those that are being used for Put). // // Uses dbManager for opening/closing sharedDB instances. // Stores a reference to an open active sharedDB, so dbManager does not close it. // When changing the active sharedDB, releases the reference to the previous active sharedDB. type activeDBManager struct { levelToActiveDBGuard *sync.RWMutex levelToActiveDB map[string]*sharedDB levelLock *utilSync.KeyLocker[string] closed bool dbManager *dbManager rootPath string } func newActiveDBManager(dbManager *dbManager, rootPath string) *activeDBManager { return &activeDBManager{ levelToActiveDBGuard: &sync.RWMutex{}, levelToActiveDB: make(map[string]*sharedDB), levelLock: utilSync.NewKeyLocker[string](), dbManager: dbManager, rootPath: rootPath, } } // GetOpenedActiveDBForLevel returns active DB for level. // DB must be closed after use. func (m *activeDBManager) GetOpenedActiveDBForLevel(ctx context.Context, lvlPath string) (*activeDB, error) { activeDB, err := m.getCurrentActiveIfOk(ctx, lvlPath) if err != nil { return nil, err } if activeDB != nil { return activeDB, nil } return m.updateAndGetActive(ctx, lvlPath) } func (m *activeDBManager) Open() { m.levelToActiveDBGuard.Lock() defer m.levelToActiveDBGuard.Unlock() m.closed = false } func (m *activeDBManager) Close(ctx context.Context) { m.levelToActiveDBGuard.Lock() defer m.levelToActiveDBGuard.Unlock() for _, db := range m.levelToActiveDB { db.Close(ctx) } m.levelToActiveDB = make(map[string]*sharedDB) m.closed = true } func (m *activeDBManager) getCurrentActiveIfOk(ctx context.Context, lvlPath string) (*activeDB, error) { m.levelToActiveDBGuard.RLock() defer m.levelToActiveDBGuard.RUnlock() if m.closed { return nil, errClosed } db, ok := m.levelToActiveDB[lvlPath] if !ok { return nil, nil } blz, err := db.Open(ctx) // open db for usage, will be closed on activeDB.Close() if err != nil { return nil, err } if blz.IsFull() { db.Close(ctx) return nil, nil } return &activeDB{ blz: blz, shDB: db, }, nil } func (m *activeDBManager) updateAndGetActive(ctx context.Context, lvlPath string) (*activeDB, error) { m.levelLock.Lock(lvlPath) defer m.levelLock.Unlock(lvlPath) current, err := m.getCurrentActiveIfOk(ctx, lvlPath) if err != nil { return nil, err } if current != nil { return current, nil } nextShDB, err := m.getNextSharedDB(ctx, lvlPath) if err != nil { return nil, err } if nextShDB == nil { return nil, nil } blz, err := nextShDB.Open(ctx) // open db for client, client must call Close() after usage if err != nil { return nil, err } return &activeDB{ blz: blz, shDB: nextShDB, }, nil } func (m *activeDBManager) getNextSharedDB(ctx context.Context, lvlPath string) (*sharedDB, error) { var nextActiveDBIdx uint64 hasActive, currentIdx := m.hasActiveDB(lvlPath) if hasActive { nextActiveDBIdx = currentIdx + 1 } else { hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(m.rootPath, lvlPath)) if err != nil { return nil, err } if hasDBs { nextActiveDBIdx = maxIdx } } path := filepath.Join(lvlPath, u64ToHexStringExt(nextActiveDBIdx)) next := m.dbManager.GetByPath(path) _, err := next.Open(ctx) // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close() if err != nil { return nil, err } previous, updated := m.replace(lvlPath, next) if !updated && next != nil { next.Close(ctx) // manager is closed, so don't hold active DB open } if updated && previous != nil { previous.Close(ctx) } return next, nil } func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) { m.levelToActiveDBGuard.RLock() defer m.levelToActiveDBGuard.RUnlock() if m.closed { return false, 0 } db, ok := m.levelToActiveDB[lvlPath] if !ok { return false, 0 } return true, u64FromHexString(filepath.Base(db.SystemPath())) } func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) { m.levelToActiveDBGuard.Lock() defer m.levelToActiveDBGuard.Unlock() if m.closed { return nil, false } previous := m.levelToActiveDB[lvlPath] if shDB == nil { delete(m.levelToActiveDB, lvlPath) } else { m.levelToActiveDB[lvlPath] = shDB } return previous, true }