package blobovniczatree import ( "fmt" "path/filepath" "sync" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) // sharedDB is responsible for opening and closing a file of single blobovnicza. type sharedDB struct { guard *sync.RWMutex blcza *blobovnicza.Blobovnicza refCount uint32 openDBCounter *openDBCounter closedFlag *atomic.Bool options []blobovnicza.Option path string readOnly bool metrics blobovnicza.Metrics log *logger.Logger } func newSharedDB(options []blobovnicza.Option, path string, readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB { return &sharedDB{ guard: &sync.RWMutex{}, options: options, path: path, readOnly: readOnly, metrics: metrics, closedFlag: closedFlag, log: log, openDBCounter: openDBCounter, } } func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) { if b.closedFlag.Load() { return nil, errClosed } b.guard.Lock() defer b.guard.Unlock() if b.refCount > 0 { b.refCount++ return b.blcza, nil } blz := blobovnicza.New(append(b.options, blobovnicza.WithReadOnly(b.readOnly), blobovnicza.WithPath(b.path), blobovnicza.WithMetrics(b.metrics), )...) if err := blz.Open(); err != nil { return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err) } if err := blz.Init(); err != nil { return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err) } b.refCount++ b.blcza = blz b.openDBCounter.Inc() return blz, nil } func (b *sharedDB) Close() { b.guard.Lock() defer b.guard.Unlock() if b.refCount == 0 { b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path)) return } if b.refCount == 1 { b.refCount = 0 if err := b.blcza.Close(); err != nil { b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza, zap.String("id", b.path), zap.String("error", err.Error()), ) } b.blcza = nil b.openDBCounter.Dec() return } b.refCount-- } func (b *sharedDB) Path() string { return b.path } // levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree. type levelDbManager struct { dbMtx *sync.RWMutex databases map[uint64]*sharedDB options []blobovnicza.Option path string readOnly bool metrics blobovnicza.Metrics openDBCounter *openDBCounter closedFlag *atomic.Bool log *logger.Logger } func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string, readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *levelDbManager { result := &levelDbManager{ databases: make(map[uint64]*sharedDB), dbMtx: &sync.RWMutex{}, options: options, path: filepath.Join(rootPath, lvlPath), readOnly: readOnly, metrics: metrics, openDBCounter: openDBCounter, closedFlag: closedFlag, log: log, } return result } func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB { res := m.getDBIfExists(idx) if res != nil { return res } return m.getOrCreateDB(idx) } func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB { m.dbMtx.RLock() defer m.dbMtx.RUnlock() return m.databases[idx] } func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB { m.dbMtx.Lock() defer m.dbMtx.Unlock() db := m.databases[idx] if db != nil { return db } db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log) m.databases[idx] = db return db } // dbManager manages the opening and closing of blobovnicza instances. // // The blobovnicza opens at the first request, closes after the last request. type dbManager struct { levelToManager map[string]*levelDbManager levelToManagerGuard *sync.RWMutex closedFlag *atomic.Bool dbCounter *openDBCounter rootPath string options []blobovnicza.Option readOnly bool metrics blobovnicza.Metrics log *logger.Logger } func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager { return &dbManager{ rootPath: rootPath, options: options, readOnly: readOnly, metrics: metrics, levelToManager: make(map[string]*levelDbManager), levelToManagerGuard: &sync.RWMutex{}, log: log, closedFlag: &atomic.Bool{}, dbCounter: newOpenDBCounter(), } } func (m *dbManager) GetByPath(path string) *sharedDB { lvlPath := filepath.Dir(path) curIndex := u64FromHexString(filepath.Base(path)) levelManager := m.getLevelManager(lvlPath) return levelManager.GetByIndex(curIndex) } func (m *dbManager) Open() { m.closedFlag.Store(false) } func (m *dbManager) Close() { m.closedFlag.Store(true) m.dbCounter.WaitUntilAllClosed() } func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager { result := m.getLevelManagerIfExists(lvlPath) if result != nil { return result } return m.getOrCreateLevelManager(lvlPath) } func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager { m.levelToManagerGuard.RLock() defer m.levelToManagerGuard.RUnlock() return m.levelToManager[lvlPath] } func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager { m.levelToManagerGuard.Lock() defer m.levelToManagerGuard.Unlock() if result, ok := m.levelToManager[lvlPath]; ok { return result } result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log) m.levelToManager[lvlPath] = result return result } type openDBCounter struct { cond *sync.Cond count uint64 } func newOpenDBCounter() *openDBCounter { return &openDBCounter{ cond: &sync.Cond{ L: &sync.Mutex{}, }, } } func (c *openDBCounter) Inc() { c.cond.L.Lock() defer c.cond.L.Unlock() c.count++ } func (c *openDBCounter) Dec() { c.cond.L.Lock() defer c.cond.L.Unlock() if c.count > 0 { c.count-- } if c.count == 0 { c.cond.Broadcast() } } func (c *openDBCounter) WaitUntilAllClosed() { c.cond.L.Lock() for c.count > 0 { c.cond.Wait() } c.cond.L.Unlock() }