diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index 99f00d0fc..838cf7f28 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -45,7 +45,6 @@ func (c *cache) Delete(addr *objectSDK.Address) error { if err != nil { return err } - c.dbSize.Sub(uint64(has)) storagelog.Write(c.log, storagelog.AddressField(saddr), storagelog.OpField("db DELETE")) c.objCounters.DecDB() return nil diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index afb7ff097..b810c7744 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -26,7 +26,7 @@ func (c *cache) persistLoop() { sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr }) start := time.Now() - c.persistObjects(m) + c.persistSmallObjects(m) c.log.Debug("persisted items to disk", zap.Duration("took", time.Since(start)), zap.Int("total", len(m))) @@ -52,130 +52,73 @@ func (c *cache) persistLoop() { } } -func (c *cache) persistToCache(objs []objectInfo) []int { - var ( - failMem []int // some index is negative => all objects starting from it will overflow the cache - doneMem []int - ) - var sz uint64 +// persistSmallObjects persists small objects to the write-cache database and +// pushes the to the flush workers queue. +func (c *cache) persistSmallObjects(objs []objectInfo) { + cacheSize := c.estimateCacheSize() + overflowIndex := len(objs) + for i := range objs { + newSize := c.incSizeDB(cacheSize) + if c.maxCacheSize < newSize { + overflowIndex = i + break + } + cacheSize = newSize + } + err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - cacheSz := c.estimateCacheSize() - for i := range objs { - if uint64(len(objs[i].data)) >= c.smallObjectSize { - failMem = append(failMem, i) - continue - } - - // check if object will overflow write-cache size limit - updCacheSz := c.incSizeDB(cacheSz) - if updCacheSz > c.maxCacheSize { - // set negative index. We decrement index to cover 0 val (overflow is practically impossible) - failMem = append(failMem, -i-1) - - return nil - } - + for i := 0; i < overflowIndex; i++ { err := b.Put([]byte(objs[i].addr), objs[i].data) if err != nil { return err } - sz += uint64(len(objs[i].data)) - doneMem = append(doneMem, i) - storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT")) - - // update cache size - cacheSz = updCacheSz - c.objCounters.IncDB() } return nil }) - if err == nil { - c.dbSize.Add(sz) - } - if len(doneMem) > 0 { - c.evictObjects(len(doneMem)) - for _, i := range doneMem { - c.flushed.Add(objs[i].addr, true) - } + if err != nil { + overflowIndex = 0 + } else { + c.evictObjects(overflowIndex) } - var failDisk []int - - cacheSz := c.estimateCacheSize() - - for _, objInd := range failMem { - var ( - updCacheSz uint64 - overflowInd = -1 - ) - - if objInd < 0 { - // actually, since the overflow was detected in DB tx, the required space could well have been freed, - // but it is easier to consider the entire method atomic - overflowInd = -objInd - 1 // subtract 1 since we decremented index above - } else { - // check if object will overflow write-cache size limit - if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize { - overflowInd = objInd - } - } - - if overflowInd >= 0 { - loop: - for j := range objs[overflowInd:] { - // exclude objects which are already stored in DB - for _, doneMemInd := range doneMem { - if j == doneMemInd { - continue loop - } - } - - failDisk = append(failDisk, j) - } - - break - } - - if uint64(len(objs[objInd].data)) > c.maxObjectSize { - failDisk = append(failDisk, objInd) - continue - } - - err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data) - if err != nil { - failDisk = append(failDisk, objInd) - } else { - storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT")) - - // update cache size - cacheSz = updCacheSz - c.objCounters.IncFS() - } + for i := 0; i < overflowIndex; i++ { + storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT")) + c.objCounters.IncDB() + c.flushed.Add(objs[i].addr, true) } - return failDisk + c.addToFlushQueue(objs, overflowIndex) } -// persistObjects tries to write objects from memory to the persistent storage. -// If tryCache is false, writing skips cache and is done directly to the main storage. -func (c *cache) persistObjects(objs []objectInfo) { - toDisk := c.persistToCache(objs) - j := 0 - - for i := range objs { - ch := c.metaCh - if j < len(toDisk) { - if i == toDisk[j] { - ch = c.directCh - } else { - for ; j < len(toDisk) && i > toDisk[j]; j++ { - } - } +// persistBigObject writes object to FSTree and pushes it to the flush workers queue. +func (c *cache) persistBigObject(objInfo objectInfo) { + cacheSz := c.estimateCacheSize() + metaIndex := 0 + if c.incSizeFS(cacheSz) <= c.maxCacheSize { + err := c.fsTree.Put(objInfo.obj.Address(), objInfo.data) + if err == nil { + metaIndex = 1 + c.objCounters.IncFS() + storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT")) } + } + c.addToFlushQueue([]objectInfo{objInfo}, metaIndex) +} +// addToFlushQueue pushes objects to the flush workers queue. +// For objects below metaIndex only meta information will be flushed. +func (c *cache) addToFlushQueue(objs []objectInfo, metaIndex int) { + for i := 0; i < metaIndex; i++ { select { - case ch <- objs[j].obj: + case c.metaCh <- objs[i].obj: + case <-c.closeCh: + return + } + } + for i := metaIndex; i < len(objs); i++ { + select { + case c.directCh <- objs[i].obj: case <-c.closeCh: return } diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 588cdd20c..cfe0b0e7d 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -30,7 +30,7 @@ func (c *cache) Put(o *object.Object) error { c.mtx.Lock() - if sz < c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize { + if sz <= c.smallObjectSize && c.curMemSize+sz <= c.maxMemSize { c.curMemSize += sz c.mem = append(c.mem, oi) @@ -43,6 +43,10 @@ func (c *cache) Put(o *object.Object) error { c.mtx.Unlock() - c.persistObjects([]objectInfo{oi}) + if sz <= c.smallObjectSize { + c.persistSmallObjects([]objectInfo{oi}) + } else { + c.persistBigObject(oi) + } return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 80842317b..51d3fea21 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -117,7 +117,6 @@ func (c *cache) deleteFromDB(keys [][]byte) error { if err != nil { return err } - c.dbSize.Sub(sz) c.objCounters.DecDB() return nil } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 1c4afa06e..03f873c43 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -6,7 +6,6 @@ import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -44,8 +43,6 @@ type cache struct { evictCh chan []byte // store contains underlying database. store - // dbSize stores approximate database size. It is updated every flush/persist cycle. - dbSize atomic.Uint64 // fsTree contains big files stored directly on file-system. fsTree *fstree.FSTree }