diff --git a/go.mod b/go.mod index 09a098502..e2cd2cdac 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 + github.com/cockroachdb/pebble v1.1.1 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/go-pkgz/expirable-cache/v3 v3.0.0 github.com/google/uuid v1.6.0 @@ -55,11 +56,17 @@ require ( require ( git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect + github.com/DataDog/zstd v1.4.5 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect + github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect + github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect @@ -67,9 +74,11 @@ require ( github.com/davidmz/go-pageant v1.0.2 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-fed/httpsig v1.1.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect @@ -83,6 +92,8 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/klauspost/reedsolomon v1.12.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect @@ -98,11 +109,13 @@ require ( github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect github.com/nspcc-dev/rfc6979 v0.2.1 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.11.0 // indirect diff --git a/go.sum b/go.sum index 1034ff61f..02ad01962 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/local_object_storage/metabase/children.go b/pkg/local_object_storage/metabase/children.go index acd367951..a1889d911 100644 --- a/pkg/local_object_storage/metabase/children.go +++ b/pkg/local_object_storage/metabase/children.go @@ -39,7 +39,7 @@ func (db *DB) GetChildren(ctx context.Context, addresses []oid.Address) (map[oid result := make(map[oid.Address][]oid.Address, len(addresses)) buffer := make([]byte, bucketKeySize) - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { for _, addr := range addresses { if _, found := result[addr]; found { continue diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 472b2affc..5fae1f72e 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -30,7 +30,7 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { return nil, ErrDegradedMode } - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { list, err = db.containers(tx) return err @@ -64,7 +64,7 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) { return 0, ErrDegradedMode } - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { size, err = db.containerSize(tx, id) return err diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 891a1e9b2..0e280a5f6 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -70,17 +70,17 @@ func (db *DB) openDB(mode mode.Mode) error { func (db *DB) openBolt() error { var err error - db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) + db.database, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) if err != nil { return fmt.Errorf("can't open boltDB database: %w", err) } - db.boltDB.MaxBatchDelay = db.boltBatchDelay - db.boltDB.MaxBatchSize = db.boltBatchSize + db.database.MaxBatchDelay = db.boltBatchDelay + db.database.MaxBatchSize = db.boltBatchSize db.log.Debug(logs.MetabaseOpenedBoltDBInstanceForMetabase) db.log.Debug(logs.MetabaseCheckingMetabaseVersion) - return db.boltDB.View(func(tx *bbolt.Tx) error { + return db.database.View(func(tx *bbolt.Tx) error { // The safest way to check if the metabase is fresh is to check if it has no buckets. // However, shard info can be present. So here we check that the number of buckets is // at most 1. @@ -130,7 +130,7 @@ func (db *DB) init(reset bool) error { return nil } - return db.boltDB.Update(func(tx *bbolt.Tx) error { + return db.database.Update(func(tx *bbolt.Tx) error { var err error if !reset { // Normal open, check version and update if not initialized. @@ -197,7 +197,7 @@ func (db *DB) SyncCounters() error { return ErrReadOnlyMode } - return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error { + return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error { return syncCounter(tx, true) })) } @@ -206,7 +206,7 @@ func (db *DB) SyncCounters() error { // and reports metabase metric. func (db *DB) Close() error { var err error - if db.boltDB != nil { + if db.database != nil { err = db.close() } if err == nil { @@ -216,7 +216,7 @@ func (db *DB) Close() error { } func (db *DB) close() error { - return metaerr.Wrap(db.boltDB.Close()) + return metaerr.Wrap(db.database.Close()) } // Reload reloads part of the configuration. diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 275099ff2..da3cfae26 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -63,7 +63,7 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { return ObjectCounters{}, ErrDegradedMode } - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(shardInfoBucket) if b != nil { data := b.Get(objectPhyCounterKey) @@ -151,7 +151,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entit counter := 0 const batchSize = 1000 - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(containerCounterBucketName) if b == nil { return ErrInterruptIterator @@ -215,7 +215,7 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er var result ObjectCounters - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(containerCounterBucketName) key := make([]byte, cidSize) id.Encode(key) @@ -591,7 +591,7 @@ func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (b counter := 0 const batchSize = 1000 - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(containerVolumeBucketName) c := b.Cursor() var key, value []byte @@ -654,7 +654,7 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { return ErrReadOnlyMode } - err := db.boltDB.Update(func(tx *bbolt.Tx) error { + err := db.database.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(containerVolumeBucketName) key := make([]byte, cidSize) @@ -737,7 +737,7 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { return ErrReadOnlyMode } - err := db.boltDB.Update(func(tx *bbolt.Tx) error { + err := db.database.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(containerCounterBucketName) key := make([]byte, cidSize) diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 1f444a3ef..21f5d34c2 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -4,25 +4,25 @@ import ( "bytes" "encoding/binary" "encoding/hex" + "errors" "io/fs" "os" "strconv" "strings" "sync" - "time" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/cockroachdb/pebble" "github.com/mr-tron/base58" - "go.etcd.io/bbolt" "go.uber.org/zap" ) type matcher struct { matchSlow func(string, []byte, string) bool - matchBucket func(*bbolt.Bucket, string, string, func([]byte, []byte) error) error + matchBucket func(pebble.Reader, string, string, func([]byte, []byte) error) error } // EpochState is an interface that provides access to the @@ -41,7 +41,7 @@ type DB struct { matchers map[objectSDK.SearchMatchType]matcher - boltDB *bbolt.DB + database *pebble.DB initialized bool } @@ -50,10 +50,7 @@ type DB struct { type Option func(*cfg) type cfg struct { - boltOptions *bbolt.Options // optional - - boltBatchSize int - boltBatchDelay time.Duration + dbOptions *pebble.Options // optional info Info @@ -68,10 +65,9 @@ func defaultCfg() *cfg { info: Info{ Permission: os.ModePerm, // 0777 }, - boltBatchDelay: bbolt.DefaultMaxBatchDelay, - boltBatchSize: bbolt.DefaultMaxBatchSize, - log: &logger.Logger{Logger: zap.L()}, - metrics: &noopMetrics{}, + log: &logger.Logger{Logger: zap.L()}, + metrics: &noopMetrics{}, + dbOptions: &pebble.Options{}, } } @@ -186,17 +182,18 @@ func stringEqualMatcher(key string, objVal []byte, filterVal string) bool { return stringifyValue(key, objVal) == filterVal } -func stringEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error { +func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error { // Ignore the second return value because we check for strict equality. val, _, ok := destringifyValue(fKey, fValue, false) if !ok { return nil } - if data := b.Get(val); data != nil { - return f(val, data) + data, err := getSafe(r, val) + if err != nil { + return err } - if b.Bucket(val) != nil { - return f(val, nil) + if data != nil { + return f(val, data) } return nil } @@ -205,22 +202,33 @@ func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool { return stringifyValue(key, objVal) != filterVal } -func stringNotEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error { +func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error { // Ignore the second return value because we check for strict inequality. val, _, ok := destringifyValue(fKey, fValue, false) - return b.ForEach(func(k, v []byte) error { - if !ok || !bytes.Equal(val, k) { - return f(k, v) - } - return nil + it, err := r.NewIter(&pebble.IterOptions{ + SkipPoint: func(k []byte) bool { + return ok && bytes.Equal(val, k) + }, + OnlyReadGuaranteedDurable: true, }) + if err != nil { + return err + } + + for v := it.First(); v; v = it.Next() { + if err := f(it.Key(), it.Value()); err != nil { + err = errors.Join(err, it.Close()) + return err + } + } + return it.Close() } func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool { return strings.HasPrefix(stringifyValue(key, objVal), filterVal) } -func stringCommonPrefixMatcherBucket(b *bbolt.Bucket, fKey string, fVal string, f func([]byte, []byte) error) error { +func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, f func([]byte, []byte) error) error { val, checkLast, ok := destringifyValue(fKey, fVal, true) if !ok { return nil @@ -232,30 +240,48 @@ func stringCommonPrefixMatcherBucket(b *bbolt.Bucket, fKey string, fVal string, } if len(val) == 0 { - // empty common prefix, all the objects - // satisfy that filter - return b.ForEach(f) + it, err := r.NewIter(&pebble.IterOptions{ + OnlyReadGuaranteedDurable: true, + }) + if err != nil { + return err + } + for v := it.First(); v; v = it.Next() { + if err := f(it.Key(), it.Value()); err != nil { + err = errors.Join(err, it.Close()) + return err + } + } + return it.Close() } - c := b.Cursor() - for k, v := c.Seek(val); bytes.HasPrefix(k, prefix); k, v = c.Next() { - if checkLast && (len(k) == len(prefix) || k[len(prefix)]>>4 != val[len(val)-1]) { + it, err := r.NewIter(&pebble.IterOptions{ + OnlyReadGuaranteedDurable: true, + LowerBound: prefix, + }) + if err != nil { + return err + } + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + if checkLast && (len(it.Key()) == len(prefix) || it.Key()[len(prefix)]>>4 != val[len(val)-1]) { // If the last byte doesn't match, this means the prefix does no longer match, // so we need to break here. break } - if err := f(k, v); err != nil { + + if err := f(it.Key(), it.Value()); err != nil { + err = errors.Join(err, it.Close()) return err } } - return nil + return it.Close() } func unknownMatcher(_ string, _ []byte, _ string) bool { return false } -func unknownMatcherBucket(_ *bbolt.Bucket, _ string, _ string, _ func([]byte, []byte) error) error { +func unknownMatcherBucket(_ pebble.Reader, _ string, _ string, _ func([]byte, []byte) error) error { return nil } @@ -307,13 +333,6 @@ func WithLogger(l *logger.Logger) Option { } } -// WithBoltDBOptions returns option to specify BoltDB options. -func WithBoltDBOptions(opts *bbolt.Options) Option { - return func(c *cfg) { - c.boltOptions = opts - } -} - // WithPath returns option to set system path to Metabase. func WithPath(path string) Option { return func(c *cfg) { @@ -329,28 +348,6 @@ func WithPermissions(perm fs.FileMode) Option { } } -// WithMaxBatchSize returns option to specify maximum concurrent operations -// to be processed in a single transactions. -// This option is missing from `bbolt.Options` but is set right after DB is open. -func WithMaxBatchSize(s int) Option { - return func(c *cfg) { - if s != 0 { - c.boltBatchSize = s - } - } -} - -// WithMaxBatchDelay returns option to specify maximum time to wait before -// the batch of concurrent transactions is processed. -// This option is missing from `bbolt.Options` but is set right after DB is open. -func WithMaxBatchDelay(d time.Duration) Option { - return func(c *cfg) { - if d != 0 { - c.boltBatchDelay = d - } - } -} - // WithEpochState return option to specify a source of current epoch height. func WithEpochState(s EpochState) Option { return func(c *cfg) { diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index ae10564a8..452b0c563 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -112,7 +112,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { var err error var res DeleteRes - err = db.boltDB.Update(func(tx *bbolt.Tx) error { + err = db.database.Update(func(tx *bbolt.Tx) error { res, err = db.deleteGroup(tx, prm.addrs) return err }) diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 153d92110..de52fe71d 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch) return err diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index aa2cb6f20..f5ff131b6 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -49,7 +49,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A containerIDToObjectIDs[addr.Container()] = append(containerIDToObjectIDs[addr.Container()], addr.Object()) } - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { for containerID, objectIDs := range containerIDToObjectIDs { select { case <-ctx.Done(): diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d9acd4ce2..8df92d871 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -77,7 +77,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { key := make([]byte, addressKeySize) res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch) diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 80d40fb78..e68960ea4 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -80,7 +80,7 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err return ErrDegradedMode } - err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset) })) success = err == nil @@ -160,7 +160,7 @@ func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm) return ErrDegradedMode } - return metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + return metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset) })) } @@ -282,7 +282,7 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error { buf := make([]byte, addressKeySize) - return db.boltDB.Update(func(tx *bbolt.Tx) error { + return db.database.Update(func(tx *bbolt.Tx) error { bkt := tx.Bucket(graveyardBucketName) if bkt == nil { return nil diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index c265fb217..b1574f6f2 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -181,7 +181,7 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { inhumedByCnrID: make(map[cid.ID]ObjectCounters), } currEpoch := db.epochState.CurrentEpoch() - err := db.boltDB.Update(func(tx *bbolt.Tx) error { + err := db.database.Update(func(tx *bbolt.Tx) error { return db.inhumeTx(tx, currEpoch, prm, &res) }) success = err == nil diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 7b60b7d50..b259e0aa1 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -71,7 +71,7 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH return ErrDegradedMode } - err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { return db.iterateExpired(tx, epoch, h) })) success = err == nil @@ -164,7 +164,7 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid return ErrDegradedMode } - return db.boltDB.View(func(tx *bbolt.Tx) error { + return db.database.View(func(tx *bbolt.Tx) error { return db.iterateCoveredByTombstones(tx, tss, h) }) } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index b4326a92c..ac5342a1b 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -91,7 +91,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err result := make([]objectcore.Info, 0, prm.count) - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) return err }) diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 732ba426d..e8e724dd1 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -78,7 +78,7 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error { } key := make([]byte, cidSize) - return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error { + return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error { if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular { return logicerr.Wrap(new(apistatus.LockNonRegularObject)) } @@ -143,7 +143,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { var unlockedObjects []oid.Address - if err := db.boltDB.Update(func(tx *bbolt.Tx) error { + if err := db.database.Update(func(tx *bbolt.Tx) error { for i := range lockers { unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) if err != nil { @@ -343,7 +343,7 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e if db.mode.NoMetabase() { return res, ErrDegradedMode } - err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object()) return nil })) @@ -376,7 +376,7 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er if db.mode.NoMetabase() { return res, ErrDegradedMode } - err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { res, err = getLocked(tx, addr.Container(), addr.Object()) return nil })) diff --git a/pkg/local_object_storage/metabase/mode.go b/pkg/local_object_storage/metabase/mode.go index 2032ed6b2..c49437f97 100644 --- a/pkg/local_object_storage/metabase/mode.go +++ b/pkg/local_object_storage/metabase/mode.go @@ -23,7 +23,7 @@ func (db *DB) SetMode(m mode.Mode) error { } if m.NoMetabase() { - db.boltDB = nil + db.database = nil } else { err := db.openDB(m) if err == nil && !m.ReadOnly() { diff --git a/pkg/local_object_storage/metabase/mode_test.go b/pkg/local_object_storage/metabase/mode_test.go index 1b9f60055..7286a9db3 100644 --- a/pkg/local_object_storage/metabase/mode_test.go +++ b/pkg/local_object_storage/metabase/mode_test.go @@ -24,14 +24,14 @@ func Test_Mode(t *testing.T) { }...) require.NoError(t, bdb.Open(context.Background(), mode.DegradedReadOnly)) - require.Nil(t, bdb.boltDB) + require.Nil(t, bdb.database) require.NoError(t, bdb.Init()) - require.Nil(t, bdb.boltDB) + require.Nil(t, bdb.database) require.NoError(t, bdb.Close()) require.NoError(t, bdb.Open(context.Background(), mode.Degraded)) - require.Nil(t, bdb.boltDB) + require.Nil(t, bdb.database) require.NoError(t, bdb.Init()) - require.Nil(t, bdb.boltDB) + require.Nil(t, bdb.database) require.NoError(t, bdb.Close()) } diff --git a/pkg/local_object_storage/metabase/pebble.go b/pkg/local_object_storage/metabase/pebble.go new file mode 100644 index 000000000..d1cfda5c3 --- /dev/null +++ b/pkg/local_object_storage/metabase/pebble.go @@ -0,0 +1,23 @@ +package meta + +import ( + "bytes" + "errors" + + "github.com/cockroachdb/pebble" +) + +func getSafe(r pebble.Reader, key []byte) ([]byte, error) { + data, closer, err := r.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, nil + } + return nil, err + } + result := bytes.Clone(data) + if err := closer.Close(); err != nil { + return nil, err + } + return result, nil +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index f351cb485..8e15c6605 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -86,7 +86,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.Batch(func(tx *bbolt.Tx) error { + err = db.database.Batch(func(tx *bbolt.Tx) error { var e error res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch) return e diff --git a/pkg/local_object_storage/metabase/reset_test.go b/pkg/local_object_storage/metabase/reset_test.go index 66f5eefc6..61b6f31cb 100644 --- a/pkg/local_object_storage/metabase/reset_test.go +++ b/pkg/local_object_storage/metabase/reset_test.go @@ -45,7 +45,7 @@ func TestResetDropsContainerBuckets(t *testing.T) { require.NoError(t, db.Reset()) var bucketCount int - require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { + require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { _, exists := mStaticBuckets[string(name)] require.True(t, exists, "unexpected bucket:"+string(name)) diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 3a4d7a227..ff23350db 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -91,7 +91,7 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err currEpoch := db.epochState.CurrentEpoch() - return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { + return res, metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch) success = err == nil return err diff --git a/pkg/local_object_storage/metabase/shard_id.go b/pkg/local_object_storage/metabase/shard_id.go index 88446494e..7d9c8b98e 100644 --- a/pkg/local_object_storage/metabase/shard_id.go +++ b/pkg/local_object_storage/metabase/shard_id.go @@ -47,7 +47,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) { // If id is missing, returns nil, nil. func (db *DB) readShardID() ([]byte, error) { var id []byte - err := db.boltDB.View(func(tx *bbolt.Tx) error { + err := db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(shardInfoBucket) if b != nil { id = bytes.Clone(b.Get(shardIDKey)) @@ -86,7 +86,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error { // writeShardID writes shard id to db. func (db *DB) writeShardID(id []byte) error { - return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error { + return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error { b, err := tx.CreateBucketIfNotExists(shardInfoBucket) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 6d620b41a..4ccb21eba 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -57,7 +57,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes return res, ErrDegradedMode } - err = db.boltDB.View(func(tx *bbolt.Tx) error { + err = db.database.View(func(tx *bbolt.Tx) error { res.id, err = db.storageID(tx, prm.addr) return err @@ -126,7 +126,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res return res, ErrReadOnlyMode } - err = db.boltDB.Batch(func(tx *bbolt.Tx) error { + err = db.database.Batch(func(tx *bbolt.Tx) error { return setStorageID(tx, prm.addr, prm.id, true) }) success = err == nil diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index b2af428ff..06b108392 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -27,7 +27,7 @@ func TestVersion(t *testing.T) { WithPermissions(0o600), WithEpochState(epochStateImpl{})) } check := func(t *testing.T, db *DB) { - require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error { + require.NoError(t, db.database.View(func(tx *bbolt.Tx) error { b := tx.Bucket(shardInfoBucket) if b == nil { return errors.New("shard info bucket not found") @@ -68,7 +68,7 @@ func TestVersion(t *testing.T) { t.Run("invalid version", func(t *testing.T) { db := newDB(t) require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) - require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { + require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, version+1) })) require.NoError(t, db.Close())