WIP: Morph: Add unit tests #2

Closed
dstepanov-yadro wants to merge 233 commits from TrueCloudLab/frostfs-node:master into object-3608-morph-unit-tests
2 changed files with 23 additions and 17 deletions
Showing only changes of commit 69df0d21c2 - Show all commits

View file

@ -6,6 +6,7 @@ import (
"sync/atomic" "sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -45,6 +46,7 @@ type shardWrapper struct {
type setModeRequest struct { type setModeRequest struct {
sh *shard.Shard sh *shard.Shard
isMeta bool
errorCount uint32 errorCount uint32
} }
@ -70,7 +72,7 @@ func (e *StorageEngine) setModeLoop() {
if !ok { if !ok {
inProgress[sid] = struct{}{} inProgress[sid] = struct{}{}
go func() { go func() {
e.moveToDegraded(r.sh, r.errorCount) e.moveToDegraded(r.sh, r.errorCount, r.isMeta)
mtx.Lock() mtx.Lock()
delete(inProgress, sid) delete(inProgress, sid)
@ -82,7 +84,7 @@ func (e *StorageEngine) setModeLoop() {
} }
} }
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) { func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
sid := sh.ID() sid := sh.ID()
log := e.log.With( log := e.log.With(
zap.Stringer("shard_id", sid), zap.Stringer("shard_id", sid),
@ -91,21 +93,23 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) {
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
err := sh.SetMode(mode.DegradedReadOnly) if isMeta {
if err != nil { err := sh.SetMode(mode.DegradedReadOnly)
if err == nil {
log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
return
}
log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly, log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly,
zap.Error(err)) zap.Error(err))
err = sh.SetMode(mode.ReadOnly)
if err != nil {
log.Error(logs.EngineFailedToMoveShardInReadonlyMode,
zap.Error(err))
} else {
log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
}
} else {
log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
} }
err := sh.SetMode(mode.ReadOnly)
if err != nil {
log.Error(logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
return
}
log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
} }
// reportShardErrorBackground increases shard error counter and logs an error. // reportShardErrorBackground increases shard error counter and logs an error.
@ -169,11 +173,13 @@ func (e *StorageEngine) reportShardErrorWithFlags(
return return
} }
isMeta := errors.As(err, new(metaerr.Error))
if block { if block {
e.moveToDegraded(sh, errCount) e.moveToDegraded(sh, errCount, isMeta)
} else { } else {
req := setModeRequest{ req := setModeRequest{
errorCount: errCount, errorCount: errCount,
isMeta: isMeta,
sh: sh, sh: sh,
} }

View file

@ -154,7 +154,7 @@ func TestErrorReporting(t *testing.T) {
for i := uint32(0); i < 2; i++ { for i := uint32(0); i < 2; i++ {
_, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)}) _, err = te.ng.Get(context.Background(), GetPrm{addr: object.AddressOf(obj)})
require.Error(t, err) require.Error(t, err)
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly) checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.ReadOnly)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
@ -219,7 +219,7 @@ func TestBlobstorFailback(t *testing.T) {
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
} }
checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly) checkShardState(t, te.ng, te.shards[0].id, 2, mode.ReadOnly)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }