[#188] metabase: Refactor object inhume

Resolve funlen linter for db.Inhume method

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-03-31 14:12:18 +03:00
parent 5a66db80c5
commit 1f1aed87be

View file

@ -118,8 +118,6 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
// //
// NOTE: Marks any object with GC mark (despite any prohibitions on operations // NOTE: Marks any object with GC mark (despite any prohibitions on operations
// with that object) if WithForceGCMark option has been provided. // with that object) if WithForceGCMark option has been provided.
//
// nolint: funlen, gocognit
func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
@ -131,145 +129,176 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
} }
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
var inhumed uint64
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageBKT := tx.Bucket(garbageBucketName) return db.inhumeTx(tx, currEpoch, prm, &res)
graveyardBKT := tx.Bucket(graveyardBucketName) })
var ( return
// target bucket of the operation, one of the: }
// 1. Graveyard if Inhume was called with a Tombstone
// 2. Garbage if Inhume was called with a GC mark
bkt *bbolt.Bucket
// value that will be put in the bucket, one of the:
// 1. tombstone address if Inhume was called with
// a Tombstone
// 2. zeroValue if Inhume was called with a GC mark
value []byte
)
if prm.tomb != nil { func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
bkt = graveyardBKT garbageBKT := tx.Bucket(garbageBucketName)
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) graveyardBKT := tx.Bucket(graveyardBucketName)
// it is forbidden to have a tomb-on-tomb in FrostFS, bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
// so graveyard keys must not be addresses of tombstones if err != nil {
data := bkt.Get(tombKey) return err
if data != nil { }
err := bkt.Delete(tombKey)
if err != nil {
return fmt.Errorf("could not remove grave with tombstone key: %w", err)
}
}
value = tombKey buf := make([]byte, addressKeySize)
} else { for i := range prm.target {
bkt = garbageBKT id := prm.target[i].Object()
value = zeroValue cnr := prm.target[i].Container()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return apistatus.ObjectLocked{}
} }
buf := make([]byte, addressKeySize) var lockWasChecked bool
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()
// prevent locked objects to be inhumed // prevent lock objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) { // if `Inhume` was called not with the
return apistatus.ObjectLocked{} // `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
} }
var lockWasChecked bool lockWasChecked = true
}
// prevent lock objects to be inhumed obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
// if `Inhume` was called not with the targetKey := addressKey(prm.target[i], buf)
// `WithForceGCMark` option if err == nil {
if !prm.forceRemoval { err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if isLockObject(tx, cnr, id) { if err != nil {
return ErrLockObjectRemoval return err
}
lockWasChecked = true
} }
}
obj, err := db.get(tx, prm.target[i], buf, false, true, currEpoch) if prm.tomb != nil {
targetKey := addressKey(prm.target[i], buf) var isTomb bool
if err == nil { isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
inhumed++
res.storeDeletionInfo(containerID, obj.PayloadSize())
}
// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == object.TypeRegular {
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
if err != nil {
return err
}
}
}
if prm.tomb != nil {
targetIsTomb := false
// iterate over graveyard and check if target address
// is the address of tombstone in graveyard.
err = bkt.ForEach(func(k, v []byte) error {
// check if graveyard has record with key corresponding
// to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, targetKey)
if targetIsTomb {
// break bucket iterator
return errBreakBucketForEach
}
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return err
}
// do not add grave if target is a tombstone
if targetIsTomb {
continue
}
// if tombstone appears object must be
// additionally marked with GC
err = garbageBKT.Put(targetKey, zeroValue)
if err != nil {
return err
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil { if err != nil {
return err return err
} }
if prm.lockObjectHandling { if isTomb {
// do not perform lock check if continue
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
} }
} }
return db.updateCounter(tx, logical, inhumed, false) // consider checking if target is already in graveyard?
}) err = bkt.Put(targetKey, value)
if err != nil {
return err
}
res.availableImhumed = inhumed if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
return if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
}
return db.updateCounter(tx, logical, res.availableImhumed, false)
}
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
//
// target bucket of the operation, one of the:
// 1. Graveyard if Inhume was called with a Tombstone
// 2. Garbage if Inhume was called with a GC mark
//
// value that will be put in the bucket, one of the:
// 1. tombstone address if Inhume was called with
// a Tombstone
// 2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil {
targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
// it is forbidden to have a tomb-on-tomb in FrostFS,
// so graveyard keys must not be addresses of tombstones
data := targetBucket.Get(tombKey)
if data != nil {
err := targetBucket.Delete(tombKey)
if err != nil {
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
}
}
value = tombKey
} else {
targetBucket = garbageBKT
value = zeroValue
}
return targetBucket, value, nil
}
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) {
targetIsTomb, err := isTomb(graveyardBKT, key)
if err != nil {
return false, err
}
// do not add grave if target is a tombstone
if targetIsTomb {
return true, nil
}
// if tombstone appears object must be
// additionally marked with GC
return false, garbageBKT.Put(key, zeroValue)
}
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *object.Object, res *InhumeRes) error {
containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
res.availableImhumed++
res.storeDeletionInfo(containerID, obj.PayloadSize())
}
// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == object.TypeRegular {
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
if err != nil {
return err
}
}
return nil
}
func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) {
targetIsTomb := false
// iterate over graveyard and check if target address
// is the address of tombstone in graveyard.
err := graveyardBucket.ForEach(func(k, v []byte) error {
// check if graveyard has record with key corresponding
// to tombstone address (at least one)
targetIsTomb = bytes.Equal(v, key)
if targetIsTomb {
// break bucket iterator
return errBreakBucketForEach
}
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return false, err
}
return targetIsTomb, nil
} }