[#1502] engine: Check all shards for LOCK
'ing before inhuming
It allows keeping all the locked objects safe after metabase resynchronization. Currently, all `LOCK` objects are broadcast to all nodes in a container, it guarantees `LOCK` object presence in a regular situation. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
34e8d2ba56
commit
3b61cb4f49
1 changed files with 36 additions and 0 deletions
|
@ -9,6 +9,7 @@ import (
|
|||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// InhumePrm encapsulates parameters for inhume operation.
|
||||
|
@ -79,6 +80,18 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
|
|||
}
|
||||
|
||||
for i := range prm.addrs {
|
||||
if !prm.forceRemoval {
|
||||
locked, err := e.isLocked(prm.addrs[i])
|
||||
if err != nil {
|
||||
e.log.Warn("removing an object without full locking check",
|
||||
zap.Error(err),
|
||||
zap.Stringer("addr", prm.addrs[i]))
|
||||
} else if locked {
|
||||
var lockedErr apistatus.ObjectLocked
|
||||
return InhumeRes{}, lockedErr
|
||||
}
|
||||
}
|
||||
|
||||
if prm.tombstone != nil {
|
||||
shPrm.SetTarget(*prm.tombstone, prm.addrs[i])
|
||||
} else {
|
||||
|
@ -166,6 +179,29 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
|
|||
return ok, retErr
|
||||
}
|
||||
|
||||
func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
|
||||
var locked bool
|
||||
var err error
|
||||
var outErr error
|
||||
|
||||
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
|
||||
locked, err = h.Shard.IsLocked(addr)
|
||||
if err != nil {
|
||||
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr))
|
||||
outErr = err
|
||||
return false
|
||||
}
|
||||
|
||||
return locked
|
||||
})
|
||||
|
||||
if locked {
|
||||
return locked, nil
|
||||
}
|
||||
|
||||
return locked, outErr
|
||||
}
|
||||
|
||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
sh.HandleExpiredTombstones(addrs)
|
||||
|
|
Loading…
Reference in a new issue