diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index a6ad60e96..99f00d0fc 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -47,6 +47,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error { } c.dbSize.Sub(uint64(has)) storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("db DELETE")) + c.objCounters.DecDB() return nil } @@ -57,6 +58,7 @@ func (c *cache) Delete(addr *objectSDK.Address) error { if err == nil { storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("fstree DELETE")) + c.objCounters.DecFS() } return err diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 62a40cc6c..71ecb4427 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -26,6 +26,11 @@ type options struct { smallObjectSize uint64 // workersCount is the number of workers flushing objects in parallel. workersCount int + // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS). + // 1 GiB by default. + maxCacheSize uint64 + // objCounters is an ObjectCounters instance needed for cache size estimation. + objCounters ObjectCounters } // WithLogger sets logger. @@ -88,3 +93,17 @@ func WithFlushWorkersCount(c int) Option { } } } + +// WithObjectCounters sets ObjectCounters instance needed for cache write-cache size estimation. +func WithObjectCounters(v ObjectCounters) Option { + return func(o *options) { + o.objCounters = v + } +} + +// WithMaxCacheSize sets maximum write-cache size in bytes. +func WithMaxCacheSize(sz uint64) Option { + return func(o *options) { + o.maxCacheSize = sz + } +} diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index d6d766212..afb7ff097 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -54,18 +54,28 @@ func (c *cache) persistLoop() { func (c *cache) persistToCache(objs []objectInfo) []int { var ( - failMem []int + failMem []int // some index is negative => all objects starting from it will overflow the cache doneMem []int ) var sz uint64 err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) + cacheSz := c.estimateCacheSize() for i := range objs { if uint64(len(objs[i].data)) >= c.smallObjectSize { failMem = append(failMem, i) continue } + // check if object will overflow write-cache size limit + updCacheSz := c.incSizeDB(cacheSz) + if updCacheSz > c.maxCacheSize { + // set negative index. We decrement index to cover 0 val (overflow is practically impossible) + failMem = append(failMem, -i-1) + + return nil + } + err := b.Put([]byte(objs[i].addr), objs[i].data) if err != nil { return err @@ -73,6 +83,10 @@ func (c *cache) persistToCache(objs []objectInfo) []int { sz += uint64(len(objs[i].data)) doneMem = append(doneMem, i) storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT")) + + // update cache size + cacheSz = updCacheSz + c.objCounters.IncDB() } return nil }) @@ -88,17 +102,55 @@ func (c *cache) persistToCache(objs []objectInfo) []int { var failDisk []int - for _, i := range failMem { - if uint64(len(objs[i].data)) > c.maxObjectSize { - failDisk = append(failDisk, i) + cacheSz := c.estimateCacheSize() + + for _, objInd := range failMem { + var ( + updCacheSz uint64 + overflowInd = -1 + ) + + if objInd < 0 { + // actually, since the overflow was detected in DB tx, the required space could well have been freed, + // but it is easier to consider the entire method atomic + overflowInd = -objInd - 1 // subtract 1 since we decremented index above + } else { + // check if object will overflow write-cache size limit + if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize { + overflowInd = objInd + } + } + + if overflowInd >= 0 { + loop: + for j := range objs[overflowInd:] { + // exclude objects which are already stored in DB + for _, doneMemInd := range doneMem { + if j == doneMemInd { + continue loop + } + } + + failDisk = append(failDisk, j) + } + + break + } + + if uint64(len(objs[objInd].data)) > c.maxObjectSize { + failDisk = append(failDisk, objInd) continue } - err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data) + err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data) if err != nil { - failDisk = append(failDisk, i) + failDisk = append(failDisk, objInd) } else { - storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("fstree PUT")) + storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT")) + + // update cache size + cacheSz = updCacheSz + c.objCounters.IncFS() } } diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go new file mode 100644 index 000000000..d51d1f937 --- /dev/null +++ b/pkg/local_object_storage/writecache/state.go @@ -0,0 +1,103 @@ +package writecache + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "go.etcd.io/bbolt" + "go.uber.org/atomic" +) + +// ObjectCounters is an interface of the storage of cached object amount. +type ObjectCounters interface { + // Increments number of objects saved in DB. + IncDB() + // Decrements number of objects saved in DB. + DecDB() + // Returns number of objects saved in DB. + DB() uint64 + + // Increments number of objects saved in FSTree. + IncFS() + // Decrements number of objects saved in FSTree. + DecFS() + // Returns number of objects saved in FSTree. + FS() uint64 + + // Reads number of objects saved in write-cache. It is called on write-cache initialization step. + Read() error + // Flushes the values and closes the storage. It is called on write-cache shutdown. + FlushAndClose() +} + +func (c *cache) estimateCacheSize() uint64 { + return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize +} + +func (c *cache) incSizeDB(sz uint64) uint64 { + return sz + c.smallObjectSize +} + +func (c *cache) incSizeFS(sz uint64) uint64 { + return sz + c.maxObjectSize +} + +type counters struct { + cDB, cFS atomic.Uint64 + + db *bbolt.DB + + fs *fstree.FSTree +} + +func (x *counters) IncDB() { + x.cDB.Inc() +} + +func (x *counters) DecDB() { + x.cDB.Dec() +} + +func (x *counters) DB() uint64 { + return x.cDB.Load() +} + +func (x *counters) IncFS() { + x.cFS.Inc() +} + +func (x *counters) DecFS() { + x.cFS.Dec() +} + +func (x *counters) FS() uint64 { + return x.cFS.Load() +} + +func (x *counters) Read() error { + var inDB uint64 + + err := x.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + if b != nil { + inDB = uint64(b.Stats().KeyN) + } + + return nil + }) + if err != nil { + return fmt.Errorf("could not read write-cache DB counter: %w", err) + } + + x.cDB.Store(inDB) + + // FIXME: calculate the actual value in FSTree (new method?). + // For now we can think that db/fs = 50/50. + x.cFS.Store(inDB) + + return nil +} + +func (x *counters) FlushAndClose() { + // values aren't stored +} diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 17e4075aa..e82e1fb41 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -118,6 +118,7 @@ func (c *cache) deleteFromDB(keys [][]byte) error { return err } c.dbSize.Sub(sz) + c.objCounters.DecDB() return nil } @@ -139,6 +140,7 @@ func (c *cache) deleteFromDisk(keys [][]byte) error { continue } else if err == nil { storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("fstree DELETE")) + c.objCounters.DecFS() } } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 593437ba3..1c4afa06e 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -60,6 +60,7 @@ const ( maxInMemorySizeBytes = 1024 * 1024 * 1024 // 1 GiB maxObjectSize = 64 * 1024 * 1024 // 64 MiB smallObjectSize = 32 * 1024 // 32 KiB + maxCacheSizeBytes = 1 << 30 // 1 GiB ) var ( @@ -81,6 +82,7 @@ func New(opts ...Option) Cache { maxObjectSize: maxObjectSize, smallObjectSize: smallObjectSize, workersCount: flushWorkersCount, + maxCacheSize: maxCacheSizeBytes, }, } @@ -91,9 +93,21 @@ func New(opts ...Option) Cache { return c } -// Open opens and initializes database. +// Open opens and initializes database. Reads object counters from the ObjectCounters instance. func (c *cache) Open() error { - return c.openStore() + err := c.openStore() + if err != nil { + return err + } + + if c.objCounters == nil { + c.objCounters = &counters{ + db: c.db, + fs: c.fsTree, + } + } + + return c.objCounters.Read() } // Init runs necessary services. @@ -103,8 +117,9 @@ func (c *cache) Init() error { return nil } -// Close closes db connection and stops services. +// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *cache) Close() error { close(c.closeCh) + c.objCounters.FlushAndClose() return c.db.Close() }