From d0a5d9608962e4af570f9ae0ac4c7948ae466827 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 15:33:02 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in db.go and control.go Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/control.go | 218 ++++++++----------- pkg/local_object_storage/metabase/db.go | 101 +-------- 2 files changed, 94 insertions(+), 225 deletions(-) diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 0e280a5f6..da543df63 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -2,8 +2,8 @@ package meta import ( "context" - "errors" "fmt" + "os" "path/filepath" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -11,7 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.uber.org/zap" ) @@ -21,23 +21,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode") // ErrReadOnlyMode is returned when metabase is in a read-only mode. var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode") -var ( - mStaticBuckets = map[string]struct{}{ - string(containerVolumeBucketName): {}, - string(containerCounterBucketName): {}, - string(graveyardBucketName): {}, - string(garbageBucketName): {}, - string(shardInfoBucket): {}, - string(bucketNameLocked): {}, - } - - // deprecatedBuckets buckets that are not used anymore. - deprecatedBuckets = [][]byte{ - toMoveItBucketName, - } -) - -// Open boltDB instance for metabase. +// Open metabase. func (db *DB) Open(_ context.Context, m mode.Mode) error { db.modeMtx.Lock() defer db.modeMtx.Unlock() @@ -47,147 +31,122 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error { if m.NoMetabase() { return nil } + return db.openDB(m) } func (db *DB) openDB(mode mode.Mode) error { - err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) + err := util.MkdirAllX(db.info.Path, db.info.Permission) if err != nil { return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) } db.log.Debug(logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path)) - if db.boltOptions == nil { - opts := *bbolt.DefaultOptions - db.boltOptions = &opts - } - db.boltOptions.ReadOnly = mode.ReadOnly() - - return metaerr.Wrap(db.openBolt()) + return metaerr.Wrap(db.openDatabase(mode.ReadOnly())) } -func (db *DB) openBolt() error { - var err error - - db.database, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) - if err != nil { - return fmt.Errorf("can't open boltDB database: %w", err) +func (db *DB) pebbleOptions(readOnly bool) *pebble.Options { + return &pebble.Options{ + ReadOnly: readOnly, } - db.database.MaxBatchDelay = db.boltBatchDelay - db.database.MaxBatchSize = db.boltBatchSize +} - db.log.Debug(logs.MetabaseOpenedBoltDBInstanceForMetabase) +func (db *DB) openDatabase(readOnly bool) error { + opts := db.pebbleOptions(readOnly) - db.log.Debug(logs.MetabaseCheckingMetabaseVersion) - return db.database.View(func(tx *bbolt.Tx) error { - // The safest way to check if the metabase is fresh is to check if it has no buckets. - // However, shard info can be present. So here we check that the number of buckets is - // at most 1. - // Another thing to consider is that tests do not persist shard ID, we want to support - // this case too. - var n int - err := tx.ForEach(func([]byte, *bbolt.Bucket) error { - if n++; n >= 2 { // do not iterate a lot - return errBreakBucketForEach - } - return nil - }) + var err error + db.database, err = pebble.Open(db.info.Path, opts) + if err != nil { + return fmt.Errorf("can't open badger database: %w", err) + } - if err == errBreakBucketForEach { - db.initialized = true - err = nil + return db.snapshot(func(s *pebble.Snapshot) error { + data, err := valueSafe(s, shardInfoKey(versionKey)) + if err != nil { + return err } - return err + db.initialized = len(data) > 0 + return nil }) } -// Init initializes metabase. It creates static (CID-independent) buckets in underlying BoltDB instance. +// Init initializes metabase. // // Returns ErrOutdatedVersion if a database at the provided path is outdated. // // Does nothing if metabase has already been initialized and filled. To roll back the database to its initial state, // use Reset. func (db *DB) Init() error { - return metaerr.Wrap(db.init(false)) + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + return metaerr.Wrap(db.init(context.TODO(), false)) } -// Reset resets metabase. Works similar to Init but cleans up all static buckets and -// removes all dynamic (CID-dependent) ones in non-blank BoltDB instances. -func (db *DB) Reset() error { - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() +func (db *DB) Init2(ctx context.Context) error { + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + return metaerr.Wrap(db.init(ctx, false)) +} + +// Reset resets metabase. Works similar to Init but cleans up all data records. +func (db *DB) Reset(ctx context.Context) error { + db.modeMtx.Lock() + defer db.modeMtx.Unlock() if db.mode.NoMetabase() { return ErrDegradedMode } - return metaerr.Wrap(db.init(true)) + return metaerr.Wrap(db.init(ctx, true)) } -func (db *DB) init(reset bool) error { +func (db *DB) init(ctx context.Context, reset bool) error { if db.mode.NoMetabase() || db.mode.ReadOnly() { return nil } - - return db.database.Update(func(tx *bbolt.Tx) error { - var err error - if !reset { - // Normal open, check version and update if not initialized. - err := checkVersion(tx, db.initialized) - if err != nil { - return err - } + if reset { + if err := db.reset(); err != nil { + return err } - for k := range mStaticBuckets { - name := []byte(k) - if reset { - err := tx.DeleteBucket(name) - if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { - return fmt.Errorf("could not delete static bucket %s: %w", k, err) - } - } + } - _, err := tx.CreateBucketIfNotExists(name) - if err != nil { - return fmt.Errorf("could not create static bucket %s: %w", k, err) - } + return db.batch(func(b *pebble.Batch) error { + err := checkVersion(b, db.initialized) + if err != nil { + return err + } + err = syncCounter(ctx, b, false) + if err != nil { + return fmt.Errorf("could not sync object counter: %w", err) } - for _, b := range deprecatedBuckets { - err := tx.DeleteBucket(b) - if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { - return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err) - } - } + return nil + }) +} - if !reset { // counters will be recalculated by refill metabase - err = syncCounter(tx, false) - if err != nil { - return fmt.Errorf("could not sync object counter: %w", err) - } - - return nil - } - - bucketCursor := tx.Cursor() - name, _ := bucketCursor.First() - for name != nil { - if _, ok := mStaticBuckets[string(name)]; !ok { - if err := tx.DeleteBucket(name); err != nil { - return err - } - name, _ = bucketCursor.Seek(name) - continue - } - name, _ = bucketCursor.Next() - } - return updateVersion(tx, version) +func (db *DB) reset() error { + if err := db.database.Close(); err != nil { + return err + } + db.database = nil + if err := os.RemoveAll(db.info.Path); err != nil { + return err + } + var err error + db.database, err = pebble.Open(db.info.Path, db.pebbleOptions(false)) + if err != nil { + return fmt.Errorf("can't open badger database: %w", err) + } + return db.batch(func(b *pebble.Batch) error { + return updateVersion(b, version) }) } // SyncCounters forces to synchronize the object counters. -func (db *DB) SyncCounters() error { +func (db *DB) SyncCounters(ctx context.Context) error { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -197,26 +156,29 @@ func (db *DB) SyncCounters() error { return ErrReadOnlyMode } - return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error { - return syncCounter(tx, true) + return metaerr.Wrap(db.batch(func(b *pebble.Batch) error { + return syncCounter(ctx, b, true) })) } -// Close closes boltDB instance -// and reports metabase metric. +// Close closes metabase. func (db *DB) Close() error { - var err error - if db.database != nil { - err = db.close() - } - if err == nil { - db.metrics.Close() - } - return err + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + return db.close() } func (db *DB) close() error { - return metaerr.Wrap(db.database.Close()) + var err error + if db.database != nil { + err = metaerr.Wrap(db.database.Close()) + } + if err == nil { + db.database = nil + db.metrics.Close() + } + return err } // Reload reloads part of the configuration. @@ -235,14 +197,14 @@ func (db *DB) Reload(opts ...Option) (bool, error) { defer db.modeMtx.Unlock() if db.mode.NoMetabase() || c.info.Path != "" && filepath.Clean(db.info.Path) != filepath.Clean(c.info.Path) { - if err := db.Close(); err != nil { + if err := db.close(); err != nil { return false, err } db.mode = mode.Disabled db.metrics.SetMode(mode.ComponentDisabled) db.info.Path = c.info.Path - if err := db.openBolt(); err != nil { + if err := db.openDatabase(false); err != nil { return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err)) } diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 16a191781..84653f721 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -1,10 +1,8 @@ package meta import ( - "bytes" "encoding/binary" "encoding/hex" - "errors" "io/fs" "os" "strconv" @@ -85,20 +83,16 @@ func New(opts ...Option) *DB { cfg: c, matchers: map[objectSDK.SearchMatchType]matcher{ objectSDK.MatchUnknown: { - matchSlow: unknownMatcher, - matchBucket: unknownMatcherBucket, + matchSlow: unknownMatcher, }, objectSDK.MatchStringEqual: { - matchSlow: stringEqualMatcher, - matchBucket: stringEqualMatcherBucket, + matchSlow: stringEqualMatcher, }, objectSDK.MatchStringNotEqual: { - matchSlow: stringNotEqualMatcher, - matchBucket: stringNotEqualMatcherBucket, + matchSlow: stringNotEqualMatcher, }, objectSDK.MatchCommonPrefix: { - matchSlow: stringCommonPrefixMatcher, - matchBucket: stringCommonPrefixMatcherBucket, + matchSlow: stringCommonPrefixMatcher, }, }, mode: mode.Disabled, @@ -180,105 +174,18 @@ func stringEqualMatcher(key string, objVal []byte, filterVal string) bool { return stringifyValue(key, objVal) == filterVal } -func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error { - // Ignore the second return value because we check for strict equality. - val, _, ok := destringifyValue(fKey, fValue, false) - if !ok { - return nil - } - data, err := valueSafe(r, val) - if err != nil { - return err - } - if data != nil { - return f(val, data) - } - return nil -} - func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool { return stringifyValue(key, objVal) != filterVal } -func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error { - // Ignore the second return value because we check for strict inequality. - val, _, ok := destringifyValue(fKey, fValue, false) - it, err := r.NewIter(&pebble.IterOptions{ - SkipPoint: func(k []byte) bool { - return ok && bytes.Equal(val, k) - }, - }) - if err != nil { - return err - } - - for v := it.First(); v; v = it.Next() { - if err := f(it.Key(), it.Value()); err != nil { - err = errors.Join(err, it.Close()) - return err - } - } - return it.Close() -} - func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool { return strings.HasPrefix(stringifyValue(key, objVal), filterVal) } -func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, f func([]byte, []byte) error) error { - val, checkLast, ok := destringifyValue(fKey, fVal, true) - if !ok { - return nil - } - - prefix := val - if checkLast { - prefix = val[:len(val)-1] - } - - if len(val) == 0 { - it, err := r.NewIter(&pebble.IterOptions{}) - if err != nil { - return err - } - for v := it.First(); v; v = it.Next() { - if err := f(it.Key(), it.Value()); err != nil { - err = errors.Join(err, it.Close()) - return err - } - } - return it.Close() - } - - it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - }) - if err != nil { - return err - } - for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { - if checkLast && (len(it.Key()) == len(prefix) || it.Key()[len(prefix)]>>4 != val[len(val)-1]) { - // If the last byte doesn't match, this means the prefix does no longer match, - // so we need to break here. - break - } - - if err := f(it.Key(), it.Value()); err != nil { - err = errors.Join(err, it.Close()) - return err - } - } - return it.Close() -} - func unknownMatcher(_ string, _ []byte, _ string) bool { return false } -func unknownMatcherBucket(_ pebble.Reader, _ string, _ string, _ func([]byte, []byte) error) error { - return nil -} - // bucketKeyHelper returns byte representation of val that is used as a key // in boltDB. Useful for getting filter values from unique and list indexes. func bucketKeyHelper(hdr string, val string) []byte {