forked from TrueCloudLab/frostfs-node
[#1956] node: Lock shard's mode on its methods switch
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
parent
33d279a3f2
commit
eea2892109
13 changed files with 94 additions and 22 deletions
|
@ -67,6 +67,7 @@ Changelog for NeoFS Node
|
||||||
- Synchronizing a tree now longer reports an error for a single-node container (#2154)
|
- Synchronizing a tree now longer reports an error for a single-node container (#2154)
|
||||||
- Prevent leaking goroutines in the tree service (#2162)
|
- Prevent leaking goroutines in the tree service (#2162)
|
||||||
- Do not search for LOCK objects when delete container when session provided (#2152)
|
- Do not search for LOCK objects when delete container when session provided (#2152)
|
||||||
|
- Race conditions on shard's mode switch (#1956)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- `-g` option from `neofs-cli control ...` and `neofs-cli container create` commands (#2089)
|
- `-g` option from `neofs-cli control ...` and `neofs-cli container create` commands (#2089)
|
||||||
|
|
|
@ -28,10 +28,16 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
|
||||||
// Delete removes data from the shard's writeCache, metaBase and
|
// Delete removes data from the shard's writeCache, metaBase and
|
||||||
// blobStor.
|
// blobStor.
|
||||||
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
m := s.GetMode()
|
s.m.RLock()
|
||||||
if m.ReadOnly() {
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
return s.delete(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
return DeleteRes{}, ErrReadOnlyMode
|
return DeleteRes{}, ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if s.info.Mode.NoMetabase() {
|
||||||
return DeleteRes{}, ErrDegradedMode
|
return DeleteRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,10 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||||
var exists bool
|
var exists bool
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
var p common.ExistsPrm
|
var p common.ExistsPrm
|
||||||
p.Address = prm.addr
|
p.Address = prm.addr
|
||||||
|
|
||||||
|
|
|
@ -187,7 +187,10 @@ func (gc *gc) stop() {
|
||||||
// with GC-marked graves.
|
// with GC-marked graves.
|
||||||
// Does nothing if shard is in "read-only" mode.
|
// Does nothing if shard is in "read-only" mode.
|
||||||
func (s *Shard) removeGarbage() {
|
func (s *Shard) removeGarbage() {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode != mode.ReadWrite {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +224,7 @@ func (s *Shard) removeGarbage() {
|
||||||
deletePrm.SetAddresses(buf...)
|
deletePrm.SetAddresses(buf...)
|
||||||
|
|
||||||
// delete accumulated objects
|
// delete accumulated objects
|
||||||
_, err = s.Delete(deletePrm)
|
_, err = s.delete(deletePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("could not delete the objects",
|
s.log.Warn("could not delete the objects",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -242,6 +245,13 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
inhumePrm.SetAddresses(expired...)
|
inhumePrm.SetAddresses(expired...)
|
||||||
|
@ -284,17 +294,25 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
for {
|
for {
|
||||||
log.Debug("iterating tombstones")
|
log.Debug("iterating tombstones")
|
||||||
|
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
s.log.Debug("shard is in a degraded mode, skip collecting expired tombstones")
|
s.log.Debug("shard is in a degraded mode, skip collecting expired tombstones")
|
||||||
|
s.m.RUnlock()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.metaBase.IterateOverGraveyard(iterPrm)
|
err := s.metaBase.IterateOverGraveyard(iterPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("iterator over graveyard failed", zap.Error(err))
|
log.Error("iterator over graveyard failed", zap.Error(err))
|
||||||
|
s.m.RUnlock()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.m.RUnlock()
|
||||||
|
|
||||||
tssLen := len(tss)
|
tssLen := len(tss)
|
||||||
if tssLen == 0 {
|
if tssLen == 0 {
|
||||||
break
|
break
|
||||||
|
@ -332,7 +350,10 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
|
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
return nil, ErrDegradedMode
|
return nil, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,9 @@ func (r GetRes) HasMeta() bool {
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||||
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
cb := func(stor *blobstor.BlobStor, id []byte) (*objectSDK.Object, error) {
|
||||||
var getPrm common.GetPrm
|
var getPrm common.GetPrm
|
||||||
getPrm.Address = prm.addr
|
getPrm.Address = prm.addr
|
||||||
|
@ -79,7 +82,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||||
return c.Get(prm.addr)
|
return c.Get(prm.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
|
||||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
||||||
|
|
||||||
return GetRes{
|
return GetRes{
|
||||||
|
@ -114,7 +117,7 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
|
||||||
mPrm.SetAddress(addr)
|
mPrm.SetAddress(addr)
|
||||||
|
|
||||||
mRes, err := s.metaBase.Exists(mPrm)
|
mRes, err := s.metaBase.Exists(mPrm)
|
||||||
if err != nil && !s.GetMode().NoMetabase() {
|
if err != nil && !s.info.Mode.NoMetabase() {
|
||||||
return res, false, err
|
return res, false, err
|
||||||
}
|
}
|
||||||
exists = mRes.Exists()
|
exists = mRes.Exists()
|
||||||
|
|
|
@ -61,10 +61,13 @@ var ErrLockObjectRemoval = meta.ErrLockObjectRemoval
|
||||||
//
|
//
|
||||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||||
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||||
m := s.GetMode()
|
s.m.RLock()
|
||||||
if m.ReadOnly() {
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
s.m.RUnlock()
|
||||||
return InhumeRes{}, ErrReadOnlyMode
|
return InhumeRes{}, ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if s.info.Mode.NoMetabase() {
|
||||||
|
s.m.RUnlock()
|
||||||
return InhumeRes{}, ErrDegradedMode
|
return InhumeRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,6 +94,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||||
res, err := s.metaBase.Inhume(metaPrm)
|
res, err := s.metaBase.Inhume(metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, meta.ErrLockObjectRemoval) {
|
if errors.Is(err, meta.ErrLockObjectRemoval) {
|
||||||
|
s.m.RUnlock()
|
||||||
return InhumeRes{}, ErrLockObjectRemoval
|
return InhumeRes{}, ErrLockObjectRemoval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,9 +102,13 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
s.m.RUnlock()
|
||||||
|
|
||||||
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.m.RUnlock()
|
||||||
|
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
|
||||||
if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
|
if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
|
||||||
|
|
|
@ -64,7 +64,10 @@ func (r ListWithCursorRes) Cursor() *Cursor {
|
||||||
|
|
||||||
// List returns all objects physically stored in the Shard.
|
// List returns all objects physically stored in the Shard.
|
||||||
func (s *Shard) List() (res SelectRes, err error) {
|
func (s *Shard) List() (res SelectRes, err error) {
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
return SelectRes{}, ErrDegradedMode
|
return SelectRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,10 @@ import (
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (s *Shard) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (s *Shard) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||||
m := s.GetMode()
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
m := s.info.Mode
|
||||||
if m.ReadOnly() {
|
if m.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if m.NoMetabase() {
|
||||||
|
|
|
@ -23,7 +23,10 @@ func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
||||||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||||
// another shard.
|
// another shard.
|
||||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||||
m := s.GetMode()
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
m := s.info.Mode
|
||||||
if m.ReadOnly() {
|
if m.ReadOnly() {
|
||||||
return ToMoveItRes{}, ErrReadOnlyMode
|
return ToMoveItRes{}, ErrReadOnlyMode
|
||||||
} else if m.NoMetabase() {
|
} else if m.NoMetabase() {
|
||||||
|
|
|
@ -30,7 +30,10 @@ func (p *PutPrm) SetObject(obj *object.Object) {
|
||||||
//
|
//
|
||||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||||
m := s.GetMode()
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
m := s.info.Mode
|
||||||
if m.ReadOnly() {
|
if m.ReadOnly() {
|
||||||
return PutRes{}, ErrReadOnlyMode
|
return PutRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,9 @@ func (r RngRes) HasMeta() bool {
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||||
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
cb := func(stor *blobstor.BlobStor, id []byte) (*object.Object, error) {
|
cb := func(stor *blobstor.BlobStor, id []byte) (*object.Object, error) {
|
||||||
var getRngPrm common.GetRangePrm
|
var getRngPrm common.GetRangePrm
|
||||||
getRngPrm.Address = prm.addr
|
getRngPrm.Address = prm.addr
|
||||||
|
@ -103,7 +106,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
skipMeta := prm.skipMeta || s.info.Mode.NoMetabase()
|
||||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
||||||
|
|
||||||
return RngRes{
|
return RngRes{
|
||||||
|
|
|
@ -40,7 +40,10 @@ func (r SelectRes) AddressList() []oid.Address {
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely select the objects.
|
// did not allow to completely select the objects.
|
||||||
func (s *Shard) Select(prm SelectPrm) (SelectRes, error) {
|
func (s *Shard) Select(prm SelectPrm) (SelectRes, error) {
|
||||||
if s.GetMode().NoMetabase() {
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
return SelectRes{}, ErrDegradedMode
|
return SelectRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,11 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return nil, ErrPiloramaDisabled
|
return nil, ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode().ReadOnly() {
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
return nil, ErrReadOnlyMode
|
return nil, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeMove(d, treeID, m)
|
return s.pilorama.TreeMove(d, treeID, m)
|
||||||
|
@ -27,7 +31,11 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return nil, ErrPiloramaDisabled
|
return nil, ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode().ReadOnly() {
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
return nil, ErrReadOnlyMode
|
return nil, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
||||||
|
@ -38,7 +46,11 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return ErrPiloramaDisabled
|
return ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode().ReadOnly() {
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeApply(d, treeID, m)
|
return s.pilorama.TreeApply(d, treeID, m)
|
||||||
|
|
Loading…
Reference in a new issue