[#1839] engine: Handle Inhume errors properly

If shard is in read-only or degraded mode, there is no need to increase
error counter.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-10-05 13:48:09 +03:00 committed by fyrchik
parent 7356ee91ff
commit 6557f5d249
3 changed files with 30 additions and 23 deletions

View file

@ -28,6 +28,7 @@ Changelog for NeoFS Node
- Storage nodes could enter the network with any state (#1796)
- Missing check of new state value in `ControlService.SetNetmapStatus` (#1797)
- Correlation of object session to request (#1420)
- Do not increase error counter in `engine.Inhume` if shard is read-only (#1839)
### Removed
- Remove WIF and NEP2 support in `neofs-cli`'s --wallet flag (#1128)

View file

@ -7,6 +7,7 @@ import (
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
// DeletePrm groups the parameters of Delete operation.
@ -88,7 +89,13 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
_, err = sh.Inhume(shPrm)
if err != nil {
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode) {
e.log.Warn("could not inhume object in shard",
zap.Stringer("shard_id", sh.ID()),
zap.String("error", err.Error()))
} else {
e.reportShardError(sh, "could not inhume object in shard", err)
}
locked.is = errors.As(err, &locked.err)

View file

@ -85,16 +85,15 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
shPrm.MarkAsGarbage(prm.addrs[i])
}
switch e.inhumeAddr(prm.addrs[i], shPrm, true) {
case 2:
return InhumeRes{}, meta.ErrLockObjectRemoval
case 1:
return InhumeRes{}, apistatus.ObjectLocked{}
case 0:
switch e.inhumeAddr(prm.addrs[i], shPrm, false) {
case 1:
return InhumeRes{}, apistatus.ObjectLocked{}
case 0:
ok, err := e.inhumeAddr(prm.addrs[i], shPrm, true)
if err != nil {
return InhumeRes{}, err
}
if !ok {
ok, err := e.inhumeAddr(prm.addrs[i], shPrm, false)
if err != nil {
return InhumeRes{}, err
} else if !ok {
return InhumeRes{}, errInhumeFailure
}
}
@ -103,15 +102,13 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
return InhumeRes{}, nil
}
// Returns:
// - 0: fail
// - 1: object locked
// - 2: lock object removal
// - 3: ok
func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkExists bool) (status uint8) {
// Returns ok if object was inhumed during this invocation or before.
func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkExists bool) (bool, error) {
root := false
var errLocked apistatus.ObjectLocked
var existPrm shard.ExistsPrm
var retErr error
var ok bool
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
defer func() {
@ -128,7 +125,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
if err != nil {
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
// inhumed once - no need to be inhumed again
status = 3
ok = true
return true
}
@ -148,10 +145,13 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
if err != nil {
switch {
case errors.As(err, &errLocked):
status = 1
retErr = apistatus.ObjectLocked{}
return true
case errors.Is(err, shard.ErrLockObjectRemoval):
status = 2
retErr = meta.ErrLockObjectRemoval
return true
case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode):
retErr = err
return true
}
@ -159,12 +159,11 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
return false
}
status = 3
ok = true
return true
})
return
return ok, retErr
}
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {