package writecache

import (
	"sort"
	"time"

	storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)

// defaultPersistInterval is how often objects accumulated in memory are
// flushed to the persistent write-cache storage.
const defaultPersistInterval = time.Second

// persistLoop persists objects accumulated in memory to the database.
// It runs until closeCh is closed.
func (c *cache) persistLoop() {
	tick := time.NewTicker(defaultPersistInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			// Snapshot the current in-memory batch. Only the slice header is
			// copied; the backing array is still shared with c.mem.
			c.mtx.RLock()
			m := c.mem
			c.mtx.RUnlock()

			// Sort the batch by address before persisting.
			// NOTE(review): sort.Slice mutates the backing array shared with
			// c.mem while holding no write lock — confirm that concurrent
			// writers only append and never touch m's element range.
			sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })

			start := time.Now()
			c.persistObjects(m)
			c.log.Debug("persisted items to disk",
				zap.Duration("took", time.Since(start)),
				zap.Int("total", len(m)))

			for i := range m {
				storagelog.Write(c.log,
					storagelog.AddressField(m[i].addr),
					storagelog.OpField("in-mem DELETE persist"),
				)
			}

			// Drop the persisted prefix and recompute the memory footprint of
			// whatever was appended to c.mem while we were flushing.
			c.mtx.Lock()
			c.curMemSize = 0
			n := copy(c.mem, c.mem[len(m):])
			c.mem = c.mem[:n]
			for i := range c.mem {
				c.curMemSize += uint64(len(c.mem[i].data))
			}
			c.mtx.Unlock()
		case <-c.closeCh:
			return
		}
	}
}

// persistToCache tries to store objs in the write-cache: small objects go to
// the DB, larger ones to the FS tree. It returns the indices of objects that
// could not be cached and must be written directly to the main storage.
func (c *cache) persistToCache(objs []objectInfo) []int {
	var (
		failMem []int // some index is negative => all objects starting from it will overflow the cache
		doneMem []int
	)
	var sz uint64
	err := c.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		cacheSz := c.estimateCacheSize()
		for i := range objs {
			if uint64(len(objs[i].data)) >= c.smallObjectSize {
				// Too big for the DB; handled by the FS branch below.
				failMem = append(failMem, i)
				continue
			}

			// check if object will overflow write-cache size limit
			updCacheSz := c.incSizeDB(cacheSz)
			if updCacheSz > c.maxCacheSize {
				// set negative index. We decrement index to cover 0 val
				// (overflow is practically impossible)
				failMem = append(failMem, -i-1)
				return nil
			}

			err := b.Put([]byte(objs[i].addr), objs[i].data)
			if err != nil {
				return err
			}
			sz += uint64(len(objs[i].data))
			doneMem = append(doneMem, i)
			storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT"))

			// update cache size
			cacheSz = updCacheSz
			c.objCounters.IncDB()
		}
		return nil
	})
	if err == nil {
		c.dbSize.Add(sz)
	}

	if len(doneMem) > 0 {
		c.evictObjects(len(doneMem))
		for _, i := range doneMem {
			c.flushed.Add(objs[i].addr, true)
		}
	}

	var failDisk []int

	cacheSz := c.estimateCacheSize()

	for _, objInd := range failMem {
		var (
			updCacheSz  uint64
			overflowInd = -1
		)

		if objInd < 0 {
			// actually, since the overflow was detected in DB tx, the required
			// space could well have been freed, but it is easier to consider
			// the entire method atomic
			overflowInd = -objInd - 1 // subtract 1 since we decremented index above
		} else {
			// check if object will overflow write-cache size limit
			if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize {
				overflowInd = objInd
			}
		}

		if overflowInd >= 0 {
			// All objects from overflowInd onwards do not fit in the cache:
			// route them directly to the main storage, except those already
			// stored in the DB.
			// FIX: iterate absolute indices [overflowInd, len(objs));
			// previously `for j := range objs[overflowInd:]` produced offsets
			// starting at 0, so wrong indices were compared with doneMem and
			// appended to failDisk.
		loop:
			for j := overflowInd; j < len(objs); j++ {
				// exclude objects which are already stored in DB
				for _, doneMemInd := range doneMem {
					if j == doneMemInd {
						continue loop
					}
				}
				failDisk = append(failDisk, j)
			}
			break
		}

		if uint64(len(objs[objInd].data)) > c.maxObjectSize {
			// Exceeds the per-object limit for the FS tree as well.
			failDisk = append(failDisk, objInd)
			continue
		}

		err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data)
		if err != nil {
			failDisk = append(failDisk, objInd)
		} else {
			storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT"))

			// update cache size
			cacheSz = updCacheSz
			c.objCounters.IncFS()
		}
	}

	return failDisk
}

// persistObjects tries to write objects from memory to the persistent storage.
// Objects that do not fit into the write-cache are sent directly to the main
// storage.
func (c *cache) persistObjects(objs []objectInfo) { toDisk := c.persistToCache(objs) j := 0 for i := range objs { ch := c.metaCh if j < len(toDisk) { if i == toDisk[j] { ch = c.directCh } else { for ; j < len(toDisk) && i > toDisk[j]; j++ { } } } select { case ch <- objs[j].obj: case <-c.closeCh: return } } }