frostfs-node/pkg/local_object_storage/writecache/persist.go
Leonard Lyubich a1696a81b6 [#776] writecache: Limit size of used disk space
There is a need to limit disk space used by write-cache. It is almost
impossible to calculate the value exactly. It is proposed to estimate the
size of the cache by the number of objects stored in it.

Track amounts of objects saved in DB and FSTree separately. To do this,
`ObjectCounters` interface is defined. It is generalized to a store of
numbers that can be made persistent (new option `WithObjectCounters`). By
default DB number is calculated as key number in default bucket, and FS
number is set same to DB since it is currently hard to read the actual value
from `FSTree` instance. Each PUT/DELETE operation to DB or FS
increases/decreases corresponding counter. Before each PUT op an overflow
check is performed with the following formula for evaluating the occupied
space: `NumDB * MaxDBSize + NumFS * MaxFSSize`. If next PUT can cause
write-cache overflow, object is written to the main storage.

By default maximum write-cache size is set to 1GB.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-09-15 18:07:36 +03:00

183 lines
4.1 KiB
Go

package writecache
import (
"sort"
"time"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
const defaultPersistInterval = time.Second
// persistLoop persists object accumulated in memory to the database.
func (c *cache) persistLoop() {
tick := time.NewTicker(defaultPersistInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
c.mtx.RLock()
m := c.mem
c.mtx.RUnlock()
sort.Slice(m, func(i, j int) bool { return m[i].addr < m[j].addr })
start := time.Now()
c.persistObjects(m)
c.log.Debug("persisted items to disk",
zap.Duration("took", time.Since(start)),
zap.Int("total", len(m)))
for i := range m {
storagelog.Write(c.log,
storagelog.AddressField(m[i].addr),
storagelog.OpField("in-mem DELETE persist"),
)
}
c.mtx.Lock()
c.curMemSize = 0
n := copy(c.mem, c.mem[len(m):])
c.mem = c.mem[:n]
for i := range c.mem {
c.curMemSize += uint64(len(c.mem[i].data))
}
c.mtx.Unlock()
case <-c.closeCh:
return
}
}
}
func (c *cache) persistToCache(objs []objectInfo) []int {
var (
failMem []int // some index is negative => all objects starting from it will overflow the cache
doneMem []int
)
var sz uint64
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cacheSz := c.estimateCacheSize()
for i := range objs {
if uint64(len(objs[i].data)) >= c.smallObjectSize {
failMem = append(failMem, i)
continue
}
// check if object will overflow write-cache size limit
updCacheSz := c.incSizeDB(cacheSz)
if updCacheSz > c.maxCacheSize {
// set negative index. We decrement index to cover 0 val (overflow is practically impossible)
failMem = append(failMem, -i-1)
return nil
}
err := b.Put([]byte(objs[i].addr), objs[i].data)
if err != nil {
return err
}
sz += uint64(len(objs[i].data))
doneMem = append(doneMem, i)
storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT"))
// update cache size
cacheSz = updCacheSz
c.objCounters.IncDB()
}
return nil
})
if err == nil {
c.dbSize.Add(sz)
}
if len(doneMem) > 0 {
c.evictObjects(len(doneMem))
for _, i := range doneMem {
c.flushed.Add(objs[i].addr, true)
}
}
var failDisk []int
cacheSz := c.estimateCacheSize()
for _, objInd := range failMem {
var (
updCacheSz uint64
overflowInd = -1
)
if objInd < 0 {
// actually, since the overflow was detected in DB tx, the required space could well have been freed,
// but it is easier to consider the entire method atomic
overflowInd = -objInd - 1 // subtract 1 since we decremented index above
} else {
// check if object will overflow write-cache size limit
if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize {
overflowInd = objInd
}
}
if overflowInd >= 0 {
loop:
for j := range objs[overflowInd:] {
// exclude objects which are already stored in DB
for _, doneMemInd := range doneMem {
if j == doneMemInd {
continue loop
}
}
failDisk = append(failDisk, j)
}
break
}
if uint64(len(objs[objInd].data)) > c.maxObjectSize {
failDisk = append(failDisk, objInd)
continue
}
err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data)
if err != nil {
failDisk = append(failDisk, objInd)
} else {
storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT"))
// update cache size
cacheSz = updCacheSz
c.objCounters.IncFS()
}
}
return failDisk
}
// persistObjects tries to write objects from memory to the persistent storage.
// If tryCache is false, writing skips cache and is done directly to the main storage.
func (c *cache) persistObjects(objs []objectInfo) {
toDisk := c.persistToCache(objs)
j := 0
for i := range objs {
ch := c.metaCh
if j < len(toDisk) {
if i == toDisk[j] {
ch = c.directCh
} else {
for ; j < len(toDisk) && i > toDisk[j]; j++ {
}
}
}
select {
case ch <- objs[j].obj:
case <-c.closeCh:
return
}
}
}