Change shard mode in case of errors async to support/v0.38 #1098
4 changed files with 25 additions and 24 deletions
|
@ -133,7 +133,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er
|
|||
|
||||
errCount := sh.errorCount.Add(1)
|
||||
sh.Shard.IncErrorCounter()
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, msg, err)
|
||||
}
|
||||
|
||||
// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
|
||||
|
@ -153,13 +153,12 @@ func (e *StorageEngine) reportShardError(
|
|||
|
||||
errCount := sh.errorCount.Add(1)
|
||||
sh.Shard.IncErrorCounter()
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, msg, err, fields...)
|
||||
}
|
||||
|
||||
func (e *StorageEngine) reportShardErrorWithFlags(
|
||||
sh *shard.Shard,
|
||||
errCount uint32,
|
||||
block bool,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field,
|
||||
|
@ -175,23 +174,19 @@ func (e *StorageEngine) reportShardErrorWithFlags(
|
|||
return
|
||||
}
|
||||
|
||||
if block {
|
||||
e.moveToDegraded(sh, errCount)
|
||||
} else {
|
||||
req := setModeRequest{
|
||||
errorCount: errCount,
|
||||
sh: sh,
|
||||
}
|
||||
req := setModeRequest{
|
||||
errorCount: errCount,
|
||||
sh: sh,
|
||||
}
|
||||
|
||||
select {
|
||||
case e.setModeCh <- req:
|
||||
default:
|
||||
// For background workers we can have a lot of such errors,
|
||||
// thus logging is done with DEBUG level.
|
||||
e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
|
||||
zap.Stringer("shard_id", sid),
|
||||
zap.Uint32("error_count", errCount))
|
||||
}
|
||||
select {
|
||||
case e.setModeCh <- req:
|
||||
default:
|
||||
// For background workers we can have a lot of such errors,
|
||||
// thus logging is done with DEBUG level.
|
||||
e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
|
||||
zap.Stringer("shard_id", sid),
|
||||
zap.Uint32("error_count", errCount))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -229,6 +230,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
|||
sh := e.shards[id.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
require.Eventually(t, func() bool {
|
||||
return errCount == sh.errorCount.Load() &&
|
||||
mode == sh.GetMode()
|
||||
}, 10*time.Second, 10*time.Millisecond, "shard mode doesn't changed to expected state in 10 seconds")
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
shPrm.SetRaw(prm.raw)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
shPrm.ShardLooksBad = sh.errorCount.Load() >= e.errorsThreshold
|
||||
res, err := sh.Head(ctx, shPrm)
|
||||
if err != nil {
|
||||
switch {
|
||||
|
|
|
@ -13,8 +13,9 @@ import (
|
|||
|
||||
// HeadPrm groups the parameters of Head operation.
|
||||
type HeadPrm struct {
|
||||
addr oid.Address
|
||||
raw bool
|
||||
addr oid.Address
|
||||
raw bool
|
||||
ShardLooksBad bool
|
||||
}
|
||||
|
||||
// HeadRes groups the resulting values of Head operation.
|
||||
|
@ -59,7 +60,8 @@ func (s *Shard) Head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
|
|||
|
||||
var obj *objectSDK.Object
|
||||
var err error
|
||||
if s.GetMode().NoMetabase() {
|
||||
mode := s.GetMode()
|
||||
if mode.NoMetabase() || (mode.ReadOnly() && prm.ShardLooksBad) {
|
||||
var getPrm GetPrm
|
||||
getPrm.SetAddress(prm.addr)
|
||||
getPrm.SetIgnoreMeta(true)
|
||||
|
|
Loading…
Reference in a new issue