From 35c9b6b26dcbb8fe1e03f69f0f8f10262db07e29 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 10 May 2023 17:43:49 +0300 Subject: [PATCH] [#314] writecache: remove objects right after they are flushed Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/writecache/flush.go | 28 +-- .../writecache/flush_test.go | 99 +-------- pkg/local_object_storage/writecache/get.go | 2 - pkg/local_object_storage/writecache/init.go | 192 ------------------ .../writecache/iterate.go | 6 - pkg/local_object_storage/writecache/mode.go | 16 -- pkg/local_object_storage/writecache/put.go | 2 - .../writecache/storage.go | 46 +---- .../writecache/writecache.go | 42 ++-- 9 files changed, 25 insertions(+), 408 deletions(-) delete mode 100644 pkg/local_object_storage/writecache/init.go diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 1e24a42e..c6c8a946 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -78,7 +78,7 @@ func (c *cache) flushSmallObjects() { m = m[:0] c.modeMtx.RLock() - if c.readOnly() || !c.initialized.Load() { + if c.readOnly() { c.modeMtx.RUnlock() time.Sleep(time.Second) continue @@ -117,10 +117,6 @@ func (c *cache) flushSmallObjects() { var count int for i := range m { - if c.flushed.Contains(m[i].addr) { - continue - } - obj := object.New() if err := obj.Unmarshal(m[i].data); err != nil { continue @@ -157,9 +153,6 @@ func (c *cache) workerFlushBig(ctx context.Context) { if c.readOnly() { c.modeMtx.RUnlock() break - } else if !c.initialized.Load() { - c.modeMtx.RUnlock() - continue } _ = c.flushFSTree(ctx, true) @@ -187,10 +180,6 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { sAddr := addr.EncodeToString() - if _, ok := c.store.flushed.Peek(sAddr); ok { - return nil - } - data, err := f() if err != nil { c.reportFlushError("can't read a file", sAddr, err) @@ -218,9 +207,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - // mark object as flushed - c.flushed.Add(sAddr, false) - + c.deleteFromDisk(ctx, []string{sAddr}) return nil } @@ -242,9 +229,12 @@ func (c *cache) workerFlushSmall() { } err := c.flushObject(context.TODO(), obj, nil) - if err == nil { - c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true) + if err != nil { + // Error is handled in flushObject. + continue } + + c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()}) } } @@ -306,10 +296,6 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { cs := b.Cursor() for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { sa := string(k) - if _, ok := c.flushed.Peek(sa); ok { - continue - } - if err := addr.DecodeString(sa); err != nil { c.reportFlushError("can't decode object address from the DB", sa, err) if ignoreErrors { diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 2cec0708..e5ca8573 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -5,7 +5,6 @@ import ( "os" "path/filepath" "testing" - "time" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" @@ -15,7 +14,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -109,22 +107,9 @@ func TestFlush(t *testing.T) { require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) - wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) - require.NoError(t, wc.Flush(context.Background(), false)) - for i := 0; i < 2; i++ { - var mPrm meta.GetPrm - mPrm.SetAddress(objects[i].addr) - _, err := mb.Get(context.Background(), mPrm) - require.Error(t, err) - - _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) - require.Error(t, err) - } - - check(t, mb, bs, objects[2:]) + check(t, mb, bs, objects) }) t.Run("flush on moving to degraded mode", func(t *testing.T) { @@ -138,23 +123,9 @@ func TestFlush(t *testing.T) { require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - - wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) - wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) - require.NoError(t, wc.SetMode(mode.Degraded)) - for i := 0; i < 2; i++ { - var mPrm meta.GetPrm - mPrm.SetAddress(objects[i].addr) - _, err := mb.Get(context.Background(), mPrm) - require.Error(t, err) - - _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr}) - require.Error(t, err) - } - - check(t, mb, bs, objects[2:]) + check(t, mb, bs, objects) }) t.Run("ignore errors", func(t *testing.T) { @@ -223,67 +194,6 @@ func TestFlush(t *testing.T) { }) }) }) - - t.Run("on init", func(t *testing.T) { - wc, bs, mb := newCache(t) - objects := []objectPair{ - // removed - putObject(t, wc, 1), - putObject(t, wc, smallSize+1), - // not found - putObject(t, wc, 1), - putObject(t, wc, smallSize+1), - // ok - putObject(t, wc, 1), - putObject(t, wc, smallSize+1), - } - - require.NoError(t, wc.Close()) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) - - for i := range objects { - var prm meta.PutPrm - prm.SetObject(objects[i].obj) - _, err := mb.Put(context.Background(), prm) - require.NoError(t, err) - } - - var inhumePrm meta.InhumePrm - inhumePrm.SetAddresses(objects[0].addr, objects[1].addr) - inhumePrm.SetTombstoneAddress(oidtest.Address()) - _, err := mb.Inhume(context.Background(), inhumePrm) - require.NoError(t, err) - - var deletePrm meta.DeletePrm - deletePrm.SetAddresses(objects[2].addr, objects[3].addr) - _, err = mb.Delete(context.Background(), deletePrm) - require.NoError(t, err) - - require.NoError(t, bs.SetMode(mode.ReadOnly)) - require.NoError(t, mb.SetMode(mode.ReadOnly)) - - // Open in read-only: no error, nothing is removed. - require.NoError(t, wc.Open(true)) - initWC(t, wc) - for i := range objects { - _, err := wc.Get(context.Background(), objects[i].addr) - require.NoError(t, err, i) - } - require.NoError(t, wc.Close()) - - // Open in read-write: no error, something is removed. - require.NoError(t, wc.Open(false)) - initWC(t, wc) - for i := range objects { - _, err := wc.Get(context.Background(), objects[i].addr) - if i < 2 { - require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i) - } else { - require.NoError(t, err, i) - } - } - }) } func putObject(t *testing.T, c Cache, size int) objectPair { @@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) { func initWC(t *testing.T, wc Cache) { require.NoError(t, wc.Init()) - - require.Eventually(t, func() bool { - rawWc := wc.(*cache) - return rawWc.initialized.Load() - }, 100*time.Second, 1*time.Millisecond) } type dummyEpoch struct{} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 6af1bd18..030f9b41 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e value, err := Get(c.db, []byte(saddr)) if err == nil { obj := objectSDK.New() - c.flushed.Get(saddr) return obj, obj.Unmarshal(value) } @@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } - c.flushed.Get(saddr) return res.Object, nil } diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go deleted file mode 100644 index 2ca8ccee..00000000 --- a/pkg/local_object_storage/writecache/init.go +++ /dev/null @@ -1,192 +0,0 @@ -package writecache - -import ( - "context" - "errors" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" - "go.uber.org/zap" -) - -func (c *cache) initFlushMarks(ctx context.Context) { - var localWG sync.WaitGroup - - localWG.Add(1) - go func() { - defer localWG.Done() - - c.fsTreeFlushMarkUpdate(ctx) - }() - - localWG.Add(1) - go func() { - defer localWG.Done() - - c.dbFlushMarkUpdate(ctx) - }() - - c.initWG.Add(1) - c.wg.Add(1) - go func() { - defer c.wg.Done() - defer c.initWG.Done() - - localWG.Wait() - - select { - case <-c.stopInitCh: - return - case <-c.closeCh: - return - default: - } - - c.initialized.Store(true) - }() -} - -var errStopIter = errors.New("stop iteration") - -func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) { - c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree) - - var prm common.IteratePrm - prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error { - select { - case <-c.closeCh: - return errStopIter - case <-c.stopInitCh: - return errStopIter - default: - } - - flushed, needRemove := c.flushStatus(ctx, addr) - if flushed { - c.store.flushed.Add(addr.EncodeToString(), true) - if needRemove { - var prm common.DeletePrm - prm.Address = addr - - _, err := c.fsTree.Delete(ctx, prm) - if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(addr), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("fstree DELETE"), - ) - } - } - } - return nil - } - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - _, _ = c.fsTree.Iterate(prm) - - c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks) -} - -func (c *cache) dbFlushMarkUpdate(ctx context.Context) { - c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase) - - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - - var m []string - var indices []int - var lastKey []byte - var batchSize = flushBatchSize - for { - select { - case <-c.closeCh: - return - case <-c.stopInitCh: - return - default: - } - - m = m[:0] - indices = indices[:0] - - // We put objects in batches of fixed size to not interfere with main put cycle a lot. - _ = c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() { - m = append(m, string(k)) - } - return nil - }) - - var addr oid.Address - for i := range m { - if err := addr.DecodeString(m[i]); err != nil { - continue - } - - flushed, needRemove := c.flushStatus(ctx, addr) - if flushed { - c.store.flushed.Add(addr.EncodeToString(), true) - if needRemove { - indices = append(indices, i) - } - } - } - - if len(m) == 0 { - break - } - - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - for _, j := range indices { - if err := b.Delete([]byte(m[j])); err != nil { - return err - } - } - return nil - }) - if err == nil { - for _, j := range indices { - storagelog.Write(c.log, - zap.String("address", m[j]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("db DELETE"), - ) - } - } - lastKey = append([]byte(m[len(m)-1]), 0) - } - - c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks) -} - -// flushStatus returns info about the object state in the main storage. -// First return value is true iff object exists. -// Second return value is true iff object can be safely removed. -func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) { - var existsPrm meta.ExistsPrm - existsPrm.SetAddress(addr) - - _, err := c.metabase.Exists(ctx, existsPrm) - if err != nil { - needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) - return needRemove, needRemove - } - - var prm meta.StorageIDPrm - prm.SetAddress(addr) - - mRes, _ := c.metabase.StorageID(ctx, prm) - res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()}) - return err == nil && res.Exists, false -} diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index 228dd259..ebe97952 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error { err := c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) return b.ForEach(func(k, data []byte) error { - if _, ok := c.flushed.Peek(string(k)); ok { - return nil - } return prm.handler(data) }) }) @@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error { var fsPrm common.IteratePrm fsPrm.IgnoreErrors = prm.ignoreErrors fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - if _, ok := c.flushed.Peek(addr.EncodeToString()); ok { - return nil - } data, err := f() if err != nil { if prm.ignoreErrors { diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 14f8af49..ca6faff4 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -37,22 +37,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error { var err error turnOffMeta := m.NoMetabase() - if !c.initialized.Load() { - close(c.stopInitCh) - - c.initWG.Wait() - c.stopInitCh = make(chan struct{}) - - defer func() { - if err == nil && !turnOffMeta { - c.initFlushMarks(ctx) - } - }() - } - - c.modeMtx.Lock() - defer c.modeMtx.Unlock() - if turnOffMeta && !c.mode.NoMetabase() { err = c.flush(ctx, true) if err != nil { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 223a73be..04d818b3 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -37,8 +37,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro defer c.modeMtx.RUnlock() if c.readOnly() { return common.PutRes{}, ErrReadOnly - } else if !c.initialized.Load() { - return common.PutRes{}, ErrNotInitialized } sz := uint64(len(prm.RawData)) diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index aeae752e..c06d16c0 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -13,8 +13,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - lru "github.com/hashicorp/golang-lru/v2" - "github.com/hashicorp/golang-lru/v2/simplelru" "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -22,19 +20,7 @@ import ( // store represents persistent storage with in-memory LRU cache // for flushed items on top of it. type store struct { - maxFlushedMarksCount int - maxRemoveBatchSize int - - // flushed contains addresses of objects that were already flushed to the main storage. - // We use LRU cache instead of map here to facilitate removing of unused object in favour of - // frequently read ones. - // MUST NOT be used inside bolt db transaction because it's eviction handler - // removes untracked items from the database. - flushed simplelru.LRUCache[string, bool] - db *bbolt.DB - - dbKeysToRemove []string - fsKeysToRemove []string + db *bbolt.DB } const dbName = "small.bolt" @@ -73,35 +59,9 @@ func (c *cache) openStore(readOnly bool) error { return fmt.Errorf("could not open FSTree: %w", err) } - // Write-cache can be opened multiple times during `SetMode`. - // flushed map must not be re-created in this case. - if c.flushed == nil { - c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed) - } - - c.initialized.Store(false) - return nil } -// removeFlushed removes an object from the writecache. -// To minimize interference with the client operations, the actual removal -// is done in batches. -// It is not thread-safe and is used only as an evict callback to LRU cache. -func (c *cache) removeFlushed(key string, value bool) { - fromDatabase := value - if fromDatabase { - c.dbKeysToRemove = append(c.dbKeysToRemove, key) - } else { - c.fsKeysToRemove = append(c.fsKeysToRemove, key) - } - - if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize { - c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove) - c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove) - } -} - func (c *cache) deleteFromDB(keys []string) []string { if len(keys) == 0 { return keys @@ -133,7 +93,7 @@ func (c *cache) deleteFromDB(keys []string) []string { return keys[:len(keys)-errorIndex] } -func (c *cache) deleteFromDisk(keys []string) []string { +func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { if len(keys) == 0 { return keys } @@ -147,7 +107,7 @@ func (c *cache) deleteFromDisk(keys []string) []string { continue } - _, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr}) + _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) { c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index bdcc9bbf..83ecf219 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -5,7 +5,6 @@ import ( "os" "sync" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -13,7 +12,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -52,11 +50,8 @@ type cache struct { // mtx protects statistics, counters and compressFlags. mtx sync.RWMutex - mode mode.Mode - initialized atomic.Bool - stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them - initWG sync.WaitGroup // for initialisation routines only - modeMtx sync.RWMutex + mode mode.Mode + modeMtx sync.RWMutex // compressFlags maps address of a big object to boolean value indicating // whether object should be compressed. @@ -96,9 +91,8 @@ var ( // New creates new writecache instance. func New(opts ...Option) Cache { c := &cache{ - flushCh: make(chan *object.Object), - mode: mode.ReadWrite, - stopInitCh: make(chan struct{}), + flushCh: make(chan *object.Object), + mode: mode.ReadWrite, compressFlags: make(map[string]struct{}), options: options{ @@ -117,12 +111,6 @@ func New(opts ...Option) Cache { opts[i](&c.options) } - // Make the LRU cache contain which take approximately 3/4 of the maximum space. - // Assume small and big objects are stored in 50-50 proportion. - c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4 - // Trigger the removal when the cache is 7/8 full, so that new items can still arrive. - c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8 - return c } @@ -153,31 +141,27 @@ func (c *cache) Open(readOnly bool) error { // Init runs necessary services. func (c *cache) Init() error { - ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init") - defer span.End() - - c.initFlushMarks(ctx) c.runFlushLoop() return nil } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *cache) Close() error { - // Finish all in-progress operations. - if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil { - return err - } - + // We cannot lock mutex for the whole operation duration + // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. + c.modeMtx.Lock() if c.closeCh != nil { close(c.closeCh) } + c.mode = mode.DegradedReadOnly // prevent new operations from being processed + c.modeMtx.Unlock() + c.wg.Wait() - if c.closeCh != nil { - c.closeCh = nil - } - c.initialized.Store(false) + c.modeMtx.Lock() + defer c.modeMtx.Unlock() + c.closeCh = nil var err error if c.db != nil { err = c.db.Close()