From 2ba3abde5cd39cdf38aa32b3eec66106bbb7f630 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <evgeniy@morphbits.ru>
Date: Thu, 10 Nov 2022 13:58:46 +0300
Subject: [PATCH] [#2035] engine: Allow moving to degraded from background
 workers
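
Background workers used to only increment the shard error counter and log
the error, because changing the shard mode from their context could
deadlock. Perform the mode change in a dedicated goroutine instead, fed
with requests over a channel, so that exceeding the error threshold in a
background worker also moves the shard to degraded mode.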

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
---
 CHANGELOG.md                                  |   1 +
 pkg/local_object_storage/engine/control.go    |   7 +-
 .../engine/control_test.go                    |   1 -
 pkg/local_object_storage/engine/engine.go     | 159 +++++++++++++-----
 pkg/local_object_storage/engine/error_test.go |   2 +-
 5 files changed, 126 insertions(+), 44 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 35d7abb5c..1505507b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@ Changelog for NeoFS Node
 
 ### Added
 - `session` flag support to `neofs-cli object hash` (#2029)
+- Shards can now change mode when background workers encounter disk errors (#2035)
 
 ### Changed
 - `object lock` command reads CID and OID the same way other commands do (#1971)
diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index 6470e0d96..420691cd6 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -92,6 +92,9 @@ func (e *StorageEngine) Init() error {
 		return errors.New("failed initialization on all shards")
 	}
 
+	e.wg.Add(1)
+	go e.setModeLoop()
+
 	return nil
 }
 
@@ -100,8 +103,10 @@ var errClosed = errors.New("storage engine is closed")
 // Close releases all StorageEngine's components and waits for all data-related operations to complete.
 // After the call, all subsequent operations will fail.
 //
-// The method is supposed to be called when the application exits.
+// The method MUST only be called when the application exits.
 func (e *StorageEngine) Close() error {
+	close(e.closeCh)
+	defer e.wg.Wait()
 	return e.setBlockExecErr(errClosed)
 }
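
Note: Init and Close above follow the usual Go worker-lifecycle pattern:
Init registers the worker on a WaitGroup and starts it, Close signals
shutdown by closing a channel and then waits. A minimal standalone sketch
of that pattern (the Engine type below is illustrative, not the actual
StorageEngine):

	package main

	import "sync"

	// Engine is an illustrative stand-in for StorageEngine.
	type Engine struct {
		closeCh chan struct{}
		wg      sync.WaitGroup
	}

	func NewEngine() *Engine {
		return &Engine{closeCh: make(chan struct{})}
	}

	// Init starts the background worker, as StorageEngine.Init does above.
	func (e *Engine) Init() {
		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			<-e.closeCh // run until Close is called
		}()
	}

	// Close signals shutdown and waits for the worker to exit. As with
	// StorageEngine.Close, it must be called at most once: closing an
	// already-closed channel panics.
	func (e *Engine) Close() {
		close(e.closeCh)
		e.wg.Wait()
	}

	func main() {
		e := NewEngine()
		e.Init()
		e.Close()
	}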
 
diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go
index 1d2dee6c0..6929ab200 100644
--- a/pkg/local_object_storage/engine/control_test.go
+++ b/pkg/local_object_storage/engine/control_test.go
@@ -20,7 +20,6 @@ import (
 func TestExecBlocks(t *testing.T) {
 	e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
 	t.Cleanup(func() {
-		e.Close()
 		os.RemoveAll(t.Name())
 	})
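
Note: since Close now closes closeCh, a second Close call would panic on
the already-closed channel; the unconditional e.Close() is dropped from
the cleanup, presumably because the engine is already closed by the time
the cleanup runs.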
 
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index cf61253b4..c7aeed0c7 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -23,6 +23,10 @@ type StorageEngine struct {
 
 	shardPools map[string]util.WorkerPool
 
+	closeCh   chan struct{}
+	setModeCh chan setModeRequest
+	wg        sync.WaitGroup
+
 	blockExec struct {
 		mtx sync.RWMutex
 
@@ -35,52 +39,51 @@ type shardWrapper struct {
 	*shard.Shard
 }
 
-// reportShardErrorBackground increases shard error counter and logs an error.
-// It is intended to be used from background workers and
-// doesn't change shard mode because of possible deadlocks.
-func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
-	e.mtx.RLock()
-	sh, ok := e.shards[id]
-	e.mtx.RUnlock()
-
-	if !ok {
-		return
-	}
-
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg,
-		zap.String("shard_id", id),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()))
+type setModeRequest struct {
+	sh         *shard.Shard
+	errorCount uint32
 }
 
-// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
-// If it does, shard is set to read-only mode.
-func (e *StorageEngine) reportShardError(
-	sh hashedShard,
-	msg string,
-	err error,
-	fields ...zap.Field) {
-	if isLogical(err) {
-		e.log.Warn(msg,
-			zap.Stringer("shard_id", sh.ID()),
-			zap.String("error", err.Error()))
-		return
+// setModeLoop listens on setModeCh and performs degraded-mode transitions of shards.
+// A single goroutine serves all shards instead of one worker per shard.
+func (e *StorageEngine) setModeLoop() {
+	defer e.wg.Done()
+
+	var (
+		mtx        sync.RWMutex // protects inProgress map
+		inProgress = make(map[string]struct{})
+	)
+
+	for {
+		select {
+		case <-e.closeCh:
+			return
+		case r := <-e.setModeCh:
+			sid := r.sh.ID().String()
+
+			mtx.Lock()
+			_, ok := inProgress[sid]
+			if !ok {
+				inProgress[sid] = struct{}{}
+				go func() {
+					e.moveToDegraded(r.sh, r.errorCount)
+
+					mtx.Lock()
+					delete(inProgress, sid)
+					mtx.Unlock()
+				}()
+			}
+			mtx.Unlock()
+		}
 	}
+}
+
+func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) {
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
 
 	sid := sh.ID()
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg, append([]zap.Field{
-		zap.Stringer("shard_id", sid),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()),
-	}, fields...)...)
-
-	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
-		return
-	}
-
-	err = sh.SetMode(mode.DegradedReadOnly)
+	err := sh.SetMode(mode.DegradedReadOnly)
 	if err != nil {
 		e.log.Error("failed to move shard to degraded-read-only mode, moving to read-only",
 			zap.Stringer("shard_id", sid),
@@ -105,6 +108,78 @@ func (e *StorageEngine) reportShardError(
 	}
 }
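
Note: setModeLoop is a single dispatcher goroutine: it drains setModeCh
and starts at most one mode change per shard ID, dropping duplicates
while a transition is in flight. A self-contained sketch of this
deduplicating-dispatcher pattern (Request, dispatch and handle are
illustrative names, not engine code):

	package main

	import (
		"fmt"
		"sync"
	)

	type Request struct{ key string }

	// dispatch serves all keys from one goroutine and spawns at most one
	// in-flight worker per key, as setModeLoop does per shard.
	func dispatch(reqCh <-chan Request, closeCh <-chan struct{}, handle func(Request)) {
		var (
			mtx        sync.Mutex // protects inProgress
			inProgress = make(map[string]struct{})
		)
		for {
			select {
			case <-closeCh:
				return
			case r := <-reqCh:
				mtx.Lock()
				if _, ok := inProgress[r.key]; !ok {
					inProgress[r.key] = struct{}{}
					go func() {
						handle(r) // the real code calls moveToDegraded here
						mtx.Lock()
						delete(inProgress, r.key)
						mtx.Unlock()
					}()
				}
				mtx.Unlock()
			}
		}
	}

	func main() {
		reqCh := make(chan Request)
		closeCh := make(chan struct{})
		done := make(chan struct{})
		go func() {
			dispatch(reqCh, closeCh, func(r Request) { fmt.Println("handling", r.key) })
			close(done)
		}()
		reqCh <- Request{key: "shard-1"}
		close(closeCh)
		<-done
	}

As in the patch itself, in-flight workers are not tracked by a WaitGroup,
so a transition started before shutdown may still complete after the loop
exits.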
 
+// reportShardErrorBackground increases the shard error counter and logs an error.
+// It is intended to be used from background workers and requests the mode
+// change asynchronously via setModeCh to avoid deadlocks.
+func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
+	e.mtx.RLock()
+	sh, ok := e.shards[id]
+	e.mtx.RUnlock()
+
+	if !ok {
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
+}
+
+// reportShardError checks that the number of errors doesn't exceed the configured threshold.
+// If it does, the shard is moved to degraded-read-only mode.
+func (e *StorageEngine) reportShardError(
+	sh hashedShard,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	if isLogical(err) {
+		e.log.Warn(msg,
+			zap.Stringer("shard_id", sh.ID()),
+			zap.String("error", err.Error()))
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
+}
+
+func (e *StorageEngine) reportShardErrorWithFlags(
+	sh *shard.Shard,
+	errCount uint32,
+	block bool,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	sid := sh.ID()
+	e.log.Warn(msg, append([]zap.Field{
+		zap.Stringer("shard_id", sid),
+		zap.Uint32("error count", errCount),
+		zap.String("error", err.Error()),
+	}, fields...)...)
+
+	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
+		return
+	}
+
+	if block {
+		e.moveToDegraded(sh, errCount)
+	} else {
+		req := setModeRequest{
+			errorCount: errCount,
+			sh:         sh,
+		}
+
+		select {
+		case e.setModeCh <- req:
+		default:
+			// Background workers can produce a lot of such errors,
+			// so this is logged at DEBUG level.
+			e.log.Debug("mode change is in progress, ignoring set-mode request",
+				zap.Stringer("shard_id", sid),
+				zap.Uint32("error_count", errCount))
+		}
+	}
+}
+
 func isLogical(err error) bool {
 	return errors.As(err, &logicerr.Logical{})
 }
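
Note: the select with a default branch above is a non-blocking send.
Because setModeCh is created unbuffered in New below, a request is
delivered only when setModeLoop is ready to receive it; otherwise it is
dropped and merely logged. A minimal sketch of these semantics:

	package main

	import "fmt"

	func main() {
		ch := make(chan int) // unbuffered, like setModeCh
		select {
		case ch <- 1:
			// Taken only if a receiver is ready right now.
			fmt.Println("delivered")
		default:
			// The send would block: drop the value, as the engine
			// drops a set-mode request while one is being handled.
			fmt.Println("dropped")
		}
	}

Dropping is harmless here: background workers keep reporting errors, so a
new request will be sent on the next error once the current transition
finishes.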
@@ -143,6 +218,8 @@ func New(opts ...Option) *StorageEngine {
 		mtx:        new(sync.RWMutex),
 		shards:     make(map[string]shardWrapper),
 		shardPools: make(map[string]util.WorkerPool),
+		closeCh:    make(chan struct{}),
+		setModeCh:  make(chan setModeRequest),
 	}
 }
 
diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go
index b6bbc1bca..cd0d10658 100644
--- a/pkg/local_object_storage/engine/error_test.go
+++ b/pkg/local_object_storage/engine/error_test.go
@@ -200,8 +200,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
 	sh := e.shards[id.String()]
 	e.mtx.RUnlock()
 
-	require.Equal(t, mode, sh.GetMode())
 	require.Equal(t, errCount, sh.errorCount.Load())
+	require.Equal(t, mode, sh.GetMode())
 }
 
 // corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
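
Note: mode changes requested via reportShardErrorBackground are now
applied asynchronously, so a test exercising that path cannot assert the
new mode immediately. A sketch of how such a test could wait, using
testify's require.Eventually (waitForMode is an illustrative helper, not
part of this patch):

	// waitForMode polls until the shard reports the expected mode.
	func waitForMode(t *testing.T, sh *shard.Shard, m mode.Mode) {
		require.Eventually(t, func() bool {
			return sh.GetMode() == m
		}, 3*time.Second, 10*time.Millisecond)
	}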