a1696a81b6
There is a need to limit disk space used by write-cache. It is almost impossible to calculate the value exactly. It is proposed to estimate the size of the cache by the number of objects stored in it. Track amounts of objects saved in DB and FSTree separately. To do this, `ObjectCounters` interface is defined. It is generalized to a store of numbers that can be made persistent (new option `WithObjectCounters`). By default DB number is calculated as key number in default bucket, and FS number is set same to DB since it is currently hard to read the actual value from `FSTree` instance. Each PUT/DELETE operation to DB or FS increases/decreases corresponding counter. Before each PUT op an overflow check is performed with the following formula for evaluating the occupied space: `NumDB * MaxDBSize + NumFS * MaxFSSize`. If next PUT can cause write-cache overflow, object is written to the main storage. By default maximum write-cache size is set to 1GB. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
148 lines
3.3 KiB
Go
148 lines
3.3 KiB
Go
package writecache
|
|
|
|
import (
|
|
"errors"
|
|
"os"
|
|
"path"
|
|
|
|
lru "github.com/hashicorp/golang-lru"
|
|
"github.com/hashicorp/golang-lru/simplelru"
|
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
|
"go.etcd.io/bbolt"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// store represents persistent storage with in-memory LRU cache
|
|
// for flushed items on top of it.
|
|
type store struct {
|
|
flushed simplelru.LRUCache
|
|
db *bbolt.DB
|
|
}
|
|
|
|
const lruKeysCount = 256 * 1024 * 8
|
|
|
|
const dbName = "small.bolt"
|
|
|
|
func (c *cache) openStore() error {
|
|
if err := util.MkdirAllX(c.path, os.ModePerm); err != nil {
|
|
return err
|
|
}
|
|
|
|
db, err := bbolt.Open(path.Join(c.path, dbName), os.ModePerm, &bbolt.Options{
|
|
NoFreelistSync: true,
|
|
NoSync: true,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.fsTree = &fstree.FSTree{
|
|
Info: fstree.Info{
|
|
Permissions: os.ModePerm,
|
|
RootPath: c.path,
|
|
},
|
|
Depth: 1,
|
|
DirNameLen: 1,
|
|
}
|
|
|
|
_ = db.Update(func(tx *bbolt.Tx) error {
|
|
_, err := tx.CreateBucketIfNotExists(defaultBucket)
|
|
return err
|
|
})
|
|
|
|
c.db = db
|
|
c.flushed, _ = lru.New(lruKeysCount)
|
|
return nil
|
|
}
|
|
|
|
func (s *store) removeFlushedKeys(n int) ([][]byte, [][]byte) {
|
|
var keysMem, keysDisk [][]byte
|
|
for i := 0; i < n; i++ {
|
|
k, v, ok := s.flushed.RemoveOldest()
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
if v.(bool) {
|
|
keysMem = append(keysMem, []byte(k.(string)))
|
|
} else {
|
|
keysDisk = append(keysDisk, []byte(k.(string)))
|
|
}
|
|
}
|
|
|
|
return keysMem, keysDisk
|
|
}
|
|
|
|
func (c *cache) evictObjects(putCount int) {
|
|
sum := c.flushed.Len() + putCount
|
|
if sum <= lruKeysCount {
|
|
return
|
|
}
|
|
|
|
keysMem, keysDisk := c.store.removeFlushedKeys(sum - lruKeysCount)
|
|
|
|
if err := c.deleteFromDB(keysMem); err != nil {
|
|
c.log.Error("error while removing objects from write-cache (database)", zap.Error(err))
|
|
}
|
|
|
|
if err := c.deleteFromDisk(keysDisk); err != nil {
|
|
c.log.Error("error while removing objects from write-cache (disk)", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (c *cache) deleteFromDB(keys [][]byte) error {
|
|
if len(keys) == 0 {
|
|
return nil
|
|
}
|
|
var sz uint64
|
|
err := c.db.Update(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket(defaultBucket)
|
|
for i := range keys {
|
|
has := b.Get(keys[i])
|
|
if has == nil {
|
|
return object.ErrNotFound
|
|
}
|
|
if err := b.Delete(keys[i]); err != nil {
|
|
return err
|
|
}
|
|
sz += uint64(len(has))
|
|
storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("db DELETE"))
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.dbSize.Sub(sz)
|
|
c.objCounters.DecDB()
|
|
return nil
|
|
}
|
|
|
|
func (c *cache) deleteFromDisk(keys [][]byte) error {
|
|
var lastErr error
|
|
|
|
for i := range keys {
|
|
addr := objectSDK.NewAddress()
|
|
addrStr := string(keys[i])
|
|
|
|
if err := addr.Parse(addrStr); err != nil {
|
|
c.log.Error("can't parse address", zap.String("address", addrStr))
|
|
continue
|
|
}
|
|
|
|
if err := c.fsTree.Delete(addr); err != nil && !errors.Is(err, fstree.ErrFileNotFound) {
|
|
lastErr = err
|
|
c.log.Error("can't remove object from write-cache", zap.Error(err))
|
|
continue
|
|
} else if err == nil {
|
|
storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("fstree DELETE"))
|
|
c.objCounters.DecFS()
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|