From 6557f5d249d725a050717867907e470b1b9c1e59 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@morphbits.ru>
Date: Wed, 5 Oct 2022 13:48:09 +0300
Subject: [PATCH] [#1839] engine: Handle Inhume errors properly

If shard is in read-only or degraded mode, there is no need to increase
error counter.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
---
 CHANGELOG.md                              |  1 +
 pkg/local_object_storage/engine/delete.go |  9 ++++-
 pkg/local_object_storage/engine/inhume.go | 43 +++++++++++------------
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 450de65b1e..f17a3c088e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,7 @@ Changelog for NeoFS Node
 - Storage nodes could enter the network with any state (#1796)
 - Missing check of new state value in `ControlService.SetNetmapStatus` (#1797)
 - Correlation of object session to request (#1420)
+- Do not increase error counter in `engine.Inhume` if shard is read-only (#1839)
 
 ### Removed
 - Remove WIF and NEP2 support in `neofs-cli`'s --wallet flag (#1128)
diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go
index 25dafd95a4..c73ed798eb 100644
--- a/pkg/local_object_storage/engine/delete.go
+++ b/pkg/local_object_storage/engine/delete.go
@@ -7,6 +7,7 @@ import (
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"go.uber.org/zap"
 )
 
 // DeletePrm groups the parameters of Delete operation.
@@ -88,7 +89,13 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
 
 		_, err = sh.Inhume(shPrm)
 		if err != nil {
-			e.reportShardError(sh, "could not inhume object in shard", err)
+			if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode) {
+				e.log.Warn("could not inhume object in shard",
+					zap.Stringer("shard_id", sh.ID()),
+					zap.String("error", err.Error()))
+			} else {
+				e.reportShardError(sh, "could not inhume object in shard", err)
+			}
 
 			locked.is = errors.As(err, &locked.err)
 
diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index a5ab729680..2a3bdef9a3 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -85,16 +85,15 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
 			shPrm.MarkAsGarbage(prm.addrs[i])
 		}
 
-		switch e.inhumeAddr(prm.addrs[i], shPrm, true) {
-		case 2:
-			return InhumeRes{}, meta.ErrLockObjectRemoval
-		case 1:
-			return InhumeRes{}, apistatus.ObjectLocked{}
-		case 0:
-			switch e.inhumeAddr(prm.addrs[i], shPrm, false) {
-			case 1:
-				return InhumeRes{}, apistatus.ObjectLocked{}
-			case 0:
+		ok, err := e.inhumeAddr(prm.addrs[i], shPrm, true)
+		if err != nil {
+			return InhumeRes{}, err
+		}
+		if !ok {
+			ok, err := e.inhumeAddr(prm.addrs[i], shPrm, false)
+			if err != nil {
+				return InhumeRes{}, err
+			} else if !ok {
 				return InhumeRes{}, errInhumeFailure
 			}
 		}
@@ -103,15 +102,13 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
 	return InhumeRes{}, nil
 }
 
-// Returns:
-//   - 0: fail
-//   - 1: object locked
-//   - 2: lock object removal
-//   - 3: ok
-func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkExists bool) (status uint8) {
+// Returns ok if object was inhumed during this invocation or before.
+func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkExists bool) (bool, error) {
 	root := false
 	var errLocked apistatus.ObjectLocked
 	var existPrm shard.ExistsPrm
+	var retErr error
+	var ok bool
 
 	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		defer func() {
@@ -128,7 +125,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
 			if err != nil {
 				if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
 					// inhumed once - no need to be inhumed again
-					status = 3
+					ok = true
 					return true
 				}
 
@@ -148,10 +145,13 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
 		if err != nil {
 			switch {
 			case errors.As(err, &errLocked):
-				status = 1
+				retErr = apistatus.ObjectLocked{}
 				return true
 			case errors.Is(err, shard.ErrLockObjectRemoval):
-				status = 2
+				retErr = meta.ErrLockObjectRemoval
+				return true
+			case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode):
+				retErr = err
 				return true
 			}
 
@@ -159,12 +159,11 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
 			return false
 		}
 
-		status = 3
-
+		ok = true
 		return true
 	})
 
-	return
+	return ok, retErr
 }
 
 func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {