diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go index 428850e1..c43e2b22 100644 --- a/pkg/local_object_storage/shard/mode.go +++ b/pkg/local_object_storage/shard/mode.go @@ -1,6 +1,10 @@ package shard -import "errors" +import ( + "errors" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" +) // Mode represents enumeration of Shard work modes. type Mode uint32 @@ -40,6 +44,15 @@ func (s *Shard) SetMode(m Mode) error { s.m.Lock() defer s.m.Unlock() + if s.hasWriteCache() { + switch m { + case ModeReadOnly: + s.writeCache.SetMode(writecache.ModeReadOnly) + case ModeReadWrite: + s.writeCache.SetMode(writecache.ModeReadWrite) + } + } + s.info.Mode = m return nil diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 36281372..75ab3735 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -12,6 +12,12 @@ import ( // Delete removes object from write-cache. func (c *cache) Delete(addr *objectSDK.Address) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.mode == ModeReadOnly { + return ErrReadOnly + } + saddr := addr.String() // Check memory cache. diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index c7d4cb05..bf42938e 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -63,6 +63,13 @@ func (c *cache) flush() { m = m[:0] sz := 0 + c.modeMtx.RLock() + if c.mode == ModeReadOnly { + c.modeMtx.RUnlock() + time.Sleep(time.Second) + continue + } + // We put objects in batches of fixed size to not interfere with main put cycle a lot. _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) @@ -90,6 +97,7 @@ func (c *cache) flush() { select { case c.flushCh <- obj: case <-c.closeCh: + c.modeMtx.RUnlock() return } } @@ -98,6 +106,7 @@ func (c *cache) flush() { for i := range m { c.flushed.Add(m[i].addr, true) } + c.modeMtx.RUnlock() c.log.Debug("flushed items from write-cache", zap.Int("count", len(m)), @@ -116,8 +125,13 @@ func (c *cache) flushBigObjects() { for { select { case <-tick.C: - evictNum := 0 + c.modeMtx.RLock() + if c.mode == ModeReadOnly { + c.modeMtx.RUnlock() + break + } + evictNum := 0 _ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error { sAddr := addr.String() @@ -150,6 +164,7 @@ func (c *cache) flushBigObjects() { // evict objects which were successfully written to BlobStor c.evictObjects(evictNum) + c.modeMtx.RUnlock() case <-c.closeCh: } } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go new file mode 100644 index 00000000..fce239f5 --- /dev/null +++ b/pkg/local_object_storage/writecache/mode.go @@ -0,0 +1,52 @@ +package writecache + +import ( + "errors" + "time" +) + +// Mode represents write-cache mode of operation. +type Mode uint32 + +const ( + // ModeReadWrite is a default mode allowing objects to be flushed. + ModeReadWrite Mode = iota + + // ModeReadOnly is a mode in which write-cache doesn't flush anything to a metabase. + ModeReadOnly +) + +// ErrReadOnly is returned when Put/Write is performed in a read-only mode. +var ErrReadOnly = errors.New("write-cache is in read-only mode") + +// SetMode sets write-cache mode of operation. +// When shard is put in read-only mode all objects in memory are flushed to disk +// and all background jobs are suspended. +func (c *cache) SetMode(m Mode) { + c.modeMtx.Lock() + defer c.modeMtx.Unlock() + if c.mode == m { + return + } + + c.mode = m + if m == ModeReadWrite { + return + } + + // Because modeMtx is taken no new objects will arrive an all other modifying + // operations are completed. + // 1. Persist objects already in memory on disk. + c.persistMemoryCache() + + // 2. Suspend producers to ensure there are channel send operations in fly. + // metaCh and directCh can be populated either during Put or in background memory persist thread. + // Former possibility is eliminated by taking `modeMtx` mutex and + // latter by explicit persist in the previous step. + // flushCh is populated by `flush` with `modeMtx` is also taken. + // Thus all producers are shutdown and we only need to wait until all channels are empty. + for len(c.metaCh) != 0 || len(c.directCh) != 0 || len(c.flushCh) != 0 { + c.log.Info("waiting for channels to flush") + time.Sleep(time.Second) + } +} diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index 491d293f..895cc674 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -19,39 +19,53 @@ func (c *cache) persistLoop() { for { select { case <-tick.C: - c.mtx.RLock() - m := c.mem - c.mtx.RUnlock() - - sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr }) - - start := time.Now() - c.persistSmallObjects(m) - c.log.Debug("persisted items to disk", - zap.Duration("took", time.Since(start)), - zap.Int("total", len(m))) - - for i := range m { - storagelog.Write(c.log, - storagelog.AddressField(m[i].addr), - storagelog.OpField("in-mem DELETE persist"), - ) + c.modeMtx.RLock() + if c.mode == ModeReadOnly { + c.modeMtx.RUnlock() + continue } - - c.mtx.Lock() - c.curMemSize = 0 - n := copy(c.mem, c.mem[len(m):]) - c.mem = c.mem[:n] - for i := range c.mem { - c.curMemSize += uint64(len(c.mem[i].data)) - } - c.mtx.Unlock() + c.persistMemoryCache() + c.modeMtx.RUnlock() case <-c.closeCh: return } } } +func (c *cache) persistMemoryCache() { + c.mtx.RLock() + m := c.mem + c.mtx.RUnlock() + + if len(m) == 0 { + return + } + + sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr }) + + start := time.Now() + c.persistSmallObjects(m) + c.log.Debug("persisted items to disk", + zap.Duration("took", time.Since(start)), + zap.Int("total", len(m))) + + for i := range m { + storagelog.Write(c.log, + storagelog.AddressField(m[i].addr), + storagelog.OpField("in-mem DELETE persist"), + ) + } + + c.mtx.Lock() + c.curMemSize = 0 + n := copy(c.mem, c.mem[len(m):]) + c.mem = c.mem[:n] + for i := range c.mem { + c.curMemSize += uint64(len(c.mem[i].data)) + } + c.mtx.Unlock() +} + // persistSmallObjects persists small objects to the write-cache database and // pushes the to the flush workers queue. func (c *cache) persistSmallObjects(objs []objectInfo) { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index f2dc39e8..9634ca74 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -12,6 +12,12 @@ var ErrBigObject = errors.New("too big object") // Put puts object to write-cache. func (c *cache) Put(o *object.Object) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.mode == ModeReadOnly { + return ErrReadOnly + } + sz := uint64(o.ToV2().StableSize()) if sz > c.maxObjectSize { return ErrBigObject diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 2addf239..acd6578b 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -21,6 +21,7 @@ type Cache interface { Head(*objectSDK.Address) (*object.Object, error) Delete(*objectSDK.Address) error Put(*object.Object) error + SetMode(Mode) DumpInfo() Info Init() error @@ -35,6 +36,9 @@ type cache struct { mtx sync.RWMutex mem []objectInfo + mode Mode + modeMtx sync.RWMutex + // compressFlags maps address of a big object to boolean value indicating // whether object should be compressed. compressFlags map[string]struct{} @@ -83,6 +87,7 @@ func New(opts ...Option) Cache { metaCh: make(chan *object.Object), closeCh: make(chan struct{}), evictCh: make(chan []byte), + mode: ModeReadWrite, compressFlags: make(map[string]struct{}), options: options{