forked from TrueCloudLab/frostfs-node
[#1829] engine: Delete split objects properly
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
5e493b7f1c
commit
b2aa9947c2
3 changed files with 156 additions and 6 deletions
|
@ -62,21 +62,29 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
is bool
|
||||
err apistatus.ObjectLocked
|
||||
}
|
||||
var splitInfo *objectSDK.SplitInfo
|
||||
|
||||
// Removal of a big object is done in multiple stages:
|
||||
// 1. Remove the parent object. If it is locked or already removed, return immediately.
|
||||
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.SetAddress(prm.addr)
|
||||
|
||||
resExists, err := sh.Exists(existsPrm)
|
||||
if err != nil {
|
||||
_, ok := err.(*objectSDK.SplitInfoError)
|
||||
if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
return true
|
||||
}
|
||||
if !shard.IsErrNotFound(err) {
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
|
||||
splitErr, ok := err.(*objectSDK.SplitInfoError)
|
||||
if !ok {
|
||||
if !shard.IsErrNotFound(err) {
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
return false
|
||||
splitInfo = splitErr.SplitInfo()
|
||||
} else if !resExists.Exists() {
|
||||
return false
|
||||
}
|
||||
|
@ -102,12 +110,54 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
return locked.is
|
||||
}
|
||||
|
||||
return true
|
||||
// If a parent object is removed we should set GC mark on each shard.
|
||||
return splitInfo == nil
|
||||
})
|
||||
|
||||
if locked.is {
|
||||
return DeleteRes{}, locked.err
|
||||
}
|
||||
|
||||
if splitInfo != nil {
|
||||
e.deleteChildren(prm.addr, prm.forceRemoval, splitInfo.SplitID())
|
||||
}
|
||||
|
||||
return DeleteRes{}, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) deleteChildren(addr oid.Address, force bool, splitID *objectSDK.SplitID) {
|
||||
var fs objectSDK.SearchFilters
|
||||
fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID)
|
||||
|
||||
var selectPrm shard.SelectPrm
|
||||
selectPrm.SetFilters(fs)
|
||||
selectPrm.SetContainerID(addr.Container())
|
||||
|
||||
var inhumePrm shard.InhumePrm
|
||||
if force {
|
||||
inhumePrm.ForceRemoval()
|
||||
}
|
||||
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.Select(selectPrm)
|
||||
if err != nil {
|
||||
e.log.Warn("error during searching for object children",
|
||||
zap.Stringer("addr", addr),
|
||||
zap.String("error", err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
for _, addr := range res.AddressList() {
|
||||
inhumePrm.MarkAsGarbage(addr)
|
||||
|
||||
_, err = sh.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
e.log.Debug("could not inhume object in shard",
|
||||
zap.Stringer("addr", addr),
|
||||
zap.String("err", err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue