[#568] shard/writecache: Maintain memory size and db size

Signed-off-by: ZhangTao1596 <zhangtao@ngd.neo.org>
This commit is contained in:
ZhangTao1596 2021-06-21 17:42:09 +08:00 committed by Alex Vanin
parent adbbad0beb
commit 9b87e6267d
4 changed files with 32 additions and 20 deletions

View file

@ -19,6 +19,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error {
if saddr == c.mem[i].addr { if saddr == c.mem[i].addr {
copy(c.mem[i:], c.mem[i+1:]) copy(c.mem[i:], c.mem[i+1:])
c.mem = c.mem[:len(c.mem)-1] c.mem = c.mem[:len(c.mem)-1]
c.curMemSize -= uint64(len(c.mem[i].data))
c.mtx.Unlock() c.mtx.Unlock()
return nil return nil
} }
@ -26,18 +27,24 @@ func (c *cache) Delete(addr *objectSDK.Address) error {
c.mtx.Unlock() c.mtx.Unlock()
// Check disk cache. // Check disk cache.
has := false var has int
_ = c.db.View(func(tx *bbolt.Tx) error { _ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
has = b.Get([]byte(saddr)) != nil has = len(b.Get([]byte(saddr)))
return nil return nil
}) })
if has { if 0 < has {
return c.db.Update(func(tx *bbolt.Tx) error { err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
return b.Delete([]byte(saddr)) err := b.Delete([]byte(saddr))
return err
}) })
if err != nil {
return err
}
c.dbSize.Sub(uint64(has))
return nil
} }
err := c.fsTree.Delete(addr) err := c.fsTree.Delete(addr)

View file

@ -99,8 +99,6 @@ func (c *cache) flush() {
c.flushed.Add(m[i].addr, true) c.flushed.Add(m[i].addr, true)
} }
c.dbSize.Sub(uint64(sz))
c.log.Debug("flushed items from write-cache", c.log.Debug("flushed items from write-cache",
zap.Int("count", len(m)), zap.Int("count", len(m)),
zap.String("start", base58.Encode(lastKey))) zap.String("start", base58.Encode(lastKey)))

View file

@ -31,19 +31,13 @@ func (c *cache) persistLoop() {
zap.Int("total", len(m))) zap.Int("total", len(m)))
c.mtx.Lock() c.mtx.Lock()
c.curMemSize = 0
n := copy(c.mem, c.mem[len(m):]) n := copy(c.mem, c.mem[len(m):])
c.mem = c.mem[:n] c.mem = c.mem[:n]
for i := range c.mem { for i := range c.mem {
c.curMemSize += uint64(len(c.mem[i].data)) c.curMemSize += uint64(len(c.mem[i].data))
} }
c.mtx.Unlock() c.mtx.Unlock()
sz := 0
for i := range m {
sz += len(m[i].addr) + m[i].obj.ToV2().StableSize()
}
c.dbSize.Add(uint64(sz))
case <-c.closeCh: case <-c.closeCh:
return return
} }
@ -55,8 +49,8 @@ func (c *cache) persistToCache(objs []objectInfo) []int {
failMem []int failMem []int
doneMem []int doneMem []int
) )
var sz uint64
_ = c.db.Update(func(tx *bbolt.Tx) error { err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
for i := range objs { for i := range objs {
if uint64(len(objs[i].data)) >= c.smallObjectSize { if uint64(len(objs[i].data)) >= c.smallObjectSize {
@ -68,12 +62,14 @@ func (c *cache) persistToCache(objs []objectInfo) []int {
if err != nil { if err != nil {
return err return err
} }
sz += uint64(len(objs[i].data))
doneMem = append(doneMem, i) doneMem = append(doneMem, i)
} }
return nil return nil
}) })
if err == nil {
c.dbSize.Add(sz)
}
if len(doneMem) > 0 { if len(doneMem) > 0 {
c.evictObjects(len(doneMem)) c.evictObjects(len(doneMem))
for _, i := range doneMem { for _, i := range doneMem {

View file

@ -8,6 +8,7 @@ import (
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru/simplelru" "github.com/hashicorp/golang-lru/simplelru"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
@ -95,16 +96,26 @@ func (c *cache) deleteFromDB(keys [][]byte) error {
if len(keys) == 0 { if len(keys) == 0 {
return nil return nil
} }
var sz uint64
return c.db.Update(func(tx *bbolt.Tx) error { err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
for i := range keys { for i := range keys {
has := b.Get(keys[i])
if has == nil {
return object.ErrNotFound
}
if err := b.Delete(keys[i]); err != nil { if err := b.Delete(keys[i]); err != nil {
return err return err
} }
sz += uint64(len(has))
} }
return nil return nil
}) })
if err != nil {
return err
}
c.dbSize.Sub(sz)
return nil
} }
func (c *cache) deleteFromDisk(keys [][]byte) error { func (c *cache) deleteFromDisk(keys [][]byte) error {