diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 025b90380e..6ea7897b64 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -33,6 +33,10 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm { // Delete removes data from the shard's writeCache, metaBase and // blobStor. func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) { + if s.getMode() == ModeReadOnly { + return nil, ErrReadOnlyMode + } + ln := len(prm.addr) delSmallPrm := new(blobstor.DeleteSmallPrm) delBigPrm := new(blobstor.DeleteBigPrm) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 336198ac31..b29029fb65 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -171,7 +171,12 @@ func (gc *gc) stop() { // iterates over metabase graveyard and deletes objects // with GC-marked graves. +// Does nothing if shard is in "read-only" mode. func (s *Shard) removeGarbage() { + if s.getMode() == ModeReadOnly { + return + } + buf := make([]*object.Address, 0, s.rmBatchSize) // iterate over metabase graveyard and accumulate diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 099de855f9..4a17831dc0 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -43,7 +43,13 @@ func (p *InhumePrm) MarkAsGarbage(addr ...*objectSDK.Address) *InhumePrm { // Inhume calls metabase. Inhume method to mark object as removed. It won't be // removed physically from blobStor and metabase until `Delete` operation. +// +// Returns ErrReadOnlyMode error if shard is in "read-only" mode. func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) { + if s.getMode() == ModeReadOnly { + return nil, ErrReadOnlyMode + } + if s.hasWriteCache() { for i := range prm.target { _ = s.writeCache.Delete(prm.target[i]) diff --git a/pkg/local_object_storage/shard/move.go b/pkg/local_object_storage/shard/move.go index d454a31d5f..f9b963c5c5 100644 --- a/pkg/local_object_storage/shard/move.go +++ b/pkg/local_object_storage/shard/move.go @@ -27,6 +27,10 @@ func (p *ToMoveItPrm) WithAddress(addr *objectSDK.Address) *ToMoveItPrm { // ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to // another shard. func (s *Shard) ToMoveIt(prm *ToMoveItPrm) (*ToMoveItRes, error) { + if s.getMode() == ModeReadOnly { + return nil, ErrReadOnlyMode + } + err := meta.ToMoveIt(s.metaBase, prm.addr) if err != nil { s.log.Debug("could not mark object for shard relocation in metabase", diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 2ee802f1f8..2a17eae005 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -30,7 +30,13 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { // // Returns any error encountered that // did not allow to completely save the object. +// +// Returns ErrReadOnlyMode error if shard is in "read-only" mode. func (s *Shard) Put(prm *PutPrm) (*PutRes, error) { + if s.getMode() == ModeReadOnly { + return nil, ErrReadOnlyMode + } + putPrm := new(blobstor.PutPrm) // form Put parameters putPrm.SetObject(prm.obj) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index b585bc40a0..32f4affcc4 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -20,8 +20,6 @@ type Shard struct { gc *gc - mode *atomic.Uint32 - writeCache writecache.Cache blobStor *blobstor.BlobStor @@ -36,6 +34,8 @@ type Option func(*cfg) type ExpiredObjectsCallback func(context.Context, []*object.Address) type cfg struct { + mode *atomic.Uint32 + refillMetabase bool rmBatchSize int @@ -59,6 +59,7 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ + mode: atomic.NewUint32(uint32(ModeReadWrite)), rmBatchSize: 100, log: zap.L(), gcCfg: defaultGCCfg(), @@ -86,7 +87,6 @@ func New(opts ...Option) *Shard { s := &Shard{ cfg: c, - mode: atomic.NewUint32(0), // TODO: init with particular mode blobStor: bs, metaBase: mb, writeCache: writeCache, @@ -197,6 +197,15 @@ func WithRefillMetabase(v bool) Option { } } +// WithMode returns option to set shard's mode. Mode must be one of the predefined: +// - ModeReadWrite; +// - ModeReadOnly. +func WithMode(v Mode) Option { + return func(c *cfg) { + c.mode.Store(uint32(v)) + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()