[#1502] engine: Check all shards for LOCK'ing before inhuming

It allows keeping all the locked objects safe after metabase
resynchronization. Currently, all `LOCK` objects are broadcast to all nodes
in a container, it guarantees `LOCK` object presence in a regular situation.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-11-12 15:27:54 +03:00 committed by fyrchik
parent 049ab58336
commit 84f545dacc

View file

@ -9,6 +9,7 @@ import (
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
// InhumePrm encapsulates parameters for inhume operation.
@ -79,6 +80,18 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
}
for i := range prm.addrs {
if !prm.forceRemoval {
locked, err := e.isLocked(prm.addrs[i])
if err != nil {
e.log.Warn("removing an object without full locking check",
zap.Error(err),
zap.Stringer("addr", prm.addrs[i]))
} else if locked {
var lockedErr apistatus.ObjectLocked
return InhumeRes{}, lockedErr
}
}
if prm.tombstone != nil {
shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
} else {
@ -166,6 +179,29 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
return ok, retErr
}
func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
var locked bool
var err error
var outErr error
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locked, err = h.Shard.IsLocked(addr)
if err != nil {
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr))
outErr = err
return false
}
return locked
})
if locked {
return locked, nil
}
return locked, outErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(addrs)