[#9999] metabase: Switch db engine to pebble in db.go and control.go

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2024-07-02 15:33:02 +03:00
parent b7bd21b412
commit e40cc9dd41
2 changed files with 94 additions and 225 deletions

control.go

@@ -2,8 +2,8 @@ package meta
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -11,7 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.etcd.io/bbolt"
"github.com/cockroachdb/pebble"
"go.uber.org/zap"
)
@@ -21,23 +21,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
// ErrReadOnlyMode is returned when metabase is in a read-only mode.
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
var (
mStaticBuckets = map[string]struct{}{
string(containerVolumeBucketName): {},
string(containerCounterBucketName): {},
string(graveyardBucketName): {},
string(garbageBucketName): {},
string(shardInfoBucket): {},
string(bucketNameLocked): {},
}
// deprecatedBuckets buckets that are not used anymore.
deprecatedBuckets = [][]byte{
toMoveItBucketName,
}
)
// Open boltDB instance for metabase.
// Open metabase.
func (db *DB) Open(_ context.Context, m mode.Mode) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
@@ -47,147 +31,122 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
if m.NoMetabase() {
return nil
}
return db.openDB(m)
}
func (db *DB) openDB(mode mode.Mode) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
err := util.MkdirAllX(db.info.Path, db.info.Permission)
if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
}
db.log.Debug(logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
if db.boltOptions == nil {
opts := *bbolt.DefaultOptions
db.boltOptions = &opts
}
db.boltOptions.ReadOnly = mode.ReadOnly()
return metaerr.Wrap(db.openBolt())
return metaerr.Wrap(db.openDatabase(mode.ReadOnly()))
}
func (db *DB) openBolt() error {
var err error
db.database, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
if err != nil {
return fmt.Errorf("can't open boltDB database: %w", err)
func (db *DB) pebbleOptions(readOnly bool) *pebble.Options {
return &pebble.Options{
ReadOnly: readOnly,
}
db.database.MaxBatchDelay = db.boltBatchDelay
db.database.MaxBatchSize = db.boltBatchSize
}
db.log.Debug(logs.MetabaseOpenedBoltDBInstanceForMetabase)
func (db *DB) openDatabase(readOnly bool) error {
opts := db.pebbleOptions(readOnly)
db.log.Debug(logs.MetabaseCheckingMetabaseVersion)
return db.database.View(func(tx *bbolt.Tx) error {
// The safest way to check if the metabase is fresh is to check if it has no buckets.
// However, shard info can be present. So here we check that the number of buckets is
// at most 1.
// Another thing to consider is that tests do not persist shard ID, we want to support
// this case too.
var n int
err := tx.ForEach(func([]byte, *bbolt.Bucket) error {
if n++; n >= 2 { // do not iterate a lot
return errBreakBucketForEach
}
return nil
})
var err error
db.database, err = pebble.Open(db.info.Path, opts)
if err != nil {
return fmt.Errorf("can't open badger database: %w", err)
}
if err == errBreakBucketForEach {
db.initialized = true
err = nil
return db.snapshot(func(s *pebble.Snapshot) error {
data, err := valueSafe(s, shardInfoKey(versionKey))
if err != nil {
return err
}
return err
db.initialized = len(data) > 0
return nil
})
}
// Init initializes metabase. It creates static (CID-independent) buckets in underlying BoltDB instance.
// Init initializes metabase.
//
// Returns ErrOutdatedVersion if a database at the provided path is outdated.
//
// Does nothing if metabase has already been initialized and filled. To roll back the database to its initial state,
// use Reset.
func (db *DB) Init() error {
return metaerr.Wrap(db.init(false))
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
return metaerr.Wrap(db.init(context.TODO(), false))
}
// Reset resets metabase. Works similar to Init but cleans up all static buckets and
// removes all dynamic (CID-dependent) ones in non-blank BoltDB instances.
func (db *DB) Reset() error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
func (db *DB) Init2(ctx context.Context) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
return metaerr.Wrap(db.init(ctx, false))
}
// Reset resets metabase. Works similarly to Init but cleans up all data records.
func (db *DB) Reset(ctx context.Context) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
return metaerr.Wrap(db.init(true))
return metaerr.Wrap(db.init(ctx, true))
}
func (db *DB) init(reset bool) error {
func (db *DB) init(ctx context.Context, reset bool) error {
if db.mode.NoMetabase() || db.mode.ReadOnly() {
return nil
}
return db.database.Update(func(tx *bbolt.Tx) error {
var err error
if !reset {
// Normal open, check version and update if not initialized.
err := checkVersion(tx, db.initialized)
if err != nil {
return err
}
if reset {
if err := db.reset(); err != nil {
return err
}
for k := range mStaticBuckets {
name := []byte(k)
if reset {
err := tx.DeleteBucket(name)
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("could not delete static bucket %s: %w", k, err)
}
}
}
_, err := tx.CreateBucketIfNotExists(name)
if err != nil {
return fmt.Errorf("could not create static bucket %s: %w", k, err)
}
return db.batch(func(b *pebble.Batch) error {
err := checkVersion(b, db.initialized)
if err != nil {
return err
}
err = syncCounter(ctx, b, false)
if err != nil {
return fmt.Errorf("could not sync object counter: %w", err)
}
for _, b := range deprecatedBuckets {
err := tx.DeleteBucket(b)
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
}
}
return nil
})
}
if !reset { // counters will be recalculated by refill metabase
err = syncCounter(tx, false)
if err != nil {
return fmt.Errorf("could not sync object counter: %w", err)
}
return nil
}
bucketCursor := tx.Cursor()
name, _ := bucketCursor.First()
for name != nil {
if _, ok := mStaticBuckets[string(name)]; !ok {
if err := tx.DeleteBucket(name); err != nil {
return err
}
name, _ = bucketCursor.Seek(name)
continue
}
name, _ = bucketCursor.Next()
}
return updateVersion(tx, version)
func (db *DB) reset() error {
if err := db.database.Close(); err != nil {
return err
}
db.database = nil
if err := os.RemoveAll(db.info.Path); err != nil {
return err
}
var err error
db.database, err = pebble.Open(db.info.Path, db.pebbleOptions(false))
if err != nil {
return fmt.Errorf("can't open badger database: %w", err)
}
return db.batch(func(b *pebble.Batch) error {
return updateVersion(b, version)
})
}
// SyncCounters forces to synchronize the object counters.
func (db *DB) SyncCounters() error {
func (db *DB) SyncCounters(ctx context.Context) error {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
@@ -197,26 +156,29 @@ func (db *DB) SyncCounters() error {
return ErrReadOnlyMode
}
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
return syncCounter(tx, true)
return metaerr.Wrap(db.batch(func(b *pebble.Batch) error {
return syncCounter(ctx, b, true)
}))
}
// Close closes boltDB instance
// and reports metabase metric.
// Close closes metabase.
func (db *DB) Close() error {
var err error
if db.database != nil {
err = db.close()
}
if err == nil {
db.metrics.Close()
}
return err
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
return db.close()
}
func (db *DB) close() error {
return metaerr.Wrap(db.database.Close())
var err error
if db.database != nil {
err = metaerr.Wrap(db.database.Close())
}
if err == nil {
db.database = nil
db.metrics.Close()
}
return err
}
// Reload reloads part of the configuration.
@@ -235,14 +197,14 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
defer db.modeMtx.Unlock()
if db.mode.NoMetabase() || c.info.Path != "" && filepath.Clean(db.info.Path) != filepath.Clean(c.info.Path) {
if err := db.Close(); err != nil {
if err := db.close(); err != nil {
return false, err
}
db.mode = mode.Disabled
db.metrics.SetMode(mode.ComponentDisabled)
db.info.Path = c.info.Path
if err := db.openBolt(); err != nil {
if err := db.openDatabase(false); err != nil {
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
}
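
For context, a minimal standalone sketch of the open-and-check pattern used by the new openDatabase above, written against public pebble APIs only. The metabase's own helpers (db.snapshot, valueSafe, shardInfoKey) are not reproduced here; the plain "version" key, the example path, and the openAndCheck name are illustrative assumptions, not the actual key schema:

package main

import (
	"errors"
	"fmt"

	"github.com/cockroachdb/pebble"
)

// openAndCheck opens a pebble database at path and reports whether a
// version record is already present (i.e. the database was initialized).
// The "version" key is illustrative only.
func openAndCheck(path string, readOnly bool) (*pebble.DB, bool, error) {
	db, err := pebble.Open(path, &pebble.Options{ReadOnly: readOnly})
	if err != nil {
		return nil, false, fmt.Errorf("can't open pebble database: %w", err)
	}

	// Read through a snapshot, mirroring how openDatabase checks the version key.
	snap := db.NewSnapshot()
	defer snap.Close()

	val, closer, err := snap.Get([]byte("version"))
	if errors.Is(err, pebble.ErrNotFound) {
		return db, false, nil // fresh database, Init will write the version later
	}
	if err != nil {
		_ = db.Close()
		return nil, false, err
	}
	defer closer.Close()
	return db, len(val) > 0, nil
}

func main() {
	db, initialized, err := openAndCheck("/tmp/meta-example", false)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer db.Close()
	fmt.Println("already initialized:", initialized)
}

Passing ReadOnly into pebble.Options mirrors how openDB forwards mode.ReadOnly() into pebbleOptions, so a read-only shard never writes to the store.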

db.go

@@ -1,10 +1,8 @@
package meta
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"io/fs"
"os"
"strconv"
@@ -85,20 +83,16 @@ func New(opts ...Option) *DB {
cfg: c,
matchers: map[objectSDK.SearchMatchType]matcher{
objectSDK.MatchUnknown: {
matchSlow: unknownMatcher,
matchBucket: unknownMatcherBucket,
matchSlow: unknownMatcher,
},
objectSDK.MatchStringEqual: {
matchSlow: stringEqualMatcher,
matchBucket: stringEqualMatcherBucket,
matchSlow: stringEqualMatcher,
},
objectSDK.MatchStringNotEqual: {
matchSlow: stringNotEqualMatcher,
matchBucket: stringNotEqualMatcherBucket,
matchSlow: stringNotEqualMatcher,
},
objectSDK.MatchCommonPrefix: {
matchSlow: stringCommonPrefixMatcher,
matchBucket: stringCommonPrefixMatcherBucket,
matchSlow: stringCommonPrefixMatcher,
},
},
mode: mode.Disabled,
@@ -180,105 +174,18 @@ func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) == filterVal
}
func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
// Ignore the second return value because we check for strict equality.
val, _, ok := destringifyValue(fKey, fValue, false)
if !ok {
return nil
}
data, err := valueSafe(r, val)
if err != nil {
return err
}
if data != nil {
return f(val, data)
}
return nil
}
func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool {
return stringifyValue(key, objVal) != filterVal
}
func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
// Ignore the second return value because we check for strict inequality.
val, _, ok := destringifyValue(fKey, fValue, false)
it, err := r.NewIter(&pebble.IterOptions{
SkipPoint: func(k []byte) bool {
return ok && bytes.Equal(val, k)
},
})
if err != nil {
return err
}
for v := it.First(); v; v = it.Next() {
if err := f(it.Key(), it.Value()); err != nil {
err = errors.Join(err, it.Close())
return err
}
}
return it.Close()
}
func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool {
return strings.HasPrefix(stringifyValue(key, objVal), filterVal)
}
func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, f func([]byte, []byte) error) error {
val, checkLast, ok := destringifyValue(fKey, fVal, true)
if !ok {
return nil
}
prefix := val
if checkLast {
prefix = val[:len(val)-1]
}
if len(val) == 0 {
it, err := r.NewIter(&pebble.IterOptions{})
if err != nil {
return err
}
for v := it.First(); v; v = it.Next() {
if err := f(it.Key(), it.Value()); err != nil {
err = errors.Join(err, it.Close())
return err
}
}
return it.Close()
}
it, err := r.NewIter(&pebble.IterOptions{
LowerBound: prefix,
})
if err != nil {
return err
}
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
if checkLast && (len(it.Key()) == len(prefix) || it.Key()[len(prefix)]>>4 != val[len(val)-1]) {
// If the last byte doesn't match, this means the prefix does no longer match,
// so we need to break here.
break
}
if err := f(it.Key(), it.Value()); err != nil {
err = errors.Join(err, it.Close())
return err
}
}
return it.Close()
}
func unknownMatcher(_ string, _ []byte, _ string) bool {
return false
}
func unknownMatcherBucket(_ pebble.Reader, _ string, _ string, _ func([]byte, []byte) error) error {
return nil
}
// bucketKeyHelper returns byte representation of val that is used as a key
// in boltDB. Useful for getting filter values from unique and list indexes.
func bucketKeyHelper(hdr string, val string) []byte {
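
For reference, a minimal sketch of the prefix-scan pattern that the removed matchBucket helpers above relied on, using only public pebble APIs. The iteratePrefix helper, the example path, and the attr:color: keys are assumptions for illustration, not metabase identifiers:

package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/cockroachdb/pebble"
)

// iteratePrefix calls f for every key/value pair whose key starts with prefix.
// Key and value slices are only valid for the duration of the callback.
func iteratePrefix(r pebble.Reader, prefix []byte, f func(k, v []byte) error) error {
	it, err := r.NewIter(&pebble.IterOptions{LowerBound: prefix})
	if err != nil {
		return err
	}
	for ok := it.First(); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() {
		if err := f(it.Key(), it.Value()); err != nil {
			return errors.Join(err, it.Close())
		}
	}
	return it.Close()
}

func main() {
	db, err := pebble.Open("/tmp/prefix-example", &pebble.Options{})
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer db.Close()
	_ = db.Set([]byte("attr:color:blue"), []byte("oid1"), pebble.Sync)
	_ = db.Set([]byte("attr:color:red"), []byte("oid2"), pebble.Sync)
	_ = iteratePrefix(db, []byte("attr:color:"), func(k, v []byte) error {
		fmt.Printf("%s => %s\n", k, v)
		return nil
	})
}

Bounding the iterator with LowerBound and stopping at the first key that no longer shares the prefix keeps the scan to the matching range, which is the same approach the removed stringCommonPrefixMatcherBucket took.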