writecache: Avoid manipulation with cache in DEGRADED mode #998

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/959-init-fstree-degraded into master 2024-03-11 18:35:43 +00:00
6 changed files with 35 additions and 1 deletions
Showing only changes of commit f6f327b5dc - Show all commits

View file

@ -19,6 +19,7 @@ import (
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
// Returns ErrDegraded if write-cache is in DEGRADED mode.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes(
@ -40,6 +41,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
if c.readOnly() {
return ErrReadOnly
}
if c.noMetabase() {
return ErrDegraded
}
saddr := addr.EncodeToString()

View file

@ -155,7 +155,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
select {
case <-tick.C:
c.modeMtx.RLock()
if c.readOnly() {
if c.readOnly() || c.noMetabase() {
c.modeMtx.RUnlock()
break
}
@ -281,6 +281,9 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
defer c.modeMtx.Unlock()
if c.noMetabase() {
return ErrDegraded
}
if err := c.flush(ctx, ignoreErrors); err != nil {
return err

View file

@ -29,6 +29,14 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
))
defer span.End()
if !c.modeMtx.TryRLock() {
return nil, ErrNotInitialized
}
defer c.modeMtx.RUnlock()
if c.mode.NoMetabase() {
return nil, ErrDegraded
}
obj, err := c.getInternal(ctx, saddr, addr)
return obj, metaerr.Wrap(err)
}
@ -71,6 +79,14 @@ func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object,
))
defer span.End()
if !c.modeMtx.TryRLock() {
return nil, ErrNotInitialized
}
defer c.modeMtx.RUnlock()
if c.mode.NoMetabase() {
return nil, ErrDegraded
}
obj, err := c.getInternal(ctx, saddr, addr)
if err != nil {
return nil, metaerr.Wrap(err)

View file

@ -76,3 +76,9 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) err
func (c *cache) readOnly() bool {
return c.mode.ReadOnly()
}
// noMetabase returns true if c is operating without the metabase.
// `c.modeMtx` must be taken.
func (c *cache) noMetabase() bool {
return c.mode.NoMetabase()
}

View file

@ -41,6 +41,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
}
if c.noMetabase() {
return common.PutRes{}, ErrDegraded
}
sz := uint64(len(prm.RawData))
if sz > c.maxObjectSize {

View file

@ -59,6 +59,8 @@ type Metabase interface {
var (
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
ErrReadOnly = logicerr.New("write-cache is in read-only mode")
// ErrDegraded is returned when writecache is in degraded mode.
ErrDegraded = logicerr.New("write-cache is in degraded mode")
// ErrNotInitialized is returned when write-cache is initializing.
ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
// ErrBigObject is returned when object is too big to be placed in cache.