[#1057] shard: Do not allow memory change operations in "read-only"

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-12-27 14:04:07 +03:00 committed by Pavel Karpy
parent 4f756bf121
commit 93bd6be743
6 changed files with 37 additions and 3 deletions

View file

@ -33,6 +33,10 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm {
// Delete removes data from the shard's writeCache, metaBase and // Delete removes data from the shard's writeCache, metaBase and
// blobStor. // blobStor.
func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) { func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) {
if s.getMode() == ModeReadOnly {
return nil, ErrReadOnlyMode
}
ln := len(prm.addr) ln := len(prm.addr)
delSmallPrm := new(blobstor.DeleteSmallPrm) delSmallPrm := new(blobstor.DeleteSmallPrm)
delBigPrm := new(blobstor.DeleteBigPrm) delBigPrm := new(blobstor.DeleteBigPrm)

View file

@ -171,7 +171,12 @@ func (gc *gc) stop() {
// iterates over metabase graveyard and deletes objects // iterates over metabase graveyard and deletes objects
// with GC-marked graves. // with GC-marked graves.
// Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage() { func (s *Shard) removeGarbage() {
if s.getMode() == ModeReadOnly {
return
}
buf := make([]*object.Address, 0, s.rmBatchSize) buf := make([]*object.Address, 0, s.rmBatchSize)
// iterate over metabase graveyard and accumulate // iterate over metabase graveyard and accumulate

View file

@ -43,7 +43,13 @@ func (p *InhumePrm) MarkAsGarbage(addr ...*objectSDK.Address) *InhumePrm {
// Inhume calls metabase. Inhume method to mark object as removed. It won't be // Inhume calls metabase. Inhume method to mark object as removed. It won't be
// removed physically from blobStor and metabase until `Delete` operation. // removed physically from blobStor and metabase until `Delete` operation.
//
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) { func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) {
if s.getMode() == ModeReadOnly {
return nil, ErrReadOnlyMode
}
if s.hasWriteCache() { if s.hasWriteCache() {
for i := range prm.target { for i := range prm.target {
_ = s.writeCache.Delete(prm.target[i]) _ = s.writeCache.Delete(prm.target[i])

View file

@ -27,6 +27,10 @@ func (p *ToMoveItPrm) WithAddress(addr *objectSDK.Address) *ToMoveItPrm {
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to // ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
// another shard. // another shard.
func (s *Shard) ToMoveIt(prm *ToMoveItPrm) (*ToMoveItRes, error) { func (s *Shard) ToMoveIt(prm *ToMoveItPrm) (*ToMoveItRes, error) {
if s.getMode() == ModeReadOnly {
return nil, ErrReadOnlyMode
}
err := meta.ToMoveIt(s.metaBase, prm.addr) err := meta.ToMoveIt(s.metaBase, prm.addr)
if err != nil { if err != nil {
s.log.Debug("could not mark object for shard relocation in metabase", s.log.Debug("could not mark object for shard relocation in metabase",

View file

@ -30,7 +30,13 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
// //
// Returns any error encountered that // Returns any error encountered that
// did not allow to completely save the object. // did not allow to completely save the object.
//
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
func (s *Shard) Put(prm *PutPrm) (*PutRes, error) { func (s *Shard) Put(prm *PutPrm) (*PutRes, error) {
if s.getMode() == ModeReadOnly {
return nil, ErrReadOnlyMode
}
putPrm := new(blobstor.PutPrm) // form Put parameters putPrm := new(blobstor.PutPrm) // form Put parameters
putPrm.SetObject(prm.obj) putPrm.SetObject(prm.obj)

View file

@ -20,8 +20,6 @@ type Shard struct {
gc *gc gc *gc
mode *atomic.Uint32
writeCache writecache.Cache writeCache writecache.Cache
blobStor *blobstor.BlobStor blobStor *blobstor.BlobStor
@ -36,6 +34,8 @@ type Option func(*cfg)
type ExpiredObjectsCallback func(context.Context, []*object.Address) type ExpiredObjectsCallback func(context.Context, []*object.Address)
type cfg struct { type cfg struct {
mode *atomic.Uint32
refillMetabase bool refillMetabase bool
rmBatchSize int rmBatchSize int
@ -59,6 +59,7 @@ type cfg struct {
func defaultCfg() *cfg { func defaultCfg() *cfg {
return &cfg{ return &cfg{
mode: atomic.NewUint32(uint32(ModeReadWrite)),
rmBatchSize: 100, rmBatchSize: 100,
log: zap.L(), log: zap.L(),
gcCfg: defaultGCCfg(), gcCfg: defaultGCCfg(),
@ -86,7 +87,6 @@ func New(opts ...Option) *Shard {
s := &Shard{ s := &Shard{
cfg: c, cfg: c,
mode: atomic.NewUint32(0), // TODO: init with particular mode
blobStor: bs, blobStor: bs,
metaBase: mb, metaBase: mb,
writeCache: writeCache, writeCache: writeCache,
@ -197,6 +197,15 @@ func WithRefillMetabase(v bool) Option {
} }
} }
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
// - ModeReadWrite;
// - ModeReadOnly.
func WithMode(v Mode) Option {
return func(c *cfg) {
c.mode.Store(uint32(v))
}
}
func (s *Shard) fillInfo() { func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()