metabase: optimize Inhume operation #1385

Open
dstepanov-yadro wants to merge 3 commits from dstepanov-yadro/frostfs-node:fix/metabase_inhume_hangs into master
Showing only changes of commit 3b5a9d466f - Show all commits

View file

@ -377,11 +377,8 @@ func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Buck
return targetBucket, value, nil return targetBucket, value, nil
} }
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) { func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte) (bool, error) {
targetIsTomb, err := isTomb(graveyardBKT, key) targetIsTomb := isTomb(graveyardBKT, addressKey)
if err != nil {
return false, err
}
// do not add grave if target is a tombstone // do not add grave if target is a tombstone
if targetIsTomb { if targetIsTomb {
@ -390,7 +387,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
// if tombstone appears object must be // if tombstone appears object must be
// additionally marked with GC // additionally marked with GC
return false, garbageBKT.Put(key, zeroValue) return false, garbageBKT.Put(addressKey, zeroValue)
} }
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
@ -410,25 +407,21 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
return nil return nil
} }
func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) { func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
targetIsTomb := false targetIsTomb := false
// iterate over graveyard and check if target address // iterate over graveyard and check if target address
// is the address of tombstone in graveyard. // is the address of tombstone in graveyard.
err := graveyardBucket.ForEach(func(_, v []byte) error { // tombstone must have the same container ID as key.
c := graveyardBucket.Cursor()
containerPrefix := addressKey[:cidSize]
for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
// check if graveyard has record with key corresponding // check if graveyard has record with key corresponding
// to tombstone address (at least one) // to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, key) targetIsTomb = bytes.Equal(v, addressKey)
if targetIsTomb { if targetIsTomb {
// break bucket iterator break
return errBreakBucketForEach
} }
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return false, err
} }
return targetIsTomb, nil return targetIsTomb
} }