WIP: Change metabase engine to pebble #1221

Closed
dstepanov-yadro wants to merge 28 commits from dstepanov-yadro/frostfs-node:feat/pebble_metabase into master
Showing only changes of commit 06b0ad5b89 - Show all commits

View file

@ -11,7 +11,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v4"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
@ -68,61 +67,55 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid
return ErrDegradedMode return ErrDegradedMode
} }
return db.database.View(func(tx *badger.Txn) error { return db.snapshot(func(s *pebble.Snapshot) error {
return db.iterateCoveredByTombstones(ctx, tx, tss, h) return db.iterateCoveredByTombstones(ctx, s, tss, h)
}) })
} }
func (db *DB) iterateCoveredByTombstones(ctx context.Context, tx *badger.Txn, tss map[string]oid.Address, h func(oid.Address) error) error { func (db *DB) iterateCoveredByTombstones(ctx context.Context, r pebble.Reader, tss map[string]oid.Address, h func(oid.Address) error) error {
prefix := []byte{graveyardPrefix} prefix := []byte{graveyardPrefix}
it := tx.NewIterator(badger.IteratorOptions{ it, err := r.NewIter(&pebble.IterOptions{
PrefetchSize: badger.DefaultIteratorOptions.PrefetchSize, LowerBound: prefix,
Prefix: prefix,
PrefetchValues: true,
}) })
defer it.Close() if err != nil {
return err
}
for it.Seek(nil); it.ValidForPrefix(prefix); it.Next() { for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return errors.Join(ctx.Err(), it.Close())
default: default:
} }
var tombstoneAddress oid.Address tombstoneAddress, err := decodeAddressFromGrave(it.Value())
if err := it.Item().Value(func(val []byte) error { if err != nil {
var e error return errors.Join(err, it.Close())
tombstoneAddress, e = decodeAddressFromGrave(val)
return e
}); err != nil {
return err
} }
if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok { if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok {
continue continue
} }
var objectAddress oid.Address objectAddress, err := addressFromGraveyardKey(it.Key())
var err error
objectAddress, err = addressFromGraveyardKey(it.Item().Key())
if err != nil { if err != nil {
return err return errors.Join(err, it.Close())
} }
isLocked, err := objectLocked(ctx, tx, objectAddress.Container(), objectAddress.Object()) isLocked, err := objectLocked(ctx, r, objectAddress.Container(), objectAddress.Object())
if err != nil { if err != nil {
return err return errors.Join(err, it.Close())
} }
if isLocked { if isLocked {
continue continue
} }
if err := h(objectAddress); err != nil { if err := h(objectAddress); err != nil {
if errors.Is(err, ErrInterruptIterator) { if errors.Is(err, ErrInterruptIterator) {
return nil return it.Close()
} }
return err return errors.Join(err, it.Close())
} }
} }
return nil return it.Close()
} }
func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {