frostfs-node/pkg/local_object_storage/blobovnicza/control.go

154 lines
3.5 KiB
Go
Raw Permalink Normal View History

package blobovnicza
import (
"errors"
"fmt"
"path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// Open opens the underlying BoltDB file located at the configured path,
// using the configured permissions and bolt options.
//
// Unless the bolt options request read-only mode, the parent directory of
// the database path is created first. A missing database file is created
// automatically by BoltDB.
//
// Calling Open on an already opened Blobovnicza is a no-op that returns nil.
// On success the Blobovnicza is marked as opened and the open-blobovnicza
// metric is incremented.
func (b *Blobovnicza) Open() error {
	b.controlMtx.Lock()
	defer b.controlMtx.Unlock()

	if b.opened {
		return nil
	}

	readOnly := b.boltOptions.ReadOnly

	b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB,
		zap.String("path", b.path),
		zap.Bool("ro", readOnly),
	)

	// The directory can only be prepared when we are allowed to write.
	if !readOnly {
		if mkdirErr := util.MkdirAllX(filepath.Dir(b.path), b.perm); mkdirErr != nil {
			return mkdirErr
		}
	}

	b.log.Debug(logs.BlobovniczaOpeningBoltDB,
		zap.String("path", b.path),
		zap.Stringer("permissions", b.perm),
	)

	db, openErr := bbolt.Open(b.path, b.perm, b.boltOptions)
	// The handle is stored regardless of the outcome, exactly as the
	// result of bbolt.Open is returned.
	b.boltDB = db
	if openErr != nil {
		return openErr
	}

	b.opened = true
	b.metrics.IncOpenBlobovniczaCount()

	return nil
}
// Init prepares the internal database structure and the in-memory counters.
//
// The Blobovnicza must have been opened beforehand; otherwise an error is
// returned. If either the data size counter or the items counter is already
// non-zero, the Blobovnicza is considered initialized and nothing is done.
//
// Unless the database is read-only, a bucket is created (if it does not
// exist yet) for every key produced by iterateBucketKeys. Afterwards the
// counters are populated via initializeCounters.
func (b *Blobovnicza) Init() error {
	b.controlMtx.Lock()
	defer b.controlMtx.Unlock()

	if !b.opened {
		return errors.New("blobovnicza is not open")
	}

	b.log.Debug(logs.BlobovniczaInitializing,
		zap.Uint64("object size limit", b.objSizeLimit),
		zap.Uint64("storage size limit", b.fullSizeLimit),
	)

	curSize := b.dataSize.Load()
	curItems := b.itemsCount.Load()
	if !(curSize == 0 && curItems == 0) {
		b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", curSize), zap.Uint64("items", curItems))
		return nil
	}

	// Buckets cannot be created in a read-only database: only count.
	if b.boltOptions.ReadOnly {
		return b.initializeCounters()
	}

	// createBuckets makes sure every size range has its own bucket.
	createBuckets := func(tx *bbolt.Tx) error {
		return b.iterateBucketKeys(true, func(lower, upper uint64, key []byte) (bool, error) {
			bounds := stringifyBounds(lower, upper)

			b.log.Debug(logs.BlobovniczaCreatingBucketForSizeRange,
				zap.String("range", bounds))

			if _, createErr := tx.CreateBucketIfNotExists(key); createErr != nil {
				return false, fmt.Errorf("(%T) could not create bucket for bounds %s: %w",
					b, bounds, createErr)
			}

			return false, nil
		})
	}

	if updErr := b.boltDB.Update(createBuckets); updErr != nil {
		return updErr
	}

	return b.initializeCounters()
}
// initializeCounters walks all buckets in a read transaction and fills the
// data size and items counters, also reporting both values to the metrics.
//
// The items counter is the sum of the key counts of every visited bucket.
// The size is an estimate: each bucket contributes its key count multiplied
// by the upper value passed to the callback (presumably the upper bound of
// the bucket's size range — confirm against iterateAllBuckets).
//
// Returns a wrapped error if the iteration fails; in that case neither the
// counters nor the metrics are touched.
func (b *Blobovnicza) initializeCounters() error {
	var (
		totalSize  uint64
		totalItems uint64
	)

	countKeys := func(tx *bbolt.Tx) error {
		return b.iterateAllBuckets(tx, func(_, upper uint64, bucket *bbolt.Bucket) (bool, error) {
			n := uint64(bucket.Stats().KeyN)
			totalSize += n * upper
			totalItems += n

			return false, nil
		})
	}

	if viewErr := b.boltDB.View(countKeys); viewErr != nil {
		return fmt.Errorf("can't determine DB size: %w", viewErr)
	}

	b.dataSize.Store(totalSize)
	b.itemsCount.Store(totalItems)

	b.metrics.AddOpenBlobovniczaSize(totalSize)
	b.metrics.AddOpenBlobovniczaItems(totalItems)

	return nil
}
// Close closes the underlying BoltDB database.
//
// Calling Close on a Blobovnicza that is not open is a no-op that returns
// nil. If closing the database fails, the error is returned as is and the
// Blobovnicza stays marked as opened with its counters and metrics intact.
//
// On success the open-blobovnicza count, size and items metrics are
// decreased by the current counter values, the counters are reset to zero
// and the Blobovnicza is marked as closed.
func (b *Blobovnicza) Close() error {
	b.controlMtx.Lock()
	defer b.controlMtx.Unlock()

	if !b.opened {
		return nil
	}

	b.log.Debug(logs.BlobovniczaClosingBoltDB,
		zap.String("path", b.path),
	)

	closeErr := b.boltDB.Close()
	if closeErr != nil {
		return closeErr
	}

	b.metrics.DecOpenBlobovniczaCount()

	// Withdraw this instance's contribution from the aggregated metrics
	// before the local counters are reset.
	size := b.dataSize.Load()
	b.metrics.SubOpenBlobovniczaSize(size)

	items := b.itemsCount.Load()
	b.metrics.SubOpenBlobovniczaItems(items)

	b.dataSize.Store(0)
	b.itemsCount.Store(0)

	b.opened = false

	return nil
}