Refactor local object storage #188
1 changed files with 150 additions and 121 deletions
|
@ -118,8 +118,6 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
|
|||
//
|
||||
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
|
||||
// with that object) if WithForceGCMark option has been provided.
|
||||
//
|
||||
// nolint: funlen, gocognit
|
||||
func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
@ -131,145 +129,176 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
|||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
var inhumed uint64
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||
return db.inhumeTx(tx, currEpoch, prm, &res)
|
||||
})
|
||||
|
||||
var (
|
||||
// target bucket of the operation, one of the:
|
||||
// 1. Graveyard if Inhume was called with a Tombstone
|
||||
// 2. Garbage if Inhume was called with a GC mark
|
||||
bkt *bbolt.Bucket
|
||||
// value that will be put in the bucket, one of the:
|
||||
// 1. tombstone address if Inhume was called with
|
||||
// a Tombstone
|
||||
// 2. zeroValue if Inhume was called with a GC mark
|
||||
value []byte
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if prm.tomb != nil {
|
||||
bkt = graveyardBKT
|
||||
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
||||
func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||
|
||||
// it is forbidden to have a tomb-on-tomb in FrostFS,
|
||||
// so graveyard keys must not be addresses of tombstones
|
||||
data := bkt.Get(tombKey)
|
||||
if data != nil {
|
||||
err := bkt.Delete(tombKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||
}
|
||||
}
|
||||
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
value = tombKey
|
||||
} else {
|
||||
bkt = garbageBKT
|
||||
value = zeroValue
|
||||
buf := make([]byte, addressKeySize)
|
||||
for i := range prm.target {
|
||||
id := prm.target[i].Object()
|
||||
cnr := prm.target[i].Container()
|
||||
|
||||
// prevent locked objects to be inhumed
|
||||
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
|
||||
return apistatus.ObjectLocked{}
|
||||
}
|
||||
|
||||
buf := make([]byte, addressKeySize)
|
||||
for i := range prm.target {
|
||||
id := prm.target[i].Object()
|
||||
cnr := prm.target[i].Container()
|
||||
var lockWasChecked bool
|
||||
|
||||
// prevent locked objects to be inhumed
|
||||
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
|
||||
return apistatus.ObjectLocked{}
|
||||
// prevent lock objects to be inhumed
|
||||
// if `Inhume` was called not with the
|
||||
// `WithForceGCMark` option
|
||||
if !prm.forceRemoval {
|
||||
if isLockObject(tx, cnr, id) {
|
||||
return ErrLockObjectRemoval
|
||||
}
|
||||
|
||||
var lockWasChecked bool
|
||||
lockWasChecked = true
|
||||
}
|
||||
|
||||
// prevent lock objects to be inhumed
|
||||
// if `Inhume` was called not with the
|
||||
// `WithForceGCMark` option
|
||||
if !prm.forceRemoval {
|
||||
if isLockObject(tx, cnr, id) {
|
||||
return ErrLockObjectRemoval
|
||||
}
|
||||
|
||||
lockWasChecked = true
|
||||
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
|
||||
targetKey := addressKey(prm.target[i], buf)
|
||||
if err == nil {
|
||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
obj, err := db.get(tx, prm.target[i], buf, false, true, currEpoch)
|
||||
targetKey := addressKey(prm.target[i], buf)
|
||||
if err == nil {
|
||||
containerID, _ := obj.ContainerID()
|
||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||
inhumed++
|
||||
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
||||
}
|
||||
|
||||
// if object is stored, and it is regular object then update bucket
|
||||
// with container size estimations
|
||||
if obj.Type() == object.TypeRegular {
|
||||
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if prm.tomb != nil {
|
||||
targetIsTomb := false
|
||||
|
||||
// iterate over graveyard and check if target address
|
||||
// is the address of tombstone in graveyard.
|
||||
err = bkt.ForEach(func(k, v []byte) error {
|
||||
// check if graveyard has record with key corresponding
|
||||
// to tombstone address (at least one)
|
||||
targetIsTomb = bytes.Equal(v, targetKey)
|
||||
|
||||
if targetIsTomb {
|
||||
// break bucket iterator
|
||||
return errBreakBucketForEach
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, errBreakBucketForEach) {
|
||||
return err
|
||||
}
|
||||
|
||||
// do not add grave if target is a tombstone
|
||||
if targetIsTomb {
|
||||
continue
|
||||
}
|
||||
|
||||
// if tombstone appears object must be
|
||||
// additionally marked with GC
|
||||
err = garbageBKT.Put(targetKey, zeroValue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// consider checking if target is already in graveyard?
|
||||
err = bkt.Put(targetKey, value)
|
||||
if prm.tomb != nil {
|
||||
var isTomb bool
|
||||
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if prm.lockObjectHandling {
|
||||
// do not perform lock check if
|
||||
// it was already called
|
||||
if lockWasChecked {
|
||||
// inhumed object is not of
|
||||
// the LOCK type
|
||||
continue
|
||||
}
|
||||
|
||||
if isLockObject(tx, cnr, id) {
|
||||
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
|
||||
}
|
||||
if isTomb {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return db.updateCounter(tx, logical, inhumed, false)
|
||||
})
|
||||
// consider checking if target is already in graveyard?
|
||||
err = bkt.Put(targetKey, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res.availableImhumed = inhumed
|
||||
if prm.lockObjectHandling {
|
||||
// do not perform lock check if
|
||||
// it was already called
|
||||
if lockWasChecked {
|
||||
// inhumed object is not of
|
||||
// the LOCK type
|
||||
continue
|
||||
}
|
||||
|
||||
return
|
||||
if isLockObject(tx, cnr, id) {
|
||||
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return db.updateCounter(tx, logical, res.availableImhumed, false)
|
||||
}
|
||||
|
||||
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
||||
//
|
||||
// target bucket of the operation, one of the:
|
||||
// 1. Graveyard if Inhume was called with a Tombstone
|
||||
// 2. Garbage if Inhume was called with a GC mark
|
||||
//
|
||||
// value that will be put in the bucket, one of the:
|
||||
// 1. tombstone address if Inhume was called with
|
||||
// a Tombstone
|
||||
// 2. zeroValue if Inhume was called with a GC mark
|
||||
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
|
||||
if prm.tomb != nil {
|
||||
targetBucket = graveyardBKT
|
||||
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
|
||||
|
||||
// it is forbidden to have a tomb-on-tomb in FrostFS,
|
||||
// so graveyard keys must not be addresses of tombstones
|
||||
data := targetBucket.Get(tombKey)
|
||||
if data != nil {
|
||||
err := targetBucket.Delete(tombKey)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
value = tombKey
|
||||
} else {
|
||||
targetBucket = garbageBKT
|
||||
value = zeroValue
|
||||
}
|
||||
return targetBucket, value, nil
|
||||
}
|
||||
|
||||
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) {
|
||||
targetIsTomb, err := isTomb(graveyardBKT, key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// do not add grave if target is a tombstone
|
||||
if targetIsTomb {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// if tombstone appears object must be
|
||||
// additionally marked with GC
|
||||
return false, garbageBKT.Put(key, zeroValue)
|
||||
}
|
||||
|
||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *object.Object, res *InhumeRes) error {
|
||||
containerID, _ := obj.ContainerID()
|
||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||
res.availableImhumed++
|
||||
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
||||
}
|
||||
|
||||
// if object is stored, and it is regular object then update bucket
|
||||
// with container size estimations
|
||||
if obj.Type() == object.TypeRegular {
|
||||
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) {
|
||||
targetIsTomb := false
|
||||
|
||||
// iterate over graveyard and check if target address
|
||||
// is the address of tombstone in graveyard.
|
||||
err := graveyardBucket.ForEach(func(k, v []byte) error {
|
||||
// check if graveyard has record with key corresponding
|
||||
// to tombstone address (at least one)
|
||||
targetIsTomb = bytes.Equal(v, key)
|
||||
|
||||
if targetIsTomb {
|
||||
// break bucket iterator
|
||||
return errBreakBucketForEach
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, errBreakBucketForEach) {
|
||||
return false, err
|
||||
}
|
||||
return targetIsTomb, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue