Change shard mode in case of errors async to support/v0.38 #1098

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:fix/shard_read_only_mode into support/v0.38 2024-05-02 11:33:25 +00:00
4 changed files with 25 additions and 24 deletions

View file

@ -133,7 +133,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er
errCount := sh.errorCount.Add(1) errCount := sh.errorCount.Add(1)
sh.Shard.IncErrorCounter() sh.Shard.IncErrorCounter()
e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err) e.reportShardErrorWithFlags(sh.Shard, errCount, msg, err)
} }
// reportShardError checks that the amount of errors doesn't exceed the configured threshold. // reportShardError checks that the amount of errors doesn't exceed the configured threshold.
@ -153,13 +153,12 @@ func (e *StorageEngine) reportShardError(
errCount := sh.errorCount.Add(1) errCount := sh.errorCount.Add(1)
sh.Shard.IncErrorCounter() sh.Shard.IncErrorCounter()
e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...) e.reportShardErrorWithFlags(sh.Shard, errCount, msg, err, fields...)
} }
func (e *StorageEngine) reportShardErrorWithFlags( func (e *StorageEngine) reportShardErrorWithFlags(
sh *shard.Shard, sh *shard.Shard,
errCount uint32, errCount uint32,
block bool,
msg string, msg string,
err error, err error,
fields ...zap.Field, fields ...zap.Field,
@ -175,23 +174,19 @@ func (e *StorageEngine) reportShardErrorWithFlags(
return return
} }
if block { req := setModeRequest{
e.moveToDegraded(sh, errCount) errorCount: errCount,
} else { sh: sh,
req := setModeRequest{ }
errorCount: errCount,
sh: sh,
}
select { select {
case e.setModeCh <- req: case e.setModeCh <- req:
default: default:
// For background workers we can have a lot of such errors, // For background workers we can have a lot of such errors,
// thus logging is done with DEBUG level. // thus logging is done with DEBUG level.
e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest, e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
zap.Stringer("shard_id", sid), zap.Stringer("shard_id", sid),
zap.Uint32("error_count", errCount)) zap.Uint32("error_count", errCount))
}
} }
} }

View file

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"testing" "testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -229,6 +230,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
sh := e.shards[id.String()] sh := e.shards[id.String()]
e.mtx.RUnlock() e.mtx.RUnlock()
require.Equal(t, errCount, sh.errorCount.Load()) require.Eventually(t, func() bool {
require.Equal(t, mode, sh.GetMode()) return errCount == sh.errorCount.Load() &&
mode == sh.GetMode()
}, 10*time.Second, 10*time.Millisecond, "shard mode doesn't changed to expected state in 10 seconds")
} }

View file

@ -85,6 +85,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
shPrm.SetRaw(prm.raw) shPrm.SetRaw(prm.raw)
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
shPrm.ShardLooksBad = sh.errorCount.Load() >= e.errorsThreshold
res, err := sh.Head(ctx, shPrm) res, err := sh.Head(ctx, shPrm)
if err != nil { if err != nil {
switch { switch {

View file

@ -13,8 +13,9 @@ import (
// HeadPrm groups the parameters of Head operation. // HeadPrm groups the parameters of Head operation.
type HeadPrm struct { type HeadPrm struct {
addr oid.Address addr oid.Address
raw bool raw bool
ShardLooksBad bool
} }
// HeadRes groups the resulting values of Head operation. // HeadRes groups the resulting values of Head operation.
@ -59,7 +60,8 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
var obj *objectSDK.Object var obj *objectSDK.Object
var err error var err error
if s.GetMode().NoMetabase() { mode := s.GetMode()
if mode.NoMetabase() || (mode.ReadOnly() && prm.ShardLooksBad) {
var getPrm GetPrm var getPrm GetPrm
getPrm.SetAddress(prm.addr) getPrm.SetAddress(prm.addr)
getPrm.SetIgnoreMeta(true) getPrm.SetIgnoreMeta(true)