[#1085] writecache: add read-only mode

In read-only mode, modifying operations immediately return an error
and all background operations are suspended.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
remotes/fyrchik/fix-config-wc
Evgenii Stratonikov 2022-01-18 15:47:16 +03:00 committed by LeL
parent 9f963e001b
commit ad01aaf8bf
7 changed files with 139 additions and 28 deletions

View File

@ -1,6 +1,10 @@
package shard
import "errors"
import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
)
// Mode represents enumeration of Shard work modes.
type Mode uint32
@ -40,6 +44,15 @@ func (s *Shard) SetMode(m Mode) error {
s.m.Lock()
defer s.m.Unlock()
if s.hasWriteCache() {
switch m {
case ModeReadOnly:
s.writeCache.SetMode(writecache.ModeReadOnly)
case ModeReadWrite:
s.writeCache.SetMode(writecache.ModeReadWrite)
}
}
s.info.Mode = m
return nil

View File

@ -12,6 +12,12 @@ import (
// Delete removes object from write-cache.
func (c *cache) Delete(addr *objectSDK.Address) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.mode == ModeReadOnly {
return ErrReadOnly
}
saddr := addr.String()
// Check memory cache.

View File

@ -63,6 +63,13 @@ func (c *cache) flush() {
m = m[:0]
sz := 0
c.modeMtx.RLock()
if c.mode == ModeReadOnly {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
}
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
@ -90,6 +97,7 @@ func (c *cache) flush() {
select {
case c.flushCh <- obj:
case <-c.closeCh:
c.modeMtx.RUnlock()
return
}
}
@ -98,6 +106,7 @@ func (c *cache) flush() {
for i := range m {
c.flushed.Add(m[i].addr, true)
}
c.modeMtx.RUnlock()
c.log.Debug("flushed items from write-cache",
zap.Int("count", len(m)),
@ -116,8 +125,13 @@ func (c *cache) flushBigObjects() {
for {
select {
case <-tick.C:
evictNum := 0
c.modeMtx.RLock()
if c.mode == ModeReadOnly {
c.modeMtx.RUnlock()
break
}
evictNum := 0
_ = c.fsTree.Iterate(func(addr *objectSDK.Address, data []byte) error {
sAddr := addr.String()
@ -150,6 +164,7 @@ func (c *cache) flushBigObjects() {
// evict objects which were successfully written to BlobStor
c.evictObjects(evictNum)
c.modeMtx.RUnlock()
case <-c.closeCh:
}
}

View File

@ -0,0 +1,52 @@
package writecache
import (
"errors"
"time"
)
// Mode represents write-cache mode of operation.
type Mode uint32

const (
	// ModeReadWrite is a default mode allowing objects to be flushed.
	ModeReadWrite Mode = iota

	// ModeReadOnly is a mode in which write-cache doesn't flush anything to a metabase.
	ModeReadOnly
)

// ErrReadOnly is returned when a modifying operation (such as Put or Delete)
// is performed while the write-cache is in read-only mode.
var ErrReadOnly = errors.New("write-cache is in read-only mode")
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
func (c *cache) SetMode(m Mode) {
	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	// No-op if the mode is unchanged.
	if c.mode == m {
		return
	}

	c.mode = m
	if m == ModeReadWrite {
		// Switching back to read-write needs no draining: background
		// loops observe the new mode on their next iteration.
		return
	}

	// Because modeMtx is taken, no new objects will arrive and all other
	// modifying operations are completed.
	// 1. Persist objects already in memory on disk.
	c.persistMemoryCache()

	// 2. Suspend producers to ensure there are no channel send operations
	// in flight. metaCh and directCh can be populated either during Put or
	// in the background memory-persist thread. The former possibility is
	// eliminated by taking the `modeMtx` mutex, the latter by the explicit
	// persist in the previous step. flushCh is populated by `flush` with
	// `modeMtx` also taken. Thus all producers are shut down and we only
	// need to wait until all channels are empty.
	for len(c.metaCh) != 0 || len(c.directCh) != 0 || len(c.flushCh) != 0 {
		c.log.Info("waiting for channels to flush")
		time.Sleep(time.Second)
	}
}

View File

@ -19,39 +19,53 @@ func (c *cache) persistLoop() {
for {
select {
case <-tick.C:
c.mtx.RLock()
m := c.mem
c.mtx.RUnlock()
sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
start := time.Now()
c.persistSmallObjects(m)
c.log.Debug("persisted items to disk",
zap.Duration("took", time.Since(start)),
zap.Int("total", len(m)))
for i := range m {
storagelog.Write(c.log,
storagelog.AddressField(m[i].addr),
storagelog.OpField("in-mem DELETE persist"),
)
c.modeMtx.RLock()
if c.mode == ModeReadOnly {
c.modeMtx.RUnlock()
continue
}
c.mtx.Lock()
c.curMemSize = 0
n := copy(c.mem, c.mem[len(m):])
c.mem = c.mem[:n]
for i := range c.mem {
c.curMemSize += uint64(len(c.mem[i].data))
}
c.mtx.Unlock()
c.persistMemoryCache()
c.modeMtx.RUnlock()
case <-c.closeCh:
return
}
}
}
// persistMemoryCache writes all objects currently buffered in memory to the
// on-disk cache, then removes the persisted prefix from the memory buffer and
// recalculates the current in-memory size from what remains.
func (c *cache) persistMemoryCache() {
	// Snapshot the slice header under the read lock; the backing array is
	// still shared with c.mem.
	c.mtx.RLock()
	m := c.mem
	c.mtx.RUnlock()

	if len(m) == 0 {
		return
	}

	// Sort by address before persisting.
	// NOTE(review): sort.Slice mutates the shared backing array of c.mem
	// while no lock is held — confirm no concurrent reader iterates c.mem
	// at this point.
	sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })

	start := time.Now()
	c.persistSmallObjects(m)
	c.log.Debug("persisted items to disk",
		zap.Duration("took", time.Since(start)),
		zap.Int("total", len(m)))

	// Log a storage-level record for each persisted (now in-mem-deleted) item.
	for i := range m {
		storagelog.Write(c.log,
			storagelog.AddressField(m[i].addr),
			storagelog.OpField("in-mem DELETE persist"),
		)
	}

	// Drop the persisted prefix under the write lock. Objects appended
	// concurrently (beyond len(m)) are preserved, and curMemSize is
	// recomputed from the remainder.
	c.mtx.Lock()
	c.curMemSize = 0
	n := copy(c.mem, c.mem[len(m):])
	c.mem = c.mem[:n]
	for i := range c.mem {
		c.curMemSize += uint64(len(c.mem[i].data))
	}
	c.mtx.Unlock()
}
// persistSmallObjects persists small objects to the write-cache database and
// pushes the to the flush workers queue.
func (c *cache) persistSmallObjects(objs []objectInfo) {

View File

@ -12,6 +12,12 @@ var ErrBigObject = errors.New("too big object")
// Put puts object to write-cache.
func (c *cache) Put(o *object.Object) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.mode == ModeReadOnly {
return ErrReadOnly
}
sz := uint64(o.ToV2().StableSize())
if sz > c.maxObjectSize {
return ErrBigObject

View File

@ -21,6 +21,7 @@ type Cache interface {
Head(*objectSDK.Address) (*object.Object, error)
Delete(*objectSDK.Address) error
Put(*object.Object) error
SetMode(Mode)
DumpInfo() Info
Init() error
@ -35,6 +36,9 @@ type cache struct {
mtx sync.RWMutex
mem []objectInfo
mode Mode
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
compressFlags map[string]struct{}
@ -83,6 +87,7 @@ func New(opts ...Option) Cache {
metaCh: make(chan *object.Object),
closeCh: make(chan struct{}),
evictCh: make(chan []byte),
mode: ModeReadWrite,
compressFlags: make(map[string]struct{}),
options: options{