[#1493] metabase: Split inhumeTx() into 2 functions

No functional changes.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-10-18 14:08:57 +03:00 committed by Evgenii Stratonikov
parent 1e6f132b4e
commit 44df67492f

View file

@ -224,78 +224,86 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
buf := make([]byte, addressKeySize)
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
continue
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
}
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, addr, buf, false, true, epoch)
targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
return nil
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
return nil
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, addr)
}
}
return nil
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,