[#9999] metabase: Fix db engine to pebble in iterators.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
268862c9f0
commit
4ef55a622f
1 changed files with 20 additions and 27 deletions
|
@ -11,7 +11,6 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
"github.com/dgraph-io/badger/v4"
|
|
||||||
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -68,61 +67,55 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.database.View(func(tx *badger.Txn) error {
|
return db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
return db.iterateCoveredByTombstones(ctx, tx, tss, h)
|
return db.iterateCoveredByTombstones(ctx, s, tss, h)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateCoveredByTombstones(ctx context.Context, tx *badger.Txn, tss map[string]oid.Address, h func(oid.Address) error) error {
|
func (db *DB) iterateCoveredByTombstones(ctx context.Context, r pebble.Reader, tss map[string]oid.Address, h func(oid.Address) error) error {
|
||||||
prefix := []byte{graveyardPrefix}
|
prefix := []byte{graveyardPrefix}
|
||||||
it := tx.NewIterator(badger.IteratorOptions{
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
PrefetchSize: badger.DefaultIteratorOptions.PrefetchSize,
|
LowerBound: prefix,
|
||||||
Prefix: prefix,
|
|
||||||
PrefetchValues: true,
|
|
||||||
})
|
})
|
||||||
defer it.Close()
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
for it.Seek(nil); it.ValidForPrefix(prefix); it.Next() {
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return errors.Join(ctx.Err(), it.Close())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
var tombstoneAddress oid.Address
|
tombstoneAddress, err := decodeAddressFromGrave(it.Value())
|
||||||
if err := it.Item().Value(func(val []byte) error {
|
if err != nil {
|
||||||
var e error
|
return errors.Join(err, it.Close())
|
||||||
tombstoneAddress, e = decodeAddressFromGrave(val)
|
|
||||||
return e
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok {
|
if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var objectAddress oid.Address
|
objectAddress, err := addressFromGraveyardKey(it.Key())
|
||||||
var err error
|
|
||||||
objectAddress, err = addressFromGraveyardKey(it.Item().Key())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
isLocked, err := objectLocked(ctx, tx, objectAddress.Container(), objectAddress.Object())
|
isLocked, err := objectLocked(ctx, r, objectAddress.Container(), objectAddress.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
if isLocked {
|
if isLocked {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := h(objectAddress); err != nil {
|
if err := h(objectAddress); err != nil {
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
return nil
|
return it.Close()
|
||||||
}
|
}
|
||||||
return err
|
return errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return it.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
||||||
|
|
Loading…
Reference in a new issue