diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 915de5262..5ac0c0be5 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -224,78 +224,86 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes buf := make([]byte, addressKeySize) for i := range prm.target { - id := prm.target[i].Object() - cnr := prm.target[i].Container() - - // prevent locked objects to be inhumed - if !prm.forceRemoval && objectLocked(tx, cnr, id) { - return new(apistatus.ObjectLocked) - } - - var lockWasChecked bool - - // prevent lock objects to be inhumed - // if `Inhume` was called not with the - // `WithForceGCMark` option - if !prm.forceRemoval { - if isLockObject(tx, cnr, id) { - return ErrLockObjectRemoval - } - - lockWasChecked = true - } - - obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) - targetKey := addressKey(prm.target[i], buf) - var ecErr *objectSDK.ECInfoError - if err == nil { - err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) - if err != nil { - return err - } - } else if errors.As(err, &ecErr) { - err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value) - if err != nil { - return err - } - } - - if prm.tomb != nil { - var isTomb bool - isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) - if err != nil { - return err - } - - if isTomb { - continue - } - } - - // consider checking if target is already in graveyard? - err = bkt.Put(targetKey, value) - if err != nil { + if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil { return err } - - if prm.lockObjectHandling { - // do not perform lock check if - // it was already called - if lockWasChecked { - // inhumed object is not of - // the LOCK type - continue - } - - if isLockObject(tx, cnr, id) { - res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) - } - } } return db.applyInhumeResToCounters(tx, res) } +func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error { + id := addr.Object() + cnr := addr.Container() + tx := bkt.Tx() + + // prevent locked objects to be inhumed + if !prm.forceRemoval && objectLocked(tx, cnr, id) { + return new(apistatus.ObjectLocked) + } + + var lockWasChecked bool + + // prevent lock objects to be inhumed + // if `Inhume` was called not with the + // `WithForceGCMark` option + if !prm.forceRemoval { + if isLockObject(tx, cnr, id) { + return ErrLockObjectRemoval + } + + lockWasChecked = true + } + + obj, err := db.get(tx, addr, buf, false, true, epoch) + targetKey := addressKey(addr, buf) + var ecErr *objectSDK.ECInfoError + if err == nil { + err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) + if err != nil { + return err + } + } else if errors.As(err, &ecErr) { + err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value) + if err != nil { + return err + } + } + + if prm.tomb != nil { + var isTomb bool + isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) + if err != nil { + return err + } + + if isTomb { + return nil + } + } + + // consider checking if target is already in graveyard? + err = bkt.Put(targetKey, value) + if err != nil { + return err + } + + if prm.lockObjectHandling { + // do not perform lock check if + // it was already called + if lockWasChecked { + // inhumed object is not of + // the LOCK type + return nil + } + + if isLockObject(tx, cnr, id) { + res.deletedLockObj = append(res.deletedLockObj, addr) + } + } + return nil +} + func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,