From 8b3b16fe6244ce5f9825def985d9a0a66f5ed224 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 30 Sep 2022 13:41:37 +0300 Subject: [PATCH] [#1825] writecache: Flush cache when moving to the DEGRADED mode Degraded mode allows us to operate without an SSD, thus writecache should be unavailable in this mode. Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/shard/put.go | 5 +-- pkg/local_object_storage/writecache/flush.go | 8 +++-- .../writecache/flush_test.go | 34 ++++++++++++++++--- pkg/local_object_storage/writecache/mode.go | 13 +++++-- 5 files changed, 50 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ac3b7c6e..71ee7500e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Changelog for NeoFS Node ### Changed - Allow to evacuate shard data with `EvacuateShard` control RPC (#1800) +- Flush write-cache when moving shard to DEGRADED mode (#1825) ### Fixed - Description of command `netmap nodeinfo` (#1821) diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 585972e7e..2148e973a 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -49,10 +49,11 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { // exist check are not performed there, these checks should be executed // ahead of `Put` by storage engine - if s.hasWriteCache() { + tryCache := s.hasWriteCache() && !m.NoMetabase() + if tryCache { res, err = s.writeCache.Put(putPrm) } - if err != nil || !s.hasWriteCache() { + if err != nil || !tryCache { if err != nil { s.log.Debug("can't put object to the write-cache, trying blobstor", zap.String("err", err.Error())) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index ef7cf8c5b..5ac7c67ca 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -50,7 +50,7 @@ func (c *cache) runFlushLoop() { for { select { case <-tt.C: - c.flush() + c.flushDB() tt.Reset(defaultFlushInterval) case <-c.closeCh: return @@ -59,7 +59,7 @@ func (c *cache) runFlushLoop() { }() } -func (c *cache) flush() { +func (c *cache) flushDB() { lastKey := []byte{} var m []objectInfo for { @@ -241,6 +241,10 @@ func (c *cache) Flush(ignoreErrors bool) error { return errMustBeReadOnly } + return c.flush(ignoreErrors) +} + +func (c *cache) flush(ignoreErrors bool) error { var prm common.IteratePrm prm.IgnoreErrors = ignoreErrors prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 2044b429e..b5dbd48ef 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -109,10 +109,6 @@ func TestFlush(t *testing.T) { wc, bs, mb := newCache(t) objects := putObjects(t, wc) - t.Run("must be read-only", func(t *testing.T) { - require.ErrorIs(t, wc.Flush(false), errMustBeReadOnly) - }) - require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) @@ -135,6 +131,36 @@ func TestFlush(t *testing.T) { check(t, mb, bs, objects[2:]) }) + t.Run("flush on moving to degraded mode", func(t *testing.T) { + wc, bs, mb := newCache(t) + objects := putObjects(t, wc) + + // Blobstor is read-only, so we expect en error from `flush` here. + require.Error(t, wc.SetMode(mode.Degraded)) + + // First move to read-only mode to close background workers. + require.NoError(t, wc.SetMode(mode.ReadOnly)) + require.NoError(t, bs.SetMode(mode.ReadWrite)) + require.NoError(t, mb.SetMode(mode.ReadWrite)) + + wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) + wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) + + require.NoError(t, wc.SetMode(mode.Degraded)) + + for i := 0; i < 2; i++ { + var mPrm meta.GetPrm + mPrm.SetAddress(objects[i].addr) + _, err := mb.Get(mPrm) + require.Error(t, err) + + _, err = bs.Get(common.GetPrm{Address: objects[i].addr}) + require.Error(t, err) + } + + check(t, mb, bs, objects[2:]) + }) + t.Run("ignore errors", func(t *testing.T) { testIgnoreErrors := func(t *testing.T, f func(*cache)) { wc, bs, mb := newCache(t) diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 389567252..7e83959d1 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -18,9 +18,11 @@ func (c *cache) SetMode(m mode.Mode) error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - if m.ReadOnly() == c.readOnly() { - c.mode = m - return nil + if m.NoMetabase() && !c.mode.NoMetabase() { + err := c.flush(true) + if err != nil { + return err + } } if c.db != nil { @@ -37,6 +39,11 @@ func (c *cache) SetMode(m mode.Mode) error { time.Sleep(time.Second) } + if m.NoMetabase() { + c.mode = m + return nil + } + if err := c.openStore(m.ReadOnly()); err != nil { return err }