engine: Set Disabled mode to deleted shard #452

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/read_after_close_support into support/v0.36 2023-06-20 07:43:07 +00:00
10 changed files with 46 additions and 7 deletions

View file

@ -4,7 +4,7 @@ import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shar
// DumpShard dumps objects from the shard with provided identifier.
//
// Returns an error if shard is not read-only.
// Returns an error if shard is not read-only or disabled.
func (e *StorageEngine) DumpShard(id *shard.ID, prm shard.DumpPrm) error {
e.mtx.RLock()
defer e.mtx.RUnlock()

View file

@ -174,7 +174,14 @@ func (e *StorageEngine) removeShards(ids ...string) {
e.mtx.Unlock()
for _, sh := range ss {
err := sh.Close()
err := sh.SetMode(mode.Disabled)

Disabled mode sets only when shards are deleted fron engine

Disabled mode sets only when shards are deleted fron engine
if err != nil {
e.log.Error("could not change shard mode to disabled",
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
}
err = sh.Close()
if err != nil {
e.log.Error("could not close removed shard",
zap.Stringer("id", sh.ID()),

View file

@ -55,7 +55,9 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if !s.info.Mode.ReadOnly() {
if s.info.Mode.Disabled() {
return DumpRes{}, ErrShardDisabled
} else if !s.info.Mode.ReadOnly() {
return DumpRes{}, ErrMustBeReadOnly
}

View file

@ -4,9 +4,12 @@ import (
"errors"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
var ErrShardDisabled = logicerr.New("shard disabled")

Error type is logic to not to count this kind of errors with error counter

Error type is logic to not to count this kind of errors with error counter
// IsErrNotFound checks if error returned by Shard Get/Head/GetRange method
// corresponds to missing object.
func IsErrNotFound(err error) bool {

View file

@ -35,6 +35,7 @@ func (p ExistsRes) Exists() bool {
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
var exists bool
var err error
@ -42,7 +43,9 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
if s.info.Mode.Disabled() {
return ExistsRes{}, ErrShardDisabled
} else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm
p.Address = prm.addr

View file

@ -65,6 +65,7 @@ func (r GetRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Get",
trace.WithAttributes(
@ -77,6 +78,10 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.Disabled() {
return GetRes{}, ErrShardDisabled
}
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
var getPrm common.GetPrm
getPrm.Address = prm.addr

View file

@ -55,9 +55,11 @@ func (s *Shard) setMode(m mode.Mode) error {
}
}
for i := range components {
if err := components[i].SetMode(m); err != nil {
return err
if !m.Disabled() {
for i := range components {
if err := components[i].SetMode(m); err != nil {
return err
}
}
}

View file

@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
func (m Mode) ReadOnly() bool {
return m&ReadOnly != 0
}
func (m Mode) Disabled() bool {
return m == Disabled
}

View file

@ -72,6 +72,7 @@ func (r RngRes) HasMeta() bool {
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
// Returns the ErrShardDisabled if the shard is disabled.
func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.GetRange",
trace.WithAttributes(
@ -86,6 +87,10 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.Disabled() {
return RngRes{}, ErrShardDisabled
}
cb := func(stor *blobstor.BlobStor, id []byte) (*object.Object, error) {
var getRngPrm common.GetRangePrm
getRngPrm.Address = prm.addr

View file

@ -159,6 +159,14 @@ func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
s.m.RLock()

I think it must be mode check too.

I think it must be mode check too.
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
return s.pilorama.TreeHeight(cid, treeID)
}