diff --git a/internal/logs/logs.go b/internal/logs/logs.go index b826ae08..4d8bffc8 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -286,6 +286,9 @@ const ( ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache" WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" + WritecacheBadgerCouldNotCountKeys = "could not count keys from badger by iteration" + WritecacheBadgerCouldNotFlushKeysCount = "could not flush keys count to badger" + WritecacheBadgerCouldNotDeleteKeysCount = "could not delete keys count from badger" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree" WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks" diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index 837e76a0..a20c2f97 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -1,6 +1,7 @@ package writecachebadger import ( + "encoding/binary" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -25,6 +26,8 @@ type cache struct { closeCh chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup + // initCounterWG is necessary to wait for asynchronous initialization. + initCounterWG sync.WaitGroup // store contains underlying database. store } @@ -100,8 +103,29 @@ func (c *cache) Init() error { return nil } +// flushKeysCount writes the keys count to db if it should be flushed down. +func (c *cache) flushKeysCount() { + // Close may happen after resetting mode + if c.db.IsClosed() { + return + } + + if c.objCounters.isReadyToFlush() { + k := keyCountPrefix + v := make([]byte, 8) + binary.LittleEndian.PutUint64(v, c.objCounters.DB()) + if err := c.putRaw(k[:], v); err != nil { + c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err)) + } + } +} + // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. func (c *cache) Close() error { + c.cancelInitCountByCloseAndWait() + + c.flushKeysCount() + // We cannot lock mutex for the whole operation duration // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. c.modeMtx.Lock() @@ -125,5 +149,13 @@ func (c *cache) Close() error { } } c.metrics.Close() + return nil } + +// cancelInitCountByCloseAndWait cancels the counting process if Close is invoked +// and waits for correct finish of goroutine within initCount. +func (c *cache) cancelInitCountByCloseAndWait() { + c.objCounters.cDB.cancelByClose() + c.initCounterWG.Wait() +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/delete.go b/pkg/local_object_storage/writecache/writecachebadger/delete.go index f3737109..c3949ac7 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/delete.go +++ b/pkg/local_object_storage/writecache/writecachebadger/delete.go @@ -14,6 +14,7 @@ import ( "github.com/dgraph-io/badger/v4" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) // Delete removes object from write-cache. @@ -68,3 +69,33 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return metaerr.Wrap(err) } + +// deleteRawKey removes object from write-cache by raw-represented key. +// +// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. +func (c *cache) deleteRawKey(key []byte) error { + err := c.db.Update(func(tx *badger.Txn) error { + it, err := tx.Get(key) + if err != nil { + if err == badger.ErrKeyNotFound { + return logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return err + } + if it.ValueSize() > 0 { + err := tx.Delete(key) + if err == nil { + storagelog.Write(c.log, + zap.String("key", string(key)), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db DELETE"), + ) + c.objCounters.DecDB() + } + return err + } + return nil + }) + + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/get.go b/pkg/local_object_storage/writecache/writecachebadger/get.go index 42403e55..8dc64317 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/get.go +++ b/pkg/local_object_storage/writecache/writecachebadger/get.go @@ -93,3 +93,14 @@ func Get(db *badger.DB, key []byte) ([]byte, error) { return value, metaerr.Wrap(err) } + +// getRawKey returns raw-data from write-cache. +// +// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache. +func (c *cache) getRawKey(key []byte) ([]byte, error) { + value, err := Get(c.db, key) + if err != nil { + return nil, err + } + return value, nil +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/mode.go b/pkg/local_object_storage/writecache/writecachebadger/mode.go index 03d86183..449541de 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/mode.go +++ b/pkg/local_object_storage/writecache/writecachebadger/mode.go @@ -22,6 +22,8 @@ func (c *cache) SetMode(m mode.Mode) error { )) defer span.End() + c.cancelInitCountBySetModeAndWait() + c.modeMtx.Lock() defer c.modeMtx.Unlock() @@ -32,6 +34,13 @@ func (c *cache) SetMode(m mode.Mode) error { return err } +// cancelInitCountBySetModeAndWait cancels the counting process if SetMode is invoked +// and waits for correct finish of goroutine within initCount. +func (c *cache) cancelInitCountBySetModeAndWait() { + c.objCounters.cDB.cancelBySetMode() + c.initCounterWG.Wait() +} + // setMode applies new mode. Must be called with cache.modeMtx lock taken. func (c *cache) setMode(ctx context.Context, m mode.Mode) error { var err error diff --git a/pkg/local_object_storage/writecache/writecachebadger/options.go b/pkg/local_object_storage/writecache/writecachebadger/options.go index 63bfb196..24219e9a 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/options.go +++ b/pkg/local_object_storage/writecache/writecachebadger/options.go @@ -34,6 +34,9 @@ type options struct { metrics writecache.Metrics // gcInterval is the interval duration to run the GC cycle. gcInterval time.Duration + // recountKeys indicates if keys must be forcefully counted only by DB iteration. This flag should be + // used if badger contains invalid keys count record. + recountKeys bool } // WithLogger sets logger. @@ -108,3 +111,10 @@ func WithGCInterval(d time.Duration) Option { o.gcInterval = d } } + +// WithRecountKeys sets the recount keys flag. +func WithRecountKeys(recount bool) Option { + return func(o *options) { + o.recountKeys = recount + } +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/put.go b/pkg/local_object_storage/writecache/writecachebadger/put.go index c03a0d33..04566889 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/put.go +++ b/pkg/local_object_storage/writecache/writecachebadger/put.go @@ -8,6 +8,7 @@ import ( storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "github.com/dgraph-io/badger/v4" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -80,3 +81,25 @@ func (c *cache) put(obj objectInfo) error { } return err } + +// putRaw persists raw-represent key and value to the write-cache +// database and pushes to the flush workers queue. +func (c *cache) putRaw(key, value []byte) error { + cacheSize := c.estimateCacheSize() + if c.maxCacheSize < c.incSizeDB(cacheSize) { + return writecache.ErrOutOfSpace + } + + err := c.db.Update(func(txn *badger.Txn) error { + err := txn.Set(key, value) + return err + }) + if err == nil { + storagelog.Write(c.log, + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("db PUT"), + ) + c.objCounters.IncDB() + } + return err +} diff --git a/pkg/local_object_storage/writecache/writecachebadger/state.go b/pkg/local_object_storage/writecache/writecachebadger/state.go index 994dfa3d..59ec796c 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/state.go +++ b/pkg/local_object_storage/writecache/writecachebadger/state.go @@ -1,11 +1,18 @@ package writecachebadger import ( - "fmt" - "math" - "sync/atomic" + "encoding/binary" + "errors" + "sync" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "github.com/dgraph-io/badger/v4" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +var ( + errCountCancelled = errors.New("counting was cancelled") ) func (c *cache) estimateCacheSize() uint64 { @@ -18,40 +25,180 @@ func (c *cache) incSizeDB(sz uint64) uint64 { return sz + c.maxObjectSize } +type countAsync struct { + value atomic.Uint64 + + // mutable checks whether the value can be incremented/decremented. + mutable bool + + // mutMtx is mutex for mutable flag. + mutMtx sync.RWMutex + + // init checks whether the value is finally initialized. + init bool + + // initMtx is mutex for init flag. + initMtx sync.RWMutex + + // modeCh is channel to indicate SetMode has been invoked and + // counting must be cancelled. + modeCh chan struct{} + + // modeDoneOnce guarantees that modeCh has been closed only once. + // This may happen if SetMode has been invoked many times. + modeDoneOnce sync.Once + + // closeCh is channel to indicate Close has been invoked and + // counting must be cancelled. + closeCh chan struct{} + + // closeDoneOnce guarantees that modeCh has been closed only once. + // This may happen if Close has been invoked many times. + closeDoneOnce sync.Once +} + +func (c *countAsync) cancelByClose() { + c.closeDoneOnce.Do(func() { + close(c.closeCh) + }) +} + +func (c *countAsync) cancelBySetMode() { + c.modeDoneOnce.Do(func() { + close(c.modeCh) + }) +} + +func (c *countAsync) addAndSetAsInit(value uint64) { + c.value.Add(value) + + c.initMtx.Lock() + defer c.initMtx.Unlock() + + c.init = true +} + +func (c *countAsync) isInitialized() bool { + c.initMtx.RLock() + defer c.initMtx.RUnlock() + + return c.init +} + +func (c *countAsync) isMutable() bool { + c.mutMtx.RLock() + defer c.mutMtx.RUnlock() + + return c.mutable +} + +func (c *countAsync) setAsMutable() { + c.mutMtx.Lock() + defer c.mutMtx.Unlock() + + c.mutable = true +} + +func (c *countAsync) reset() { + c.value.Store(0) + + c.closeCh = make(chan struct{}) + c.closeDoneOnce = sync.Once{} + + c.modeCh = make(chan struct{}) + c.modeDoneOnce = sync.Once{} + + c.initMtx.Lock() + defer c.initMtx.Unlock() + c.init = false + + c.mutMtx.Lock() + defer c.mutMtx.Unlock() + c.mutable = false +} + type counters struct { - cDB atomic.Uint64 + cDB countAsync } func (x *counters) IncDB() { - x.cDB.Add(1) + if x.cDB.isMutable() { + x.cDB.value.Inc() + } } func (x *counters) DecDB() { - x.cDB.Add(math.MaxUint64) + if x.cDB.isMutable() { + x.cDB.value.Dec() + } } func (x *counters) DB() uint64 { - return x.cDB.Load() + if x.cDB.isInitialized() { + return x.cDB.value.Load() + } + return 0 +} + +func (x *counters) isReadyToFlush() bool { + return x.cDB.isInitialized() } func (c *cache) initCounters() error { - var inDB uint64 - err := c.db.View(func(tx *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := tx.NewIterator(opts) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - inDB++ - } - return nil - }) - if err != nil { - return fmt.Errorf("could not read write-cache DB counter: %w", err) - } + c.objCounters.cDB.reset() - c.objCounters.cDB.Store(inDB) - c.metrics.SetActualCounters(inDB, 0) + c.initCounterWG.Add(1) + go func() { + defer func() { + c.initCounterWG.Done() + }() + + var inDB uint64 + + if !c.options.recountKeys { + k := keyCountPrefix + cDB, err := c.getRawKey(k[:]) + if err == nil { + c.objCounters.cDB.setAsMutable() + inDB = binary.LittleEndian.Uint64(cDB) + + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if !c.mode.ReadOnly() { + if err = c.deleteRawKey(k[:]); err != nil { + c.log.Error(logs.WritecacheBadgerCouldNotDeleteKeysCount, zap.Error(err)) + } + } + return + } + } + + // Recount keys if "key_count" has not been found + if err := c.db.View(func(tx *badger.Txn) error { + c.objCounters.cDB.setAsMutable() + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := tx.NewIterator(opts) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + select { + case <-c.objCounters.cDB.closeCh: + return errCountCancelled + case <-c.objCounters.cDB.modeCh: + return errCountCancelled + default: + inDB++ + } + } + return nil + }); err != nil { + c.log.Error(logs.WritecacheBadgerCouldNotCountKeys, zap.Error(err)) + return + } + + c.metrics.SetActualCounters(inDB, 0) + c.objCounters.cDB.addAndSetAsInit(inDB) + }() return nil } diff --git a/pkg/local_object_storage/writecache/writecachebadger/storage.go b/pkg/local_object_storage/writecache/writecachebadger/storage.go index 9ff54bee..c4665150 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/storage.go +++ b/pkg/local_object_storage/writecache/writecachebadger/storage.go @@ -21,6 +21,8 @@ type store struct { db *badger.DB } +var keyCountPrefix = []byte("keys_count") + type internalKey [len(cid.ID{}) + len(oid.ID{})]byte func (k internalKey) address() oid.Address {