From 1f1aed87be8acbf577fe65bd49cbd5ceb59bf8c5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 31 Mar 2023 14:12:18 +0300 Subject: [PATCH] [#188] metabase: Refactor object inhume Resolve funlen linter for db.Inhume method Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/inhume.go | 271 +++++++++++--------- 1 file changed, 150 insertions(+), 121 deletions(-) diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 3f33f990c..b6e6cadf1 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -118,8 +118,6 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal") // // NOTE: Marks any object with GC mark (despite any prohibitions on operations // with that object) if WithForceGCMark option has been provided. -// -// nolint: funlen, gocognit func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -131,145 +129,176 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { } currEpoch := db.epochState.CurrentEpoch() - var inhumed uint64 - err = db.boltDB.Update(func(tx *bbolt.Tx) error { - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) + return db.inhumeTx(tx, currEpoch, prm, &res) + }) - var ( - // target bucket of the operation, one of the: - // 1. Graveyard if Inhume was called with a Tombstone - // 2. Garbage if Inhume was called with a GC mark - bkt *bbolt.Bucket - // value that will be put in the bucket, one of the: - // 1. tombstone address if Inhume was called with - // a Tombstone - // 2. zeroValue if Inhume was called with a GC mark - value []byte - ) + return +} - if prm.tomb != nil { - bkt = graveyardBKT - tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) +func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error { + garbageBKT := tx.Bucket(garbageBucketName) + graveyardBKT := tx.Bucket(graveyardBucketName) - // it is forbidden to have a tomb-on-tomb in FrostFS, - // so graveyard keys must not be addresses of tombstones - data := bkt.Get(tombKey) - if data != nil { - err := bkt.Delete(tombKey) - if err != nil { - return fmt.Errorf("could not remove grave with tombstone key: %w", err) - } - } + bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm) + if err != nil { + return err + } - value = tombKey - } else { - bkt = garbageBKT - value = zeroValue + buf := make([]byte, addressKeySize) + for i := range prm.target { + id := prm.target[i].Object() + cnr := prm.target[i].Container() + + // prevent locked objects to be inhumed + if !prm.forceRemoval && objectLocked(tx, cnr, id) { + return apistatus.ObjectLocked{} } - buf := make([]byte, addressKeySize) - for i := range prm.target { - id := prm.target[i].Object() - cnr := prm.target[i].Container() + var lockWasChecked bool - // prevent locked objects to be inhumed - if !prm.forceRemoval && objectLocked(tx, cnr, id) { - return apistatus.ObjectLocked{} + // prevent lock objects to be inhumed + // if `Inhume` was called not with the + // `WithForceGCMark` option + if !prm.forceRemoval { + if isLockObject(tx, cnr, id) { + return ErrLockObjectRemoval } - var lockWasChecked bool + lockWasChecked = true + } - // prevent lock objects to be inhumed - // if `Inhume` was called not with the - // `WithForceGCMark` option - if !prm.forceRemoval { - if isLockObject(tx, cnr, id) { - return ErrLockObjectRemoval - } - - lockWasChecked = true + obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) + targetKey := addressKey(prm.target[i], buf) + if err == nil { + err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) + if err != nil { + return err } + } - obj, err := db.get(tx, prm.target[i], buf, false, true, currEpoch) - targetKey := addressKey(prm.target[i], buf) - if err == nil { - containerID, _ := obj.ContainerID() - if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - inhumed++ - res.storeDeletionInfo(containerID, obj.PayloadSize()) - } - - // if object is stored, and it is regular object then update bucket - // with container size estimations - if obj.Type() == object.TypeRegular { - err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) - if err != nil { - return err - } - } - } - - if prm.tomb != nil { - targetIsTomb := false - - // iterate over graveyard and check if target address - // is the address of tombstone in graveyard. - err = bkt.ForEach(func(k, v []byte) error { - // check if graveyard has record with key corresponding - // to tombstone address (at least one) - targetIsTomb = bytes.Equal(v, targetKey) - - if targetIsTomb { - // break bucket iterator - return errBreakBucketForEach - } - - return nil - }) - if err != nil && !errors.Is(err, errBreakBucketForEach) { - return err - } - - // do not add grave if target is a tombstone - if targetIsTomb { - continue - } - - // if tombstone appears object must be - // additionally marked with GC - err = garbageBKT.Put(targetKey, zeroValue) - if err != nil { - return err - } - } - - // consider checking if target is already in graveyard? - err = bkt.Put(targetKey, value) + if prm.tomb != nil { + var isTomb bool + isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) if err != nil { return err } - if prm.lockObjectHandling { - // do not perform lock check if - // it was already called - if lockWasChecked { - // inhumed object is not of - // the LOCK type - continue - } - - if isLockObject(tx, cnr, id) { - res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) - } + if isTomb { + continue } } - return db.updateCounter(tx, logical, inhumed, false) - }) + // consider checking if target is already in graveyard? + err = bkt.Put(targetKey, value) + if err != nil { + return err + } - res.availableImhumed = inhumed + if prm.lockObjectHandling { + // do not perform lock check if + // it was already called + if lockWasChecked { + // inhumed object is not of + // the LOCK type + continue + } - return + if isLockObject(tx, cnr, id) { + res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) + } + } + } + + return db.updateCounter(tx, logical, res.availableImhumed, false) +} + +// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. +// +// target bucket of the operation, one of the: +// 1. Graveyard if Inhume was called with a Tombstone +// 2. Garbage if Inhume was called with a GC mark +// +// value that will be put in the bucket, one of the: +// 1. tombstone address if Inhume was called with +// a Tombstone +// 2. zeroValue if Inhume was called with a GC mark +func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { + if prm.tomb != nil { + targetBucket = graveyardBKT + tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) + + // it is forbidden to have a tomb-on-tomb in FrostFS, + // so graveyard keys must not be addresses of tombstones + data := targetBucket.Get(tombKey) + if data != nil { + err := targetBucket.Delete(tombKey) + if err != nil { + return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err) + } + } + + value = tombKey + } else { + targetBucket = garbageBKT + value = zeroValue + } + return targetBucket, value, nil +} + +func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) { + targetIsTomb, err := isTomb(graveyardBKT, key) + if err != nil { + return false, err + } + + // do not add grave if target is a tombstone + if targetIsTomb { + return true, nil + } + + // if tombstone appears object must be + // additionally marked with GC + return false, garbageBKT.Put(key, zeroValue) +} + +func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *object.Object, res *InhumeRes) error { + containerID, _ := obj.ContainerID() + if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { + res.availableImhumed++ + res.storeDeletionInfo(containerID, obj.PayloadSize()) + } + + // if object is stored, and it is regular object then update bucket + // with container size estimations + if obj.Type() == object.TypeRegular { + err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) + if err != nil { + return err + } + } + return nil +} + +func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) { + targetIsTomb := false + + // iterate over graveyard and check if target address + // is the address of tombstone in graveyard. + err := graveyardBucket.ForEach(func(k, v []byte) error { + // check if graveyard has record with key corresponding + // to tombstone address (at least one) + targetIsTomb = bytes.Equal(v, key) + + if targetIsTomb { + // break bucket iterator + return errBreakBucketForEach + } + + return nil + }) + if err != nil && !errors.Is(err, errBreakBucketForEach) { + return false, err + } + return targetIsTomb, nil } -- 2.45.2