WIP: Morph: Add unit tests #2

Closed
dstepanov-yadro wants to merge 233 commits from TrueCloudLab/frostfs-node:master into object-3608-morph-unit-tests
9 changed files with 42 additions and 5 deletions
Showing only changes of commit 01a0c97760 - Show all commits

View file

@ -492,4 +492,5 @@ const (
ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
ShardDeleteCantDeleteFromWriteCache = "can't delete object from write cache"
FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap = "the node is under maintenance, skip initial bootstrap"
EngineCouldNotChangeShardModeToDisabled = "could not change shard mode to disabled"
)

View file

@ -203,7 +203,14 @@ func (e *StorageEngine) removeShards(ids ...string) {
e.mtx.Unlock()
for _, sh := range ss {
err := sh.Close()
err := sh.SetMode(mode.Disabled)
if err != nil {
e.log.Error(logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
}
err = sh.Close()
if err != nil {
e.log.Error(logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()),

View file

@ -4,9 +4,12 @@ import (
"errors"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
var ErrShardDisabled = logicerr.New("shard disabled")
// IsErrNotFound checks if error returned by Shard Get/Head/GetRange method
// corresponds to missing object.
func IsErrNotFound(err error) bool {

View file

@ -38,6 +38,7 @@ func (p ExistsRes) Exists() bool {
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
trace.WithAttributes(
@ -52,7 +53,9 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.addr

View file

@ -66,6 +66,7 @@ func (r GetRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
trace.WithAttributes(
@ -78,6 +79,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.Disabled() {
return GetRes{}, ErrShardDisabled
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm
getPrm.Address = prm.addr

View file

@ -56,11 +56,13 @@ func (s *Shard) setMode(m mode.Mode) error {
}
}
if !m.Disabled() {
for i := range components {
if err := components[i].SetMode(m); err != nil {
return err
}
}
}
s.info.Mode = m
if s.metricsWriter != nil {

View file

@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
func (m Mode) ReadOnly() bool {
return m&ReadOnly != 0
}
func (m Mode) Disabled() bool {
return m == Disabled
}

View file

@ -72,6 +72,7 @@ func (r RngRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange",
trace.WithAttributes(
@ -86,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled
}
cb := func(stor *blobstor.BlobStor, id []byte) (*object.Object, error) {
var getRngPrm common.GetRangePrm
getRngPrm.Address = prm.addr

View file

@ -265,6 +265,13 @@ func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (u
)
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}