forked from TrueCloudLab/frostfs-node
[#9999] metabase: Change DB engine to pebble
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
7e04083c27
commit
1aaf3346ae
25 changed files with 141 additions and 108 deletions
13
go.mod
13
go.mod
|
@ -16,6 +16,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/cheggaaa/pb v1.0.29
|
||||
github.com/chzyer/readline v1.5.1
|
||||
github.com/cockroachdb/pebble v1.1.1
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
github.com/go-pkgz/expirable-cache/v3 v3.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
|
@ -55,11 +56,17 @@ require (
|
|||
|
||||
require (
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||
github.com/DataDog/zstd v1.4.5 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.13.0 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cockroachdb/errors v1.11.3 // indirect
|
||||
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
|
||||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
|
||||
github.com/cockroachdb/redact v1.1.5 // indirect
|
||||
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
|
||||
github.com/consensys/bavard v0.1.13 // indirect
|
||||
github.com/consensys/gnark-crypto v0.12.2-0.20231222162921-eb75782795d2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
|
@ -67,9 +74,11 @@ require (
|
|||
github.com/davidmz/go-pageant v1.0.2 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.27.0 // indirect
|
||||
github.com/go-fed/httpsig v1.1.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/gorilla/websocket v1.5.1 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 // indirect
|
||||
|
@ -83,6 +92,8 @@ require (
|
|||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||
github.com/klauspost/reedsolomon v1.12.1 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||
|
@ -98,11 +109,13 @@ require (
|
|||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/prometheus/client_model v0.5.0 // indirect
|
||||
github.com/prometheus/common v0.48.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.4 // indirect
|
||||
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/afero v1.11.0 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -39,7 +39,7 @@ func (db *DB) GetChildren(ctx context.Context, addresses []oid.Address) (map[oid
|
|||
result := make(map[oid.Address][]oid.Address, len(addresses))
|
||||
|
||||
buffer := make([]byte, bucketKeySize)
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
for _, addr := range addresses {
|
||||
if _, found := result[addr]; found {
|
||||
continue
|
||||
|
|
|
@ -30,7 +30,7 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
|
|||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
list, err = db.containers(tx)
|
||||
|
||||
return err
|
||||
|
@ -64,7 +64,7 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
|
|||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
size, err = db.containerSize(tx, id)
|
||||
|
||||
return err
|
||||
|
|
|
@ -70,17 +70,17 @@ func (db *DB) openDB(mode mode.Mode) error {
|
|||
func (db *DB) openBolt() error {
|
||||
var err error
|
||||
|
||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
db.database, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open boltDB database: %w", err)
|
||||
}
|
||||
db.boltDB.MaxBatchDelay = db.boltBatchDelay
|
||||
db.boltDB.MaxBatchSize = db.boltBatchSize
|
||||
db.database.MaxBatchDelay = db.boltBatchDelay
|
||||
db.database.MaxBatchSize = db.boltBatchSize
|
||||
|
||||
db.log.Debug(logs.MetabaseOpenedBoltDBInstanceForMetabase)
|
||||
|
||||
db.log.Debug(logs.MetabaseCheckingMetabaseVersion)
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.database.View(func(tx *bbolt.Tx) error {
|
||||
// The safest way to check if the metabase is fresh is to check if it has no buckets.
|
||||
// However, shard info can be present. So here we check that the number of buckets is
|
||||
// at most 1.
|
||||
|
@ -130,7 +130,7 @@ func (db *DB) init(reset bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return db.database.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
if !reset {
|
||||
// Normal open, check version and update if not initialized.
|
||||
|
@ -197,7 +197,7 @@ func (db *DB) SyncCounters() error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
|
||||
return syncCounter(tx, true)
|
||||
}))
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func (db *DB) SyncCounters() error {
|
|||
// and reports metabase metric.
|
||||
func (db *DB) Close() error {
|
||||
var err error
|
||||
if db.boltDB != nil {
|
||||
if db.database != nil {
|
||||
err = db.close()
|
||||
}
|
||||
if err == nil {
|
||||
|
@ -216,7 +216,7 @@ func (db *DB) Close() error {
|
|||
}
|
||||
|
||||
func (db *DB) close() error {
|
||||
return metaerr.Wrap(db.boltDB.Close())
|
||||
return metaerr.Wrap(db.database.Close())
|
||||
}
|
||||
|
||||
// Reload reloads part of the configuration.
|
||||
|
|
|
@ -63,7 +63,7 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
|||
return ObjectCounters{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
data := b.Get(objectPhyCounterKey)
|
||||
|
@ -151,7 +151,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entit
|
|||
counter := 0
|
||||
const batchSize = 1000
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
if b == nil {
|
||||
return ErrInterruptIterator
|
||||
|
@ -215,7 +215,7 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
|
|||
|
||||
var result ObjectCounters
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
key := make([]byte, cidSize)
|
||||
id.Encode(key)
|
||||
|
@ -591,7 +591,7 @@ func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (b
|
|||
counter := 0
|
||||
const batchSize = 1000
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerVolumeBucketName)
|
||||
c := b.Cursor()
|
||||
var key, value []byte
|
||||
|
@ -654,7 +654,7 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err := db.database.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerVolumeBucketName)
|
||||
|
||||
key := make([]byte, cidSize)
|
||||
|
@ -737,7 +737,7 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err := db.database.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
|
||||
key := make([]byte, cidSize)
|
||||
|
|
|
@ -4,25 +4,25 @@ import (
|
|||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/mr-tron/base58"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type matcher struct {
|
||||
matchSlow func(string, []byte, string) bool
|
||||
matchBucket func(*bbolt.Bucket, string, string, func([]byte, []byte) error) error
|
||||
matchBucket func(pebble.Reader, string, string, func([]byte, []byte) error) error
|
||||
}
|
||||
|
||||
// EpochState is an interface that provides access to the
|
||||
|
@ -41,7 +41,7 @@ type DB struct {
|
|||
|
||||
matchers map[objectSDK.SearchMatchType]matcher
|
||||
|
||||
boltDB *bbolt.DB
|
||||
database *pebble.DB
|
||||
|
||||
initialized bool
|
||||
}
|
||||
|
@ -50,10 +50,7 @@ type DB struct {
|
|||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
boltOptions *bbolt.Options // optional
|
||||
|
||||
boltBatchSize int
|
||||
boltBatchDelay time.Duration
|
||||
dbOptions *pebble.Options // optional
|
||||
|
||||
info Info
|
||||
|
||||
|
@ -68,10 +65,9 @@ func defaultCfg() *cfg {
|
|||
info: Info{
|
||||
Permission: os.ModePerm, // 0777
|
||||
},
|
||||
boltBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
boltBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
metrics: &noopMetrics{},
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
metrics: &noopMetrics{},
|
||||
dbOptions: &pebble.Options{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,17 +182,18 @@ func stringEqualMatcher(key string, objVal []byte, filterVal string) bool {
|
|||
return stringifyValue(key, objVal) == filterVal
|
||||
}
|
||||
|
||||
func stringEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
// Ignore the second return value because we check for strict equality.
|
||||
val, _, ok := destringifyValue(fKey, fValue, false)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if data := b.Get(val); data != nil {
|
||||
return f(val, data)
|
||||
data, err := getSafe(r, val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b.Bucket(val) != nil {
|
||||
return f(val, nil)
|
||||
if data != nil {
|
||||
return f(val, data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -205,22 +202,33 @@ func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool {
|
|||
return stringifyValue(key, objVal) != filterVal
|
||||
}
|
||||
|
||||
func stringNotEqualMatcherBucket(b *bbolt.Bucket, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f func([]byte, []byte) error) error {
|
||||
// Ignore the second return value because we check for strict inequality.
|
||||
val, _, ok := destringifyValue(fKey, fValue, false)
|
||||
return b.ForEach(func(k, v []byte) error {
|
||||
if !ok || !bytes.Equal(val, k) {
|
||||
return f(k, v)
|
||||
}
|
||||
return nil
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
SkipPoint: func(k []byte) bool {
|
||||
return ok && bytes.Equal(val, k)
|
||||
},
|
||||
OnlyReadGuaranteedDurable: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
func stringCommonPrefixMatcher(key string, objVal []byte, filterVal string) bool {
|
||||
return strings.HasPrefix(stringifyValue(key, objVal), filterVal)
|
||||
}
|
||||
|
||||
func stringCommonPrefixMatcherBucket(b *bbolt.Bucket, fKey string, fVal string, f func([]byte, []byte) error) error {
|
||||
func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, f func([]byte, []byte) error) error {
|
||||
val, checkLast, ok := destringifyValue(fKey, fVal, true)
|
||||
if !ok {
|
||||
return nil
|
||||
|
@ -232,30 +240,48 @@ func stringCommonPrefixMatcherBucket(b *bbolt.Bucket, fKey string, fVal string,
|
|||
}
|
||||
|
||||
if len(val) == 0 {
|
||||
// empty common prefix, all the objects
|
||||
// satisfy that filter
|
||||
return b.ForEach(f)
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
OnlyReadGuaranteedDurable: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for v := it.First(); v; v = it.Next() {
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
c := b.Cursor()
|
||||
for k, v := c.Seek(val); bytes.HasPrefix(k, prefix); k, v = c.Next() {
|
||||
if checkLast && (len(k) == len(prefix) || k[len(prefix)]>>4 != val[len(val)-1]) {
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
OnlyReadGuaranteedDurable: true,
|
||||
LowerBound: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||
if checkLast && (len(it.Key()) == len(prefix) || it.Key()[len(prefix)]>>4 != val[len(val)-1]) {
|
||||
// If the last byte doesn't match, this means the prefix does no longer match,
|
||||
// so we need to break here.
|
||||
break
|
||||
}
|
||||
if err := f(k, v); err != nil {
|
||||
|
||||
if err := f(it.Key(), it.Value()); err != nil {
|
||||
err = errors.Join(err, it.Close())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
func unknownMatcher(_ string, _ []byte, _ string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func unknownMatcherBucket(_ *bbolt.Bucket, _ string, _ string, _ func([]byte, []byte) error) error {
|
||||
func unknownMatcherBucket(_ pebble.Reader, _ string, _ string, _ func([]byte, []byte) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -307,13 +333,6 @@ func WithLogger(l *logger.Logger) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithBoltDBOptions returns option to specify BoltDB options.
|
||||
func WithBoltDBOptions(opts *bbolt.Options) Option {
|
||||
return func(c *cfg) {
|
||||
c.boltOptions = opts
|
||||
}
|
||||
}
|
||||
|
||||
// WithPath returns option to set system path to Metabase.
|
||||
func WithPath(path string) Option {
|
||||
return func(c *cfg) {
|
||||
|
@ -329,28 +348,6 @@ func WithPermissions(perm fs.FileMode) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithMaxBatchSize returns option to specify maximum concurrent operations
|
||||
// to be processed in a single transactions.
|
||||
// This option is missing from `bbolt.Options` but is set right after DB is open.
|
||||
func WithMaxBatchSize(s int) Option {
|
||||
return func(c *cfg) {
|
||||
if s != 0 {
|
||||
c.boltBatchSize = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxBatchDelay returns option to specify maximum time to wait before
|
||||
// the batch of concurrent transactions is processed.
|
||||
// This option is missing from `bbolt.Options` but is set right after DB is open.
|
||||
func WithMaxBatchDelay(d time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
if d != 0 {
|
||||
c.boltBatchDelay = d
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithEpochState return option to specify a source of current epoch height.
|
||||
func WithEpochState(s EpochState) Option {
|
||||
return func(c *cfg) {
|
||||
|
|
|
@ -112,7 +112,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
|||
var err error
|
||||
var res DeleteRes
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err = db.database.Update(func(tx *bbolt.Tx) error {
|
||||
res, err = db.deleteGroup(tx, prm.addrs)
|
||||
return err
|
||||
})
|
||||
|
|
|
@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||
|
||||
return err
|
||||
|
|
|
@ -49,7 +49,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
|
|||
containerIDToObjectIDs[addr.Container()] = append(containerIDToObjectIDs[addr.Container()], addr.Object())
|
||||
}
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
for containerID, objectIDs := range containerIDToObjectIDs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -77,7 +77,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
key := make([]byte, addressKeySize)
|
||||
res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch)
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
|
||||
}))
|
||||
success = err == nil
|
||||
|
@ -160,7 +160,7 @@ func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm)
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
|
||||
}))
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
|
|||
|
||||
buf := make([]byte, addressKeySize)
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return db.database.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(graveyardBucketName)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
|
|
|
@ -181,7 +181,7 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
|||
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||
}
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err := db.database.Update(func(tx *bbolt.Tx) error {
|
||||
return db.inhumeTx(tx, currEpoch, prm, &res)
|
||||
})
|
||||
success = err == nil
|
||||
|
|
|
@ -71,7 +71,7 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateExpired(tx, epoch, h)
|
||||
}))
|
||||
success = err == nil
|
||||
|
@ -164,7 +164,7 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateCoveredByTombstones(tx, tss, h)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
|||
|
||||
result := make([]objectcore.Info, 0, prm.count)
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
||||
return err
|
||||
})
|
||||
|
|
|
@ -78,7 +78,7 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
|||
}
|
||||
key := make([]byte, cidSize)
|
||||
|
||||
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
|
||||
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
|
||||
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
|||
|
||||
var unlockedObjects []oid.Address
|
||||
|
||||
if err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
if err := db.database.Update(func(tx *bbolt.Tx) error {
|
||||
for i := range lockers {
|
||||
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
||||
if err != nil {
|
||||
|
@ -343,7 +343,7 @@ func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, e
|
|||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
|
||||
return nil
|
||||
}))
|
||||
|
@ -376,7 +376,7 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er
|
|||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
res, err = getLocked(tx, addr.Container(), addr.Object())
|
||||
return nil
|
||||
}))
|
||||
|
|
|
@ -23,7 +23,7 @@ func (db *DB) SetMode(m mode.Mode) error {
|
|||
}
|
||||
|
||||
if m.NoMetabase() {
|
||||
db.boltDB = nil
|
||||
db.database = nil
|
||||
} else {
|
||||
err := db.openDB(m)
|
||||
if err == nil && !m.ReadOnly() {
|
||||
|
|
|
@ -24,14 +24,14 @@ func Test_Mode(t *testing.T) {
|
|||
}...)
|
||||
|
||||
require.NoError(t, bdb.Open(context.Background(), mode.DegradedReadOnly))
|
||||
require.Nil(t, bdb.boltDB)
|
||||
require.Nil(t, bdb.database)
|
||||
require.NoError(t, bdb.Init())
|
||||
require.Nil(t, bdb.boltDB)
|
||||
require.Nil(t, bdb.database)
|
||||
require.NoError(t, bdb.Close())
|
||||
|
||||
require.NoError(t, bdb.Open(context.Background(), mode.Degraded))
|
||||
require.Nil(t, bdb.boltDB)
|
||||
require.Nil(t, bdb.database)
|
||||
require.NoError(t, bdb.Init())
|
||||
require.Nil(t, bdb.boltDB)
|
||||
require.Nil(t, bdb.database)
|
||||
require.NoError(t, bdb.Close())
|
||||
}
|
||||
|
|
23
pkg/local_object_storage/metabase/pebble.go
Normal file
23
pkg/local_object_storage/metabase/pebble.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
func getSafe(r pebble.Reader, key []byte) ([]byte, error) {
|
||||
data, closer, err := r.Get(key)
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
result := bytes.Clone(data)
|
||||
if err := closer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
|
@ -86,7 +86,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
err = db.database.Batch(func(tx *bbolt.Tx) error {
|
||||
var e error
|
||||
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
||||
return e
|
||||
|
|
|
@ -45,7 +45,7 @@ func TestResetDropsContainerBuckets(t *testing.T) {
|
|||
require.NoError(t, db.Reset())
|
||||
|
||||
var bucketCount int
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
_, exists := mStaticBuckets[string(name)]
|
||||
require.True(t, exists, "unexpected bucket:"+string(name))
|
||||
|
|
|
@ -91,7 +91,7 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return res, metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch)
|
||||
success = err == nil
|
||||
return err
|
||||
|
|
|
@ -47,7 +47,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
|||
// If id is missing, returns nil, nil.
|
||||
func (db *DB) readShardID() ([]byte, error) {
|
||||
var id []byte
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
id = bytes.Clone(b.Get(shardIDKey))
|
||||
|
@ -86,7 +86,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
|||
|
||||
// writeShardID writes shard id to db.
|
||||
func (db *DB) writeShardID(id []byte) error {
|
||||
return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return metaerr.Wrap(db.database.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -57,7 +57,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
|||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.id, err = db.storageID(tx, prm.addr)
|
||||
|
||||
return err
|
||||
|
@ -126,7 +126,7 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
|||
return res, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
err = db.database.Batch(func(tx *bbolt.Tx) error {
|
||||
return setStorageID(tx, prm.addr, prm.id, true)
|
||||
})
|
||||
success = err == nil
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestVersion(t *testing.T) {
|
|||
WithPermissions(0o600), WithEpochState(epochStateImpl{}))
|
||||
}
|
||||
check := func(t *testing.T, db *DB) {
|
||||
require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
require.NoError(t, db.database.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return errors.New("shard info bucket not found")
|
||||
|
@ -68,7 +68,7 @@ func TestVersion(t *testing.T) {
|
|||
t.Run("invalid version", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
require.NoError(t, db.database.Update(func(tx *bbolt.Tx) error {
|
||||
return updateVersion(tx, version+1)
|
||||
}))
|
||||
require.NoError(t, db.Close())
|
||||
|
|
Loading…
Reference in a new issue