336 lines
7.6 KiB
Go
336 lines
7.6 KiB
Go
package blobovniczatree
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
|
|
|
|
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
|
type sharedDB struct {
|
|
cond *sync.Cond
|
|
blcza *blobovnicza.Blobovnicza
|
|
refCount uint32
|
|
|
|
openDBCounter *openDBCounter
|
|
closedFlag *atomic.Bool
|
|
options []blobovnicza.Option
|
|
path string
|
|
readOnly bool
|
|
metrics blobovnicza.Metrics
|
|
log *logger.Logger
|
|
}
|
|
|
|
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
|
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
|
|
) *sharedDB {
|
|
return &sharedDB{
|
|
cond: &sync.Cond{
|
|
L: &sync.RWMutex{},
|
|
},
|
|
options: options,
|
|
path: path,
|
|
readOnly: readOnly,
|
|
metrics: metrics,
|
|
closedFlag: closedFlag,
|
|
log: log,
|
|
openDBCounter: openDBCounter,
|
|
}
|
|
}
|
|
|
|
func (b *sharedDB) Open(ctx context.Context) (*blobovnicza.Blobovnicza, error) {
|
|
if b.closedFlag.Load() {
|
|
return nil, errClosed
|
|
}
|
|
|
|
b.cond.L.Lock()
|
|
defer b.cond.L.Unlock()
|
|
|
|
if b.refCount > 0 {
|
|
b.refCount++
|
|
return b.blcza, nil
|
|
}
|
|
|
|
blz := blobovnicza.New(append(b.options,
|
|
blobovnicza.WithReadOnly(b.readOnly),
|
|
blobovnicza.WithPath(b.path),
|
|
blobovnicza.WithMetrics(b.metrics),
|
|
)...)
|
|
|
|
if err := blz.Open(ctx); err != nil {
|
|
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
|
|
}
|
|
if err := blz.Init(ctx); err != nil {
|
|
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
|
|
}
|
|
|
|
b.refCount++
|
|
b.blcza = blz
|
|
b.openDBCounter.Inc()
|
|
|
|
return blz, nil
|
|
}
|
|
|
|
func (b *sharedDB) Close(ctx context.Context) {
|
|
b.cond.L.Lock()
|
|
defer b.cond.L.Unlock()
|
|
|
|
if b.refCount == 0 {
|
|
b.log.Error(ctx, logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
|
b.cond.Broadcast()
|
|
return
|
|
}
|
|
|
|
if b.refCount == 1 {
|
|
b.refCount = 0
|
|
if err := b.blcza.Close(ctx); err != nil {
|
|
b.log.Error(ctx, logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
|
zap.String("id", b.path),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
b.blcza = nil
|
|
b.openDBCounter.Dec()
|
|
return
|
|
}
|
|
|
|
b.refCount--
|
|
if b.refCount == 1 {
|
|
b.cond.Broadcast()
|
|
}
|
|
}
|
|
|
|
func (b *sharedDB) CloseAndRemoveFile(ctx context.Context) error {
|
|
b.cond.L.Lock()
|
|
if b.refCount > 1 {
|
|
b.cond.Wait()
|
|
}
|
|
defer b.cond.L.Unlock()
|
|
|
|
if b.refCount == 0 {
|
|
return errClosingClosedBlobovnicza
|
|
}
|
|
|
|
if err := b.blcza.Close(ctx); err != nil {
|
|
b.log.Error(ctx, logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
|
zap.String("id", b.path),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
|
|
}
|
|
|
|
b.refCount = 0
|
|
b.blcza = nil
|
|
b.openDBCounter.Dec()
|
|
|
|
return os.Remove(b.path)
|
|
}
|
|
|
|
func (b *sharedDB) SystemPath() string {
|
|
return b.path
|
|
}
|
|
|
|
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
|
|
type levelDbManager struct {
|
|
dbMtx *sync.RWMutex
|
|
databases map[uint64]*sharedDB
|
|
|
|
options []blobovnicza.Option
|
|
path string
|
|
readOnly bool
|
|
metrics blobovnicza.Metrics
|
|
openDBCounter *openDBCounter
|
|
closedFlag *atomic.Bool
|
|
log *logger.Logger
|
|
}
|
|
|
|
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
|
|
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
|
|
) *levelDbManager {
|
|
result := &levelDbManager{
|
|
databases: make(map[uint64]*sharedDB),
|
|
dbMtx: &sync.RWMutex{},
|
|
|
|
options: options,
|
|
path: filepath.Join(rootPath, lvlPath),
|
|
readOnly: readOnly,
|
|
metrics: metrics,
|
|
openDBCounter: openDBCounter,
|
|
closedFlag: closedFlag,
|
|
log: log,
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
|
|
res := m.getDBIfExists(idx)
|
|
if res != nil {
|
|
return res
|
|
}
|
|
return m.getOrCreateDB(idx)
|
|
}
|
|
|
|
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
|
|
m.dbMtx.RLock()
|
|
defer m.dbMtx.RUnlock()
|
|
|
|
return m.databases[idx]
|
|
}
|
|
|
|
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
|
|
m.dbMtx.Lock()
|
|
defer m.dbMtx.Unlock()
|
|
|
|
db := m.databases[idx]
|
|
if db != nil {
|
|
return db
|
|
}
|
|
|
|
db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
|
|
m.databases[idx] = db
|
|
return db
|
|
}
|
|
|
|
func (m *levelDbManager) hasAnyDB() bool {
|
|
m.dbMtx.RLock()
|
|
defer m.dbMtx.RUnlock()
|
|
|
|
return len(m.databases) > 0
|
|
}
|
|
|
|
// dbManager manages the opening and closing of blobovnicza instances.
|
|
//
|
|
// The blobovnicza opens at the first request, closes after the last request.
|
|
type dbManager struct {
|
|
levelToManager map[string]*levelDbManager
|
|
levelToManagerGuard *sync.RWMutex
|
|
closedFlag *atomic.Bool
|
|
dbCounter *openDBCounter
|
|
|
|
rootPath string
|
|
options []blobovnicza.Option
|
|
readOnly bool
|
|
metrics blobovnicza.Metrics
|
|
log *logger.Logger
|
|
}
|
|
|
|
func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
|
|
return &dbManager{
|
|
rootPath: rootPath,
|
|
options: options,
|
|
readOnly: readOnly,
|
|
metrics: metrics,
|
|
levelToManager: make(map[string]*levelDbManager),
|
|
levelToManagerGuard: &sync.RWMutex{},
|
|
log: log,
|
|
closedFlag: &atomic.Bool{},
|
|
dbCounter: newOpenDBCounter(),
|
|
}
|
|
}
|
|
|
|
func (m *dbManager) GetByPath(path string) *sharedDB {
|
|
lvlPath := filepath.Dir(path)
|
|
curIndex := u64FromHexString(filepath.Base(path))
|
|
levelManager := m.getLevelManager(lvlPath)
|
|
return levelManager.GetByIndex(curIndex)
|
|
}
|
|
|
|
func (m *dbManager) CleanResources(path string) {
|
|
lvlPath := filepath.Dir(path)
|
|
|
|
m.levelToManagerGuard.Lock()
|
|
defer m.levelToManagerGuard.Unlock()
|
|
|
|
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
|
|
delete(m.levelToManager, lvlPath)
|
|
}
|
|
}
|
|
|
|
func (m *dbManager) Open() {
|
|
m.closedFlag.Store(false)
|
|
}
|
|
|
|
func (m *dbManager) Close() {
|
|
m.closedFlag.Store(true)
|
|
m.dbCounter.WaitUntilAllClosed()
|
|
}
|
|
|
|
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
|
|
result := m.getLevelManagerIfExists(lvlPath)
|
|
if result != nil {
|
|
return result
|
|
}
|
|
return m.getOrCreateLevelManager(lvlPath)
|
|
}
|
|
|
|
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
|
|
m.levelToManagerGuard.RLock()
|
|
defer m.levelToManagerGuard.RUnlock()
|
|
|
|
return m.levelToManager[lvlPath]
|
|
}
|
|
|
|
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
|
|
m.levelToManagerGuard.Lock()
|
|
defer m.levelToManagerGuard.Unlock()
|
|
|
|
if result, ok := m.levelToManager[lvlPath]; ok {
|
|
return result
|
|
}
|
|
|
|
result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
|
|
m.levelToManager[lvlPath] = result
|
|
return result
|
|
}
|
|
|
|
type openDBCounter struct {
|
|
cond *sync.Cond
|
|
count uint64
|
|
}
|
|
|
|
func newOpenDBCounter() *openDBCounter {
|
|
return &openDBCounter{
|
|
cond: &sync.Cond{
|
|
L: &sync.Mutex{},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *openDBCounter) Inc() {
|
|
c.cond.L.Lock()
|
|
defer c.cond.L.Unlock()
|
|
|
|
c.count++
|
|
}
|
|
|
|
func (c *openDBCounter) Dec() {
|
|
c.cond.L.Lock()
|
|
defer c.cond.L.Unlock()
|
|
|
|
if c.count > 0 {
|
|
c.count--
|
|
}
|
|
|
|
if c.count == 0 {
|
|
c.cond.Broadcast()
|
|
}
|
|
}
|
|
|
|
func (c *openDBCounter) WaitUntilAllClosed() {
|
|
c.cond.L.Lock()
|
|
for c.count > 0 {
|
|
c.cond.Wait()
|
|
}
|
|
c.cond.L.Unlock()
|
|
}
|