[#1745] writecache: Set flush mark only on success

Set flush mark in the inside the flush worker because writing to the blobstor
can fail. Because each evicted object must be deleted, it is reasonable
to do this in the evict callback.

The evict callback is protected by LRU mutex and thus potentially interferes
with `Get` and `Iterate` methods. This problem will be addressed in the
future.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-08-31 12:20:43 +03:00 committed by fyrchik
parent 82839cb1c9
commit 20abdaeed4
2 changed files with 54 additions and 66 deletions

View file

@ -5,6 +5,7 @@ import (
"github.com/mr-tron/base58"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-sdk-go/object"
@ -110,13 +111,9 @@ func (c *cache) flush() {
break
}
c.evictObjects(len(m))
for i := range m {
c.flushed.Add(m[i].addr, true)
}
c.modeMtx.RUnlock()
c.log.Debug("flushed items from write-cache",
c.log.Debug("tried to flush items from write-cache",
zap.Int("count", len(m)),
zap.String("start", base58.Encode(lastKey)))
}
@ -172,7 +169,7 @@ func (c *cache) flushBigObjects() {
}
// mark object as flushed
c.store.flushed.Add(sAddr, false)
c.flushed.Add(sAddr, false)
evictNum++
@ -181,8 +178,6 @@ func (c *cache) flushBigObjects() {
_, _ = c.fsTree.Iterate(prm)
// evict objects which were successfully written to BlobStor
c.evictObjects(evictNum)
c.modeMtx.RUnlock()
case <-c.closeCh:
return
@ -208,6 +203,8 @@ func (c *cache) flushWorker(_ int) {
err := c.flushObject(obj)
if err != nil {
c.log.Error("can't flush object to the main storage", zap.Error(err))
} else {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
}
}
}

View file

@ -22,9 +22,15 @@ import (
type store struct {
flushed simplelru.LRUCache
db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
}
const lruKeysCount = 256 * 1024 * 8
const (
maxFlushedMarksCount = 256 * 1024 * 8
maxRemoveBatchSize = maxFlushedMarksCount / 4
)
const dbName = "small.bolt"
@ -64,98 +70,83 @@ func (c *cache) openStore(readOnly bool) error {
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.New(lruKeysCount)
c.flushed, _ = lru.NewWithEvict(maxFlushedMarksCount, c.removeFlushed)
}
return nil
}
func (s *store) removeFlushedKeys(n int) ([][]byte, [][]byte) {
var keysMem, keysDisk [][]byte
for i := 0; i < n; i++ {
k, v, ok := s.flushed.RemoveOldest()
if !ok {
break
}
if v.(bool) {
keysMem = append(keysMem, []byte(k.(string)))
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key, value interface{}) {
fromDatabase := value.(bool)
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key.(string))
} else {
keysDisk = append(keysDisk, []byte(k.(string)))
c.fsKeysToRemove = append(c.fsKeysToRemove, key.(string))
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
return keysMem, keysDisk
}
func (c *cache) evictObjects(putCount int) {
sum := c.flushed.Len() + putCount
if sum <= lruKeysCount {
return
}
keysMem, keysDisk := c.store.removeFlushedKeys(sum - lruKeysCount)
if err := c.deleteFromDB(keysMem); err != nil {
c.log.Error("error while removing objects from write-cache (database)", zap.Error(err))
}
if err := c.deleteFromDisk(keysDisk); err != nil {
c.log.Error("error while removing objects from write-cache (disk)", zap.Error(err))
}
}
func (c *cache) deleteFromDB(keys [][]byte) error {
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return nil
return keys
}
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for i := range keys {
has := b.Get(keys[i])
if has == nil {
var errNotFound apistatus.ObjectNotFound
return errNotFound
}
if err := b.Delete(keys[i]); err != nil {
var errorIndex int
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for errorIndex = range keys {
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
return err
}
storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("db DELETE"))
}
return nil
})
if err != nil {
return err
}
for range keys {
for i := 0; i < errorIndex; i++ {
c.objCounters.DecDB()
storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("db DELETE"))
}
return nil
if err != nil {
c.log.Error("can't remove objects from the database", zap.Error(err))
}
func (c *cache) deleteFromDisk(keys [][]byte) error {
var lastErr error
copy(keys, keys[errorIndex:])
return keys[:len(keys)-errorIndex]
}
func (c *cache) deleteFromDisk(keys []string) []string {
if len(keys) == 0 {
return keys
}
var copyIndex int
var addr oid.Address
for i := range keys {
addrStr := string(keys[i])
if err := addr.DecodeString(addrStr); err != nil {
c.log.Error("can't parse address", zap.String("address", addrStr))
if err := addr.DecodeString(keys[i]); err != nil {
c.log.Error("can't parse address", zap.String("address", keys[i]))
continue
}
_, err := c.fsTree.Delete(common.DeletePrm{Address: addr})
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
lastErr = err
c.log.Error("can't remove object from write-cache", zap.Error(err))
// Save the key for the next iteration.
keys[copyIndex] = keys[i]
copyIndex++
continue
} else if err == nil {
storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("fstree DELETE"))
storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("fstree DELETE"))
c.objCounters.DecFS()
}
}
return lastErr
return keys[:copyIndex]
}