forked from TrueCloudLab/frostfs-node
[#776] writecache: Limit size of used disk space
There is a need to limit the disk space used by the write-cache. It is almost impossible to calculate this value exactly, so it is proposed to estimate the size of the cache by the number of objects stored in it. The numbers of objects saved in the DB and in the FSTree are tracked separately. To do this, an `ObjectCounters` interface is defined. It is generalized as a store of numbers that can be made persistent (new option `WithObjectCounters`). By default, the DB number is calculated as the number of keys in the default bucket, and the FS number is set equal to the DB number, since it is currently hard to read the actual value from the `FSTree` instance. Each PUT/DELETE operation on the DB or FS increments/decrements the corresponding counter. Before each PUT operation, an overflow check is performed using the following formula to evaluate the occupied space: `NumDB * MaxDBSize + NumFS * MaxFSSize`. If the next PUT could cause a write-cache overflow, the object is written to the main storage instead. By default, the maximum write-cache size is set to 1GB. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
0a130177d6
commit
a1696a81b6
6 changed files with 203 additions and 10 deletions
|
@ -54,18 +54,28 @@ func (c *cache) persistLoop() {
|
|||
|
||||
func (c *cache) persistToCache(objs []objectInfo) []int {
|
||||
var (
|
||||
failMem []int
|
||||
failMem []int // some index is negative => all objects starting from it will overflow the cache
|
||||
doneMem []int
|
||||
)
|
||||
var sz uint64
|
||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cacheSz := c.estimateCacheSize()
|
||||
for i := range objs {
|
||||
if uint64(len(objs[i].data)) >= c.smallObjectSize {
|
||||
failMem = append(failMem, i)
|
||||
continue
|
||||
}
|
||||
|
||||
// check if object will overflow write-cache size limit
|
||||
updCacheSz := c.incSizeDB(cacheSz)
|
||||
if updCacheSz > c.maxCacheSize {
|
||||
// set negative index. We decrement index to cover 0 val (overflow is practically impossible)
|
||||
failMem = append(failMem, -i-1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := b.Put([]byte(objs[i].addr), objs[i].data)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -73,6 +83,10 @@ func (c *cache) persistToCache(objs []objectInfo) []int {
|
|||
sz += uint64(len(objs[i].data))
|
||||
doneMem = append(doneMem, i)
|
||||
storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("db PUT"))
|
||||
|
||||
// update cache size
|
||||
cacheSz = updCacheSz
|
||||
c.objCounters.IncDB()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
@ -88,17 +102,55 @@ func (c *cache) persistToCache(objs []objectInfo) []int {
|
|||
|
||||
var failDisk []int
|
||||
|
||||
for _, i := range failMem {
|
||||
if uint64(len(objs[i].data)) > c.maxObjectSize {
|
||||
failDisk = append(failDisk, i)
|
||||
cacheSz := c.estimateCacheSize()
|
||||
|
||||
for _, objInd := range failMem {
|
||||
var (
|
||||
updCacheSz uint64
|
||||
overflowInd = -1
|
||||
)
|
||||
|
||||
if objInd < 0 {
|
||||
// actually, since the overflow was detected in DB tx, the required space could well have been freed,
|
||||
// but it is easier to consider the entire method atomic
|
||||
overflowInd = -objInd - 1 // subtract 1 since we decremented index above
|
||||
} else {
|
||||
// check if object will overflow write-cache size limit
|
||||
if updCacheSz = c.incSizeFS(cacheSz); updCacheSz > c.maxCacheSize {
|
||||
overflowInd = objInd
|
||||
}
|
||||
}
|
||||
|
||||
if overflowInd >= 0 {
|
||||
loop:
|
||||
for j := range objs[overflowInd:] {
|
||||
// exclude objects which are already stored in DB
|
||||
for _, doneMemInd := range doneMem {
|
||||
if j == doneMemInd {
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
|
||||
failDisk = append(failDisk, j)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
if uint64(len(objs[objInd].data)) > c.maxObjectSize {
|
||||
failDisk = append(failDisk, objInd)
|
||||
continue
|
||||
}
|
||||
|
||||
err := c.fsTree.Put(objs[i].obj.Address(), objs[i].data)
|
||||
err := c.fsTree.Put(objs[objInd].obj.Address(), objs[objInd].data)
|
||||
if err != nil {
|
||||
failDisk = append(failDisk, i)
|
||||
failDisk = append(failDisk, objInd)
|
||||
} else {
|
||||
storagelog.Write(c.log, storagelog.AddressField(objs[i].addr), storagelog.OpField("fstree PUT"))
|
||||
storagelog.Write(c.log, storagelog.AddressField(objs[objInd].addr), storagelog.OpField("fstree PUT"))
|
||||
|
||||
// update cache size
|
||||
cacheSz = updCacheSz
|
||||
c.objCounters.IncFS()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue