[#9999] metabase: Fix db engine to pebble in db.go and control.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c229b08f46
commit
d0a5d96089
2 changed files with 94 additions and 225 deletions
|
@ -2,8 +2,8 @@ package meta
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -11,7 +11,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -21,23 +21,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
|||
// ErrReadOnlyMode is returned when metabase is in a read-only mode.
|
||||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||
|
||||
var (
|
||||
mStaticBuckets = map[string]struct{}{
|
||||
string(containerVolumeBucketName): {},
|
||||
string(containerCounterBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
string(bucketNameLocked): {},
|
||||
}
|
||||
|
||||
// deprecatedBuckets buckets that are not used anymore.
|
||||
deprecatedBuckets = [][]byte{
|
||||
toMoveItBucketName,
|
||||
}
|
||||
)
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
// Open metabase.
|
||||
func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
@ -47,147 +31,122 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
|||
if m.NoMetabase() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.openDB(m)
|
||||
}
|
||||
|
||||
func (db *DB) openDB(mode mode.Mode) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
err := util.MkdirAllX(db.info.Path, db.info.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||
}
|
||||
|
||||
db.log.Debug(logs.MetabaseCreatedDirectoryForMetabase, zap.String("path", db.info.Path))
|
||||
|
||||
if db.boltOptions == nil {
|
||||
opts := *bbolt.DefaultOptions
|
||||
db.boltOptions = &opts
|
||||
}
|
||||
db.boltOptions.ReadOnly = mode.ReadOnly()
|
||||
|
||||
return metaerr.Wrap(db.openBolt())
|
||||
return metaerr.Wrap(db.openDatabase(mode.ReadOnly()))
|
||||
}
|
||||
|
||||
func (db *DB) openBolt() error {
|
||||
var err error
|
||||
|
||||
db.database, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open boltDB database: %w", err)
|
||||
func (db *DB) pebbleOptions(readOnly bool) *pebble.Options {
|
||||
return &pebble.Options{
|
||||
ReadOnly: readOnly,
|
||||
}
|
||||
db.database.MaxBatchDelay = db.boltBatchDelay
|
||||
db.database.MaxBatchSize = db.boltBatchSize
|
||||
}
|
||||
|
||||
db.log.Debug(logs.MetabaseOpenedBoltDBInstanceForMetabase)
|
||||
func (db *DB) openDatabase(readOnly bool) error {
|
||||
opts := db.pebbleOptions(readOnly)
|
||||
|
||||
db.log.Debug(logs.MetabaseCheckingMetabaseVersion)
|
||||
return db.database.View(func(tx *bbolt.Tx) error {
|
||||
// The safest way to check if the metabase is fresh is to check if it has no buckets.
|
||||
// However, shard info can be present. So here we check that the number of buckets is
|
||||
// at most 1.
|
||||
// Another thing to consider is that tests do not persist shard ID, we want to support
|
||||
// this case too.
|
||||
var n int
|
||||
err := tx.ForEach(func([]byte, *bbolt.Bucket) error {
|
||||
if n++; n >= 2 { // do not iterate a lot
|
||||
return errBreakBucketForEach
|
||||
}
|
||||
return nil
|
||||
})
|
||||
var err error
|
||||
db.database, err = pebble.Open(db.info.Path, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open badger database: %w", err)
|
||||
}
|
||||
|
||||
if err == errBreakBucketForEach {
|
||||
db.initialized = true
|
||||
err = nil
|
||||
return db.snapshot(func(s *pebble.Snapshot) error {
|
||||
data, err := valueSafe(s, shardInfoKey(versionKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
db.initialized = len(data) > 0
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Init initializes metabase. It creates static (CID-independent) buckets in underlying BoltDB instance.
|
||||
// Init initializes metabase.
|
||||
//
|
||||
// Returns ErrOutdatedVersion if a database at the provided path is outdated.
|
||||
//
|
||||
// Does nothing if metabase has already been initialized and filled. To roll back the database to its initial state,
|
||||
// use Reset.
|
||||
func (db *DB) Init() error {
|
||||
return metaerr.Wrap(db.init(false))
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
return metaerr.Wrap(db.init(context.TODO(), false))
|
||||
}
|
||||
|
||||
// Reset resets metabase. Works similar to Init but cleans up all static buckets and
|
||||
// removes all dynamic (CID-dependent) ones in non-blank BoltDB instances.
|
||||
func (db *DB) Reset() error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
func (db *DB) Init2(ctx context.Context) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
return metaerr.Wrap(db.init(ctx, false))
|
||||
}
|
||||
|
||||
// Reset resets metabase. Works similar to Init but cleans up all data records.
|
||||
func (db *DB) Reset(ctx context.Context) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return metaerr.Wrap(db.init(true))
|
||||
return metaerr.Wrap(db.init(ctx, true))
|
||||
}
|
||||
|
||||
func (db *DB) init(reset bool) error {
|
||||
func (db *DB) init(ctx context.Context, reset bool) error {
|
||||
if db.mode.NoMetabase() || db.mode.ReadOnly() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.database.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
if !reset {
|
||||
// Normal open, check version and update if not initialized.
|
||||
err := checkVersion(tx, db.initialized)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reset {
|
||||
if err := db.reset(); err != nil {
|
||||
return err
|
||||
}
|
||||
for k := range mStaticBuckets {
|
||||
name := []byte(k)
|
||||
if reset {
|
||||
err := tx.DeleteBucket(name)
|
||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("could not delete static bucket %s: %w", k, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err := tx.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create static bucket %s: %w", k, err)
|
||||
}
|
||||
return db.batch(func(b *pebble.Batch) error {
|
||||
err := checkVersion(b, db.initialized)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = syncCounter(ctx, b, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counter: %w", err)
|
||||
}
|
||||
|
||||
for _, b := range deprecatedBuckets {
|
||||
err := tx.DeleteBucket(b)
|
||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if !reset { // counters will be recalculated by refill metabase
|
||||
err = syncCounter(tx, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counter: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
bucketCursor := tx.Cursor()
|
||||
name, _ := bucketCursor.First()
|
||||
for name != nil {
|
||||
if _, ok := mStaticBuckets[string(name)]; !ok {
|
||||
if err := tx.DeleteBucket(name); err != nil {
|
||||
return err
|
||||
}
|
||||
name, _ = bucketCursor.Seek(name)
|
||||
continue
|
||||
}
|
||||
name, _ = bucketCursor.Next()
|
||||
}
|
||||
return updateVersion(tx, version)
|
||||
func (db *DB) reset() error {
|
||||
if err := db.database.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
db.database = nil
|
||||
if err := os.RemoveAll(db.info.Path); err != nil {
|
||||
return err
|
||||
}
|
||||
var err error
|
||||
db.database, err = pebble.Open(db.info.Path, db.pebbleOptions(false))
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open badger database: %w", err)
|
||||
}
|
||||
return db.batch(func(b *pebble.Batch) error {
|
||||
return updateVersion(b, version)
|
||||
})
|
||||
}
|
||||
|
||||
// SyncCounters forces to synchronize the object counters.
|
||||
func (db *DB) SyncCounters() error {
|
||||
func (db *DB) SyncCounters(ctx context.Context) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
|
@ -197,26 +156,29 @@ func (db *DB) SyncCounters() error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
|
||||
return syncCounter(tx, true)
|
||||
return metaerr.Wrap(db.batch(func(b *pebble.Batch) error {
|
||||
return syncCounter(ctx, b, true)
|
||||
}))
|
||||
}
|
||||
|
||||
// Close closes boltDB instance
|
||||
// and reports metabase metric.
|
||||
// Close closes metabase.
|
||||
func (db *DB) Close() error {
|
||||
var err error
|
||||
if db.database != nil {
|
||||
err = db.close()
|
||||
}
|
||||
if err == nil {
|
||||
db.metrics.Close()
|
||||
}
|
||||
return err
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
return db.close()
|
||||
}
|
||||
|
||||
func (db *DB) close() error {
|
||||
return metaerr.Wrap(db.database.Close())
|
||||
var err error
|
||||
if db.database != nil {
|
||||
err = metaerr.Wrap(db.database.Close())
|
||||
}
|
||||
if err == nil {
|
||||
db.database = nil
|
||||
db.metrics.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Reload reloads part of the configuration.
|
||||
|
@ -235,14 +197,14 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
|
|||
defer db.modeMtx.Unlock()
|
||||
|
||||
if db.mode.NoMetabase() || c.info.Path != "" && filepath.Clean(db.info.Path) != filepath.Clean(c.info.Path) {
|
||||
if err := db.Close(); err != nil {
|
||||
if err := db.close(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
db.mode = mode.Disabled
|
||||
db.metrics.SetMode(mode.ComponentDisabled)
|
||||
db.info.Path = c.info.Path
|
||||
if err := db.openBolt(); err != nil {
|
||||
if err := db.openDatabase(false); err != nil {
|
||||
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
|
@ -85,20 +83,16 @@ func New(opts ...Option) *DB {
|
|||
cfg: c,
|
||||
matchers: map[objectSDK.SearchMatchType]matcher{
|
||||
objectSDK.MatchUnknown: {
|
||||
matchSlow: unknownMatcher,
|
||||
matchBucket: unknownMatcherBucket,
|
||||
matchSlow: unknownMatcher,
|
||||
},
|
||||
objectSDK.MatchStringEqual: {
|
||||
matchSlow: stringEqualMatcher,
|
||||
matchBucket: stringEqualMatcherBucket,
|
||||
matchSlow: stringEqualMatcher,
|
||||
},
|
||||
objectSDK.MatchStringNotEqual: {
|
||||
matchSlow: stringNotEqualMatcher,
|
||||
matchBucket: stringNotEqualMatcherBucket,
|
||||
matchSlow: stringNotEqualMatcher,
|
||||
},
|
||||
objectSDK.MatchCommonPrefix: {
|
||||
matchSlow: stringCommonPrefixMatcher,
|
||||
matchBucket: stringCommonPrefixMatcherBucket,
|
||||
matchSlow: stringCommonPrefixMatcher,
|
||||
},
|
||||
},
|
||||
mode: mode.Disabled,
|
||||
|
@ -180,105 +174,18 @@ func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
|
|||
return stringifyValue(key, objVal) == filterVal
|
||||
}
|
||||
|
||||
func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
// Ignore the second return value because we check for strict equality.
|
||||
val, _, ok := destringifyValue(fKey, fValue, false)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
data, err := valueSafe(r, val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if data != nil {
|
||||
return f(val, data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool {
|
||||
return stringifyValue(key, objVal) != filterVal
|
||||
}
|
||||
|
||||
func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
// Ignore the second return value because we check for strict inequality.
|
||||
val, _, ok := destringifyValue(fKey, fValue, false)
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
SkipPoint: func(k []byte) bool {
|
||||
return ok && bytes.Equal(val, k)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool {
|
||||
return strings.HasPrefix(stringifyValue(key, objVal), filterVal)
|
||||
}
|
||||
|
||||
func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, f func([]byte, []byte) error) error {
|
||||
val, checkLast, ok := destringifyValue(fKey, fVal, true)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
prefix := val
|
||||
if checkLast {
|
||||
prefix = val[:len(val)-1]
|
||||
}
|
||||
|
||||
if len(val) == 0 {
|
||||
it, err := r.NewIter(&pebble.IterOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
LowerBound: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||
if checkLast && (len(it.Key()) == len(prefix) || it.Key()[len(prefix)]>>4 != val[len(val)-1]) {
|
||||
// If the last byte doesn't match, this means the prefix does no longer match,
|
||||
// so we need to break here.
|
||||
break
|
||||
}
|
||||
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
func unknownMatcher(_ string, _ []byte, _ string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func unknownMatcherBucket(_ pebble.Reader, _ string, _ string, _ func([]byte, []byte) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// bucketKeyHelper returns byte representation of val that is used as a key
|
||||
// in boltDB. Useful for getting filter values from unique and list indexes.
|
||||
func bucketKeyHelper(hdr string, val string) []byte {
|
||||
|
|
Loading…
Reference in a new issue