frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/manager.go
Dmitrii Stepanov 422226da18 [#661] blobovniczatree: Add Rebuild implementation
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-12-07 15:37:32 +03:00

335 lines
7.5 KiB
Go

package blobovniczatree
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// errClosingClosedBlobovnicza is returned by sharedDB.CloseAndRemoveFile when
// the shared database has no outstanding references, i.e. it is already closed.
var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
// sharedDB is a reference-counted handle to the file of a single blobovnicza.
//
// The first Open call opens and initializes the underlying blobovnicza; later
// calls only increment the reference counter. Close decrements the counter and
// closes the blobovnicza when the last reference is released.
type sharedDB struct {
// cond.L is locked around every access to refCount and blcza; the condition
// is broadcast by Close and awaited by CloseAndRemoveFile.
cond *sync.Cond
// blcza is the currently opened blobovnicza; it is reset to nil on close.
blcza *blobovnicza.Blobovnicza
// refCount is the number of Open calls not yet matched by Close.
refCount uint32
// openDBCounter is incremented when the file is actually opened and
// decremented when it is actually closed.
openDBCounter *openDBCounter
// closedFlag makes Open fail with errClosed while it is set.
closedFlag *atomic.Bool
// options, path, readOnly and metrics are passed to blobovnicza.New on open.
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
// log receives close failures and attempts to close an already closed database.
log *logger.Logger
}
// newSharedDB creates a sharedDB for the blobovnicza file at path.
//
// Nothing is opened here: the reference count starts at zero and the file is
// opened by the first Open call using the given options, read-only mode and
// metrics. openDBCounter and closedFlag are stored as passed in.
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
	metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *sharedDB {
	db := new(sharedDB)

	// The condition variable's locker doubles as the mutex for the mutable state.
	db.cond = sync.NewCond(&sync.RWMutex{})

	db.options = options
	db.path = path
	db.readOnly = readOnly
	db.metrics = metrics

	db.openDBCounter = openDBCounter
	db.closedFlag = closedFlag
	db.log = log

	return db
}
// Open returns the opened blobovnicza, opening and initializing it on first use.
//
// Every successful call takes one reference, which is released by Close.
// While at least one reference is held, subsequent calls return the same
// instance and only increment the reference count.
//
// It returns errClosed if the closed flag is set, or a wrapped error if the
// blobovnicza could not be opened or initialized. On failure no reference is
// taken.
func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
	if b.closedFlag.Load() {
		return nil, errClosed
	}

	b.cond.L.Lock()
	defer b.cond.L.Unlock()

	if b.refCount > 0 {
		b.refCount++
		return b.blcza, nil
	}

	// The options slice is received from the creator and may be shared with
	// other sharedDB instances. Appending to it directly could write into a
	// common backing array when it has spare capacity, so build a private copy.
	opts := make([]blobovnicza.Option, 0, len(b.options)+3)
	opts = append(opts, b.options...)
	opts = append(opts,
		blobovnicza.WithReadOnly(b.readOnly),
		blobovnicza.WithPath(b.path),
		blobovnicza.WithMetrics(b.metrics),
	)

	blz := blobovnicza.New(opts...)

	if err := blz.Open(); err != nil {
		return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
	}
	if err := blz.Init(); err != nil {
		// Open succeeded, so release the file instead of leaking it.
		if closeErr := blz.Close(); closeErr != nil {
			b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
				zap.String("id", b.path),
				zap.String("error", closeErr.Error()),
			)
		}
		return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
	}

	b.refCount++
	b.blcza = blz
	b.openDBCounter.Inc()

	return blz, nil
}
// Close releases one reference previously taken by Open.
//
// Releasing the last reference closes the blobovnicza (a close failure is only
// logged), clears the stored instance and decrements the open-database counter.
// Calling Close with no outstanding references is logged as an error.
//
// Goroutines waiting on the condition are woken when the reference count drops
// to one, and also when Close is called on an already closed database.
func (b *sharedDB) Close() {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()

	switch b.refCount {
	case 0:
		// Unbalanced Close: report it and wake any waiter so it can re-check state.
		b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
		b.cond.Broadcast()

	case 1:
		// Last reference: actually close the file.
		b.refCount = 0
		if err := b.blcza.Close(); err != nil {
			b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
				zap.String("id", b.path),
				zap.String("error", err.Error()),
			)
		}
		b.blcza = nil
		b.openDBCounter.Dec()

	default:
		b.refCount--
		if b.refCount == 1 {
			// Only one holder is left; a CloseAndRemoveFile waiter may proceed.
			b.cond.Broadcast()
		}
	}
}
// CloseAndRemoveFile closes the blobovnicza and removes its file from disk.
//
// The call blocks while more than one reference is outstanding, then closes
// the blobovnicza, resets the reference count to zero, decrements the
// open-database counter and deletes the file at the database path.
// NOTE(review): the wait condition implies the caller holds one reference of
// its own — confirm against callers.
//
// It returns errClosingClosedBlobovnicza if there are no outstanding
// references, a wrapped error if closing fails (the file is then not removed
// and the reference count is left untouched), or the error from os.Remove.
func (b *sharedDB) CloseAndRemoveFile() error {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()

	// The condition must be re-checked after every wake-up: the lock is not
	// held while waiting, so another Open may have taken a new reference in
	// the meantime, and Close also broadcasts on its already-closed path.
	for b.refCount > 1 {
		b.cond.Wait()
	}

	if b.refCount == 0 {
		return errClosingClosedBlobovnicza
	}

	if err := b.blcza.Close(); err != nil {
		b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
			zap.String("id", b.path),
			zap.String("error", err.Error()),
		)
		return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
	}

	b.refCount = 0
	b.blcza = nil
	b.openDBCounter.Dec()

	return os.Remove(b.path)
}
// SystemPath returns the path of the blobovnicza file this sharedDB was
// created with.
func (b *sharedDB) SystemPath() string {
return b.path
}
// levelDbManager stores the sharedDB instances of one leaf directory of the
// blobovnicza tree, keyed by blobovnicza index.
type levelDbManager struct {
// dbMtx guards databases.
dbMtx *sync.RWMutex
// databases maps a blobovnicza index to its sharedDB; entries are created
// on first request.
databases map[uint64]*sharedDB
// options, readOnly, metrics, openDBCounter, closedFlag and log are passed
// unchanged to every sharedDB created by this manager.
options []blobovnicza.Option
// path is the level directory: the root path joined with the level path.
path string
readOnly bool
metrics blobovnicza.Metrics
openDBCounter *openDBCounter
closedFlag *atomic.Bool
log *logger.Logger
}
// newLevelDBManager creates an empty manager for the level directory obtained
// by joining rootPath and lvlPath.
//
// All remaining arguments are stored as is and handed to the sharedDB
// instances the manager creates later.
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
	readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *levelDbManager {
	levelDir := filepath.Join(rootPath, lvlPath)

	return &levelDbManager{
		dbMtx:         &sync.RWMutex{},
		databases:     make(map[uint64]*sharedDB),
		path:          levelDir,
		options:       options,
		readOnly:      readOnly,
		metrics:       metrics,
		openDBCounter: openDBCounter,
		closedFlag:    closedFlag,
		log:           log,
	}
}
// GetByIndex returns the sharedDB registered for idx, creating and registering
// it if it does not exist yet.
//
// The lookup is first attempted under the read lock; the write lock is taken
// only when no entry was found.
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
	if db := m.getDBIfExists(idx); db != nil {
		return db
	}
	return m.getOrCreateDB(idx)
}
// getDBIfExists returns the sharedDB registered for idx, or nil if there is
// none. Only the read lock is taken.
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
	m.dbMtx.RLock()
	db := m.databases[idx]
	m.dbMtx.RUnlock()

	return db
}
// getOrCreateDB returns the sharedDB registered for idx, creating it under the
// write lock when it is missing.
//
// The file path of a new sharedDB is the level directory joined with the
// result of u64ToHexStringExt(idx).
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
	m.dbMtx.Lock()
	defer m.dbMtx.Unlock()

	// Re-check under the write lock: the entry may have been created since
	// the caller's read-locked lookup.
	if existing := m.databases[idx]; existing != nil {
		return existing
	}

	dbPath := filepath.Join(m.path, u64ToHexStringExt(idx))
	created := newSharedDB(m.options, dbPath, m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
	m.databases[idx] = created

	return created
}
// hasAnyDB reports whether at least one sharedDB is registered in this manager.
func (m *levelDbManager) hasAnyDB() bool {
	m.dbMtx.RLock()
	registered := len(m.databases)
	m.dbMtx.RUnlock()

	return registered > 0
}
// dbManager manages the opening and closing of blobovnicza instances.
//
// A blobovnicza is opened on the first request and closed after the last
// request has released it.
type dbManager struct {
// levelToManager maps a level path (the directory part of a blobovnicza
// path) to the manager of that level.
levelToManager map[string]*levelDbManager
// levelToManagerGuard guards levelToManager.
levelToManagerGuard *sync.RWMutex
// closedFlag is handed to every level manager; it is set by Close and
// cleared by Open.
closedFlag *atomic.Bool
// dbCounter is handed to every level manager; Close waits on it until it
// reaches zero.
dbCounter *openDBCounter
// rootPath, options, readOnly, metrics and log are passed unchanged to
// every level manager created by this manager.
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
log *logger.Logger
}
// newDBManager creates a dbManager with no level managers, a cleared closed
// flag and a fresh open-database counter.
//
// rootPath, options, readOnly, metrics and log are stored as is and passed on
// to the level managers created later.
func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
	mgr := new(dbManager)

	mgr.levelToManager = make(map[string]*levelDbManager)
	mgr.levelToManagerGuard = &sync.RWMutex{}
	mgr.closedFlag = &atomic.Bool{}
	mgr.dbCounter = newOpenDBCounter()

	mgr.rootPath = rootPath
	mgr.options = options
	mgr.readOnly = readOnly
	mgr.metrics = metrics
	mgr.log = log

	return mgr
}
// GetByPath returns the sharedDB for the blobovnicza identified by path.
//
// The directory part of path selects the level manager, and the last path
// element is converted to the blobovnicza index with u64FromHexString. Both
// the level manager and the sharedDB are created on demand.
func (m *dbManager) GetByPath(path string) *sharedDB {
	levelDir := filepath.Dir(path)
	idx := u64FromHexString(filepath.Base(path))

	return m.getLevelManager(levelDir).GetByIndex(idx)
}
// CleanResources drops the level manager of the directory containing path if
// that manager exists and holds no sharedDB instances.
func (m *dbManager) CleanResources(path string) {
	levelDir := filepath.Dir(path)

	m.levelToManagerGuard.Lock()
	defer m.levelToManagerGuard.Unlock()

	lm, found := m.levelToManager[levelDir]
	if !found || lm.hasAnyDB() {
		// Nothing registered for this level, or it is still in use.
		return
	}
	delete(m.levelToManager, levelDir)
}
// Open clears the closed flag so that sharedDB.Open calls are allowed again.
// It does not open any blobovnicza file itself.
func (m *dbManager) Open() {
m.closedFlag.Store(false)
}
// Close sets the closed flag, which makes sharedDB.Open return errClosed, and
// then blocks until the open-database counter drops to zero.
func (m *dbManager) Close() {
// Set the flag first so that the wait below is not prolonged by new opens.
m.closedFlag.Store(true)
m.dbCounter.WaitUntilAllClosed()
}
// getLevelManager returns the level manager for lvlPath, creating and
// registering it if it does not exist yet.
//
// The lookup is first attempted under the read lock; the write lock is taken
// only when no manager was found.
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
	if lm := m.getLevelManagerIfExists(lvlPath); lm != nil {
		return lm
	}
	return m.getOrCreateLevelManager(lvlPath)
}
// getLevelManagerIfExists returns the level manager registered for lvlPath, or
// nil if there is none. Only the read lock is taken.
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
	m.levelToManagerGuard.RLock()
	lm := m.levelToManager[lvlPath]
	m.levelToManagerGuard.RUnlock()

	return lm
}
// getOrCreateLevelManager returns the level manager registered for lvlPath,
// creating it under the write lock when it is missing.
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
	m.levelToManagerGuard.Lock()
	defer m.levelToManagerGuard.Unlock()

	// Re-check under the write lock: the manager may have been registered
	// since the caller's read-locked lookup.
	existing, found := m.levelToManager[lvlPath]
	if found {
		return existing
	}

	created := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
	m.levelToManager[lvlPath] = created

	return created
}
// openDBCounter counts currently open databases and lets a caller block until
// the count drops to zero.
type openDBCounter struct {
	// cond.L guards count; cond is broadcast whenever Dec leaves count at zero.
	cond  *sync.Cond
	count uint64
}

// newOpenDBCounter returns a counter that starts at zero.
func newOpenDBCounter() *openDBCounter {
	return &openDBCounter{
		cond: sync.NewCond(&sync.Mutex{}),
	}
}

// Inc increments the counter by one.
func (c *openDBCounter) Inc() {
	c.cond.L.Lock()
	c.count++
	c.cond.L.Unlock()
}

// Dec decrements the counter by one unless it is already zero, and wakes all
// waiters whenever the counter is zero afterwards.
func (c *openDBCounter) Dec() {
	c.cond.L.Lock()
	defer c.cond.L.Unlock()

	if c.count != 0 {
		c.count--
	}
	if c.count != 0 {
		return
	}
	c.cond.Broadcast()
}

// WaitUntilAllClosed blocks until the counter is zero. It returns immediately
// if the counter is already zero.
func (c *openDBCounter) WaitUntilAllClosed() {
	c.cond.L.Lock()
	defer c.cond.L.Unlock()

	for c.count != 0 {
		c.cond.Wait()
	}
}