diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index cf9216a14..cb368946a 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/mr-tron/base58"
 	"github.com/nspcc-dev/neo-go/pkg/util/slice"
+	objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
 	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
@@ -110,13 +111,9 @@ func (c *cache) flush() {
 			break
 		}
 
-		c.evictObjects(len(m))
-		for i := range m {
-			c.flushed.Add(m[i].addr, true)
-		}
 		c.modeMtx.RUnlock()
 
-		c.log.Debug("flushed items from write-cache",
+		c.log.Debug("tried to flush items from write-cache",
 			zap.Int("count", len(m)),
 			zap.String("start", base58.Encode(lastKey)))
 	}
@@ -172,7 +169,7 @@ func (c *cache) flushBigObjects() {
 				}
 
 				// mark object as flushed
-				c.store.flushed.Add(sAddr, false)
+				c.flushed.Add(sAddr, false)
 
 				evictNum++
 
@@ -181,8 +178,6 @@
 
 			_, _ = c.fsTree.Iterate(prm)
 
-			// evict objects which were successfully written to BlobStor
-			c.evictObjects(evictNum)
 			c.modeMtx.RUnlock()
 		case <-c.closeCh:
 			return
@@ -208,6 +203,8 @@ func (c *cache) flushWorker(_ int) {
 		err := c.flushObject(obj)
 		if err != nil {
 			c.log.Error("can't flush object to the main storage", zap.Error(err))
+		} else {
+			c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
 		}
 	}
 }
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index ed7370723..8992ffa1a 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -22,9 +22,15 @@ import (
 type store struct {
 	flushed simplelru.LRUCache
 	db      *bbolt.DB
+
+	dbKeysToRemove []string
+	fsKeysToRemove []string
 }
 
-const lruKeysCount = 256 * 1024 * 8
+const (
+	maxFlushedMarksCount = 256 * 1024 * 8
+	maxRemoveBatchSize   = maxFlushedMarksCount / 4
+)
 
 const dbName = "small.bolt"
 
@@ -64,98 +70,83 @@ func (c *cache) openStore(readOnly bool) error {
 	// Write-cache can be opened multiple times during `SetMode`.
 	// flushed map must not be re-created in this case.
 	if c.flushed == nil {
-		c.flushed, _ = lru.New(lruKeysCount)
+		c.flushed, _ = lru.NewWithEvict(maxFlushedMarksCount, c.removeFlushed)
 	}
 
 	return nil
 }
 
-func (s *store) removeFlushedKeys(n int) ([][]byte, [][]byte) {
-	var keysMem, keysDisk [][]byte
-	for i := 0; i < n; i++ {
-		k, v, ok := s.flushed.RemoveOldest()
-		if !ok {
-			break
-		}
-
-		if v.(bool) {
-			keysMem = append(keysMem, []byte(k.(string)))
-		} else {
-			keysDisk = append(keysDisk, []byte(k.(string)))
-		}
+// removeFlushed removes an object from the writecache.
+// To minimize interference with the client operations, the actual removal
+// is done in batches.
+// It is not thread-safe and is used only as an evict callback to LRU cache.
+func (c *cache) removeFlushed(key, value interface{}) {
+	fromDatabase := value.(bool)
+	if fromDatabase {
+		c.dbKeysToRemove = append(c.dbKeysToRemove, key.(string))
+	} else {
+		c.fsKeysToRemove = append(c.fsKeysToRemove, key.(string))
 	}
-	return keysMem, keysDisk
-}
-
-func (c *cache) evictObjects(putCount int) {
-	sum := c.flushed.Len() + putCount
-	if sum <= lruKeysCount {
-		return
-	}
-
-	keysMem, keysDisk := c.store.removeFlushedKeys(sum - lruKeysCount)
-
-	if err := c.deleteFromDB(keysMem); err != nil {
-		c.log.Error("error while removing objects from write-cache (database)", zap.Error(err))
-	}
-
-	if err := c.deleteFromDisk(keysDisk); err != nil {
-		c.log.Error("error while removing objects from write-cache (disk)", zap.Error(err))
+	if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= maxRemoveBatchSize {
+		c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
+		c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
 	}
 }
 
-func (c *cache) deleteFromDB(keys [][]byte) error {
+func (c *cache) deleteFromDB(keys []string) []string {
 	if len(keys) == 0 {
-		return nil
+		return keys
 	}
 
-	err := c.db.Update(func(tx *bbolt.Tx) error {
-		b := tx.Bucket(defaultBucket)
-		for i := range keys {
-			has := b.Get(keys[i])
-			if has == nil {
-				var errNotFound apistatus.ObjectNotFound
-				return errNotFound
-			}
-			if err := b.Delete(keys[i]); err != nil {
+	var errorIndex int
+	err := c.db.Batch(func(tx *bbolt.Tx) error {
+		b := tx.Bucket(defaultBucket)
+		for errorIndex = range keys {
+			if err := b.Delete([]byte(keys[errorIndex])); err != nil {
 				return err
 			}
-			storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("db DELETE"))
 		}
 		return nil
 	})
-	if err != nil {
-		return err
-	}
-	for range keys {
+	for i := 0; i < errorIndex; i++ {
 		c.objCounters.DecDB()
+		storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("db DELETE"))
 	}
-	return nil
+	if err != nil {
+		c.log.Error("can't remove objects from the database", zap.Error(err))
+	}
+
+	copy(keys, keys[errorIndex:])
+	return keys[:len(keys)-errorIndex]
 }
 
-func (c *cache) deleteFromDisk(keys [][]byte) error {
-	var lastErr error
+func (c *cache) deleteFromDisk(keys []string) []string {
+	if len(keys) == 0 {
+		return keys
+	}
 
+	var copyIndex int
 	var addr oid.Address
 
 	for i := range keys {
-		addrStr := string(keys[i])
-
-		if err := addr.DecodeString(addrStr); err != nil {
-			c.log.Error("can't parse address", zap.String("address", addrStr))
+		if err := addr.DecodeString(keys[i]); err != nil {
+			c.log.Error("can't parse address", zap.String("address", keys[i]))
 			continue
 		}
 
 		_, err := c.fsTree.Delete(common.DeletePrm{Address: addr})
 		if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
-			lastErr = err
 			c.log.Error("can't remove object from write-cache", zap.Error(err))
+
+			// Save the key for the next iteration.
+			keys[copyIndex] = keys[i]
+			copyIndex++
 			continue
 		} else if err == nil {
-			storagelog.Write(c.log, storagelog.AddressField(string(keys[i])), storagelog.OpField("fstree DELETE"))
+			storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.OpField("fstree DELETE"))
 			c.objCounters.DecFS()
 		}
 	}
 
-	return lastErr
+	return keys[:copyIndex]
 }
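
A minimal, self-contained sketch of the eviction pattern introduced above, assuming hashicorp/golang-lru (the library behind the `lru`/`simplelru` identifiers in the patch): evicting an entry from the flushed-marks LRU does not remove the object right away; the evict callback only queues the key, and the queue is drained once it reaches a batch threshold. The `batcher` type, the `maxBatch` constant and the printed message are illustrative stand-ins, not part of the write-cache code.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

// maxBatch plays the role of maxRemoveBatchSize: evicted keys are only
// processed once this many of them have accumulated.
const maxBatch = 4

type batcher struct {
	pending []string // keys queued for removal by the evict callback
}

// onEvict mirrors cache.removeFlushed: it records the evicted key and,
// when the batch is large enough, performs the (here simulated) removal.
func (b *batcher) onEvict(key, value interface{}) {
	b.pending = append(b.pending, key.(string))
	if len(b.pending) >= maxBatch {
		fmt.Println("removing batch:", b.pending)
		b.pending = b.pending[:0]
	}
}

func main() {
	b := &batcher{}
	// A tiny capacity forces evictions quickly; the write-cache uses
	// maxFlushedMarksCount instead.
	c, _ := lru.NewWithEvict(2, b.onEvict)

	for i := 0; i < 10; i++ {
		c.Add(fmt.Sprintf("addr-%d", i), true) // evicted keys flow into the batch
	}
}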