From a1696a81b6e0ff758288decd671c3c5c4cf5ba2e Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 8 Sep 2021 12:32:20 +0300 Subject: [PATCH] [#776] writecache: Limit size of used disk space There is a need to limit disk space used by write-cache. It is almost impossible to calculate the value exactly. It is proposed to estimate the size of the cache by the number of objects stored in it. Track amounts of objects saved in DB and FSTree separately. To do this, `ObjectCounters` interface is defined. It is generalized to a store of numbers that can be made persistent (new option `WithObjectCounters`). By default DB number is calculated as key number in default bucket, and FS number is set same to DB since it is currently hard to read the actual value from `FSTree` instance. Each PUT/DELETE operation to DB or FS increases/decreases corresponding counter. Before each PUT op an overflow check is performed with the following formula for evaluating the occupied space: `NumDB * MaxDBSize + NumFS * MaxFSSize`. If next PUT can cause write-cache overflow, object is written to the main storage. By default maximum write-cache size is set to 1GB. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/writecache/delete.go | 2 + .../writecache/options.go | 19 ++++ .../writecache/persist.go | 66 +++++++++-- pkg/local_object_storage/writecache/state.go | 103 ++++++++++++++++++ .../writecache/storage.go | 2 + .../writecache/writecache.go | 21 +++- 6 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 pkg/local_object_storage/writecache/state.go diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index a6ad60e96..99f00d0fc 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -47,6 +47,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error { } c.dbSize.Sub(uint64(has)) storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("db DELETE")) + c.objCounters.DecDB() return nil } @@ -57,6 +58,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error { if err == nil { storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("fstree DELETE")) + c.objCounters.DecFS() } return err diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 62a40cc6c..71ecb4427 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -26,6 +26,11 @@ type options struct { smallObjectSize uint64 // workersCount is the number of workers flushing objects in parallel. workersCount int + // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). + // 1 GiB by default. + maxCacheSize uint64 + // objCounters is an ObjectCounters instance needed for cache size estimation. + objCounters ObjectCounters } // WithLogger sets logger. @@ -88,3 +93,17 @@ func WithFlushWorkersCount(c int) Option { } } } + +// WithObjectCounters sets ObjectCounters instance needed for cache write-cache size estimation. +func WithObjectCounters(v ObjectCounters) Option { + return func(o *options) { + o.objCounters = v + } +} + +// WithMaxCacheSize sets maximum write-cache size in bytes. +func WithMaxCacheSize(sz uint64) Option { + return func(o *options) { + o.maxCacheSize = sz + } +} diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index d6d766212..afb7ff097 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -54,18 +54,28 @@ func (c *cache) persistLoop() { func (c *cache) persistToCache(objs []objectInfo) []int { var ( - failMem []int + failMem []int // some index is negative => all objects starting from it will overflow the cache doneMem []int ) var sz uint64 err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) + cacheSz := c.estimateCacheSize() for i := range objs { if uint64(len(objs[i].data)) >= c.smallObjectSize { failMem = append(failMem, i) continue } + // check if object will overflow write-cache size limit + updCacheSz := c.incSizeDB(cacheSz) + if updCacheSz > c.maxCacheSize { + // set negative index. We decrement index to cover 0 val (overflow is practically impossible) + failMem = append(failMem, -i-1) + + return nil + } + err := b.Put([]byte(objs[i].addr), objs[i].data) if err != nil { return err @@ -73,6 +83,10 @@ func (c *cache) persistToCache(objs []objectInfo) []int { sz += uint64(len(objs[i].data)) doneMem = append(doneMem, i) storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT")) + + // update cache size + cacheSz = updCacheSz + c.objCounters.IncDB() } return nil }) @@ -88,17 +102,55 @@ func (c *cache) persistToCache(objs []objectInfo) []int { var failDisk []int - for _, i := range failMem { - if uint64(len(objs[i].data)) > c.maxObjectSize { - failDisk = append(failDisk, i) + cacheSz := c.estimateCacheSize() + + for _, objInd := range failMem { + var ( + updCacheSz uint64 + overflowInd = -1 + ) + + if objInd < 0 { + // actually, since the overflow was detected in DB tx, the required space could well have been freed, + // but it is easier to consider the entire method atomic + overflowInd = -objInd - 1 // subtract 1 since we decremented index above + } else { + // check if object will overflow write-cache size limit + if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize { + overflowInd = objInd + } + } + + if overflowInd >= 0 { + loop: + for j := range objs[overflowInd:] { + // exclude objects which are already stored in DB + for _, doneMemInd := range doneMem { + if j == doneMemInd { + continue loop + } + } + + failDisk = append(failDisk, j) + } + + break + } + + if uint64(len(objs[objInd].data)) > c.maxObjectSize { + failDisk = append(failDisk, objInd) continue } - err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data) + err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data) if err != nil { - failDisk = append(failDisk, i) + failDisk = append(failDisk, objInd) } else { - storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("fstree PUT")) + storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT")) + + // update cache size + cacheSz = updCacheSz + c.objCounters.IncFS() } } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go new file mode 100644 index 000000000..d51d1f937 --- /dev/null +++ b/pkg/local_object_storage/writecache/state.go @@ -0,0 +1,103 @@ +package writecache + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "go.etcd.io/bbolt" + "go.uber.org/atomic" +) + +// ObjectCounters is an interface of the storage of cached object amount. +type ObjectCounters interface { + // Increments number of objects saved in DB. + IncDB() + // Decrements number of objects saved in DB. + DecDB() + // Returns number of objects saved in DB. + DB() uint64 + + // Increments number of objects saved in FSTree. + IncFS() + // Decrements number of objects saved in FSTree. + DecFS() + // Returns number of objects saved in FSTree. + FS() uint64 + + // Reads number of objects saved in write-cache. It is called on write-cache initialization step. + Read() error + // Flushes the values and closes the storage. It is called on write-cache shutdown. + FlushAndClose() +} + +func (c *cache) estimateCacheSize() uint64 { + return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize +} + +func (c *cache) incSizeDB(sz uint64) uint64 { + return sz + c.smallObjectSize +} + +func (c *cache) incSizeFS(sz uint64) uint64 { + return sz + c.maxObjectSize +} + +type counters struct { + cDB, cFS atomic.Uint64 + + db *bbolt.DB + + fs *fstree.FSTree +} + +func (x *counters) IncDB() { + x.cDB.Inc() +} + +func (x *counters) DecDB() { + x.cDB.Dec() +} + +func (x *counters) DB() uint64 { + return x.cDB.Load() +} + +func (x *counters) IncFS() { + x.cFS.Inc() +} + +func (x *counters) DecFS() { + x.cFS.Dec() +} + +func (x *counters) FS() uint64 { + return x.cFS.Load() +} + +func (x *counters) Read() error { + var inDB uint64 + + err := x.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b != nil { + inDB = uint64(b.Stats().KeyN) + } + + return nil + }) + if err != nil { + return fmt.Errorf("could not read write-cache DB counter: %w", err) + } + + x.cDB.Store(inDB) + + // FIXME: calculate the actual value in FSTree (new method?). + // For now we can think that db/fs = 50/50. + x.cFS.Store(inDB) + + return nil +} + +func (x *counters) FlushAndClose() { + // values aren't stored +} diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 17e4075aa..e82e1fb41 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -118,6 +118,7 @@ func (c *cache) deleteFromDB(keys [][]byte) error { return err } c.dbSize.Sub(sz) + c.objCounters.DecDB() return nil } @@ -139,6 +140,7 @@ func (c *cache) deleteFromDisk(keys [][]byte) error { continue } else if err == nil { storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("fstree DELETE")) + c.objCounters.DecFS() } } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 593437ba3..1c4afa06e 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -60,6 +60,7 @@ const ( maxInMemorySizeBytes = 1024 * 1024 * 1024 // 1 GiB maxObjectSize = 64 * 1024 * 1024 // 64 MiB smallObjectSize = 32 * 1024 // 32 KiB + maxCacheSizeBytes = 1 << 30 // 1 GiB ) var ( @@ -81,6 +82,7 @@ func New(opts ...Option) Cache { maxObjectSize: maxObjectSize, smallObjectSize: smallObjectSize, workersCount: flushWorkersCount, + maxCacheSize: maxCacheSizeBytes, }, } @@ -91,9 +93,21 @@ func New(opts ...Option) Cache { return c } -// Open opens and initializes database. +// Open opens and initializes database. Reads object counters from the ObjectCounters instance. func (c *cache) Open() error { - return c.openStore() + err := c.openStore() + if err != nil { + return err + } + + if c.objCounters == nil { + c.objCounters = &counters{ + db: c.db, + fs: c.fsTree, + } + } + + return c.objCounters.Read() } // Init runs necessary services. @@ -103,8 +117,9 @@ func (c *cache) Init() error { return nil } -// Close closes db connection and stops services. +// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *cache) Close() error { close(c.closeCh) + c.objCounters.FlushAndClose() return c.db.Close() }