forked from TrueCloudLab/frostfs-node
20abdaeed4
Set the flush mark inside the flush worker, because writing to the blobstor can fail. Because each evicted object must be deleted, it is reasonable to do this in the evict callback. The evict callback is protected by the LRU mutex and thus potentially interferes with the `Get` and `Iterate` methods. This problem will be addressed in the future. Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
152 lines
3.8 KiB
Go
152 lines
3.8 KiB
Go
package writecache
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
|
|
lru "github.com/hashicorp/golang-lru"
|
|
"github.com/hashicorp/golang-lru/simplelru"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// store represents persistent storage with in-memory LRU cache
|
|
// for flushed items on top of it.
|
|
type store struct {
|
|
flushed simplelru.LRUCache
|
|
db *bbolt.DB
|
|
|
|
dbKeysToRemove []string
|
|
fsKeysToRemove []string
|
|
}
|
|
|
|
const (
	// maxFlushedMarksCount is the capacity of the LRU cache of flush marks
	// (2 Mi entries).
	maxFlushedMarksCount = 2 << 20
	// maxRemoveBatchSize is the number of queued evicted keys that triggers
	// a removal batch: a quarter of maxFlushedMarksCount.
	maxRemoveBatchSize = maxFlushedMarksCount >> 2
)
|
|
|
|
// dbName is the file name of the write-cache bolt database
// (presumably resolved against the cache path by OpenDB — confirm).
const (
	dbName = "small.bolt"
)
|
|
|
|
func (c *cache) openStore(readOnly bool) error {
|
|
err := util.MkdirAllX(c.path, os.ModePerm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.db, err = OpenDB(c.path, readOnly)
|
|
if err != nil {
|
|
return fmt.Errorf("could not open database: %w", err)
|
|
}
|
|
|
|
c.db.MaxBatchSize = c.maxBatchSize
|
|
c.db.MaxBatchDelay = c.maxBatchDelay
|
|
|
|
if !readOnly {
|
|
err = c.db.Update(func(tx *bbolt.Tx) error {
|
|
_, err := tx.CreateBucketIfNotExists(defaultBucket)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not create default bucket: %w", err)
|
|
}
|
|
}
|
|
|
|
c.fsTree = &fstree.FSTree{
|
|
Info: fstree.Info{
|
|
Permissions: os.ModePerm,
|
|
RootPath: c.path,
|
|
},
|
|
Depth: 1,
|
|
DirNameLen: 1,
|
|
}
|
|
|
|
// Write-cache can be opened multiple times during `SetMode`.
|
|
// flushed map must not be re-created in this case.
|
|
if c.flushed == nil {
|
|
c.flushed, _ = lru.NewWithEvict(maxFlushedMarksCount, c.removeFlushed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// removeFlushed removes an object from the writecache.
|
|
// To minimize interference with the client operations, the actual removal
|
|
// is done in batches.
|
|
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
|
func (c *cache) removeFlushed(key, value interface{}) {
|
|
fromDatabase := value.(bool)
|
|
if fromDatabase {
|
|
c.dbKeysToRemove = append(c.dbKeysToRemove, key.(string))
|
|
} else {
|
|
c.fsKeysToRemove = append(c.fsKeysToRemove, key.(string))
|
|
}
|
|
|
|
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= maxRemoveBatchSize {
|
|
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
|
|
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
|
|
}
|
|
}
|
|
|
|
func (c *cache) deleteFromDB(keys []string) []string {
|
|
if len(keys) == 0 {
|
|
return keys
|
|
}
|
|
|
|
var errorIndex int
|
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket(defaultBucket)
|
|
for errorIndex = range keys {
|
|
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
for i := 0; i < errorIndex; i++ {
|
|
c.objCounters.DecDB()
|
|
storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("db DELETE"))
|
|
}
|
|
if err != nil {
|
|
c.log.Error("can't remove objects from the database", zap.Error(err))
|
|
}
|
|
|
|
copy(keys, keys[errorIndex:])
|
|
return keys[:len(keys)-errorIndex]
|
|
}
|
|
|
|
func (c *cache) deleteFromDisk(keys []string) []string {
|
|
if len(keys) == 0 {
|
|
return keys
|
|
}
|
|
|
|
var copyIndex int
|
|
var addr oid.Address
|
|
|
|
for i := range keys {
|
|
if err := addr.DecodeString(keys[i]); err != nil {
|
|
c.log.Error("can't parse address", zap.String("address", keys[i]))
|
|
continue
|
|
}
|
|
|
|
_, err := c.fsTree.Delete(common.DeletePrm{Address: addr})
|
|
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
|
|
c.log.Error("can't remove object from write-cache", zap.Error(err))
|
|
|
|
// Save the key for the next iteration.
|
|
keys[copyIndex] = keys[i]
|
|
copyIndex++
|
|
continue
|
|
} else if err == nil {
|
|
storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("fstree DELETE"))
|
|
c.objCounters.DecFS()
|
|
}
|
|
}
|
|
|
|
return keys[:copyIndex]
|
|
}
|