From 9b87e6267dfe979e35bcefd0dcf7cc4d8f6bc638 Mon Sep 17 00:00:00 2001 From: ZhangTao1596 Date: Mon, 21 Jun 2021 17:42:09 +0800 Subject: [PATCH] [#568] shard/writecache: Maintain memory size and db size Signed-off-by: ZhangTao1596 --- pkg/local_object_storage/writecache/delete.go | 17 ++++++++++++----- pkg/local_object_storage/writecache/flush.go | 2 -- pkg/local_object_storage/writecache/persist.go | 18 +++++++----------- pkg/local_object_storage/writecache/storage.go | 15 +++++++++++++-- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index c0d6533a0..fb0f752d2 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -19,6 +19,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error { if saddr == c.mem[i].addr { copy(c.mem[i:], c.mem[i+1:]) c.mem = c.mem[:len(c.mem)-1] + c.curMemSize -= uint64(len(c.mem[i].data)) c.mtx.Unlock() return nil } @@ -26,18 +27,24 @@ func (c *cache) Delete(addr *objectSDK.Address) error { c.mtx.Unlock() // Check disk cache. - has := false + var has int _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - has = b.Get([]byte(saddr)) != nil + has = len(b.Get([]byte(saddr))) return nil }) - if has { - return c.db.Update(func(tx *bbolt.Tx) error { + if 0 < has { + err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - return b.Delete([]byte(saddr)) + err := b.Delete([]byte(saddr)) + return err }) + if err != nil { + return err + } + c.dbSize.Sub(uint64(has)) + return nil } err := c.fsTree.Delete(addr) diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index f42e01d9b..8b682a465 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -99,8 +99,6 @@ func (c *cache) flush() { c.flushed.Add(m[i].addr, true) } - c.dbSize.Sub(uint64(sz)) - c.log.Debug("flushed items from write-cache", zap.Int("count", len(m)), zap.String("start", base58.Encode(lastKey))) diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index 1d793decb..636b06112 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -31,19 +31,13 @@ func (c *cache) persistLoop() { zap.Int("total", len(m))) c.mtx.Lock() + c.curMemSize = 0 n := copy(c.mem, c.mem[len(m):]) c.mem = c.mem[:n] for i := range c.mem { c.curMemSize += uint64(len(c.mem[i].data)) } c.mtx.Unlock() - - sz := 0 - for i := range m { - sz += len(m[i].addr) + m[i].obj.ToV2().StableSize() - } - c.dbSize.Add(uint64(sz)) - case <-c.closeCh: return } @@ -55,8 +49,8 @@ func (c *cache) persistToCache(objs []objectInfo) []int { failMem []int doneMem []int ) - - _ = c.db.Update(func(tx *bbolt.Tx) error { + var sz uint64 + err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) for i := range objs { if uint64(len(objs[i].data)) >= c.smallObjectSize { @@ -68,12 +62,14 @@ func (c *cache) persistToCache(objs []objectInfo) []int { if err != nil { return err } - + sz += uint64(len(objs[i].data)) doneMem = append(doneMem, i) } return nil }) - + if err == nil { + c.dbSize.Add(sz) + } if len(doneMem) > 0 { c.evictObjects(len(doneMem)) for _, i := range doneMem { diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 2e92b490d..aa7386cb4 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -8,6 +8,7 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru/simplelru" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "go.etcd.io/bbolt" "go.uber.org/zap" @@ -95,16 +96,26 @@ func (c *cache) deleteFromDB(keys [][]byte) error { if len(keys) == 0 { return nil } - - return c.db.Update(func(tx *bbolt.Tx) error { + var sz uint64 + err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) for i := range keys { + has := b.Get(keys[i]) + if has == nil { + return object.ErrNotFound + } if err := b.Delete(keys[i]); err != nil { return err } + sz += uint64(len(has)) } return nil }) + if err != nil { + return err + } + c.dbSize.Sub(sz) + return nil } func (c *cache) deleteFromDisk(keys [][]byte) error {