[#561] writecache: Make badger keys counting async #613

Closed
aarifullin wants to merge 1 commit from aarifullin/frostfs-node:feature/561-init_count into master
9 changed files with 291 additions and 23 deletions

View file

@ -286,6 +286,9 @@ const (
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache" WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache"
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
WritecacheBadgerCouldNotCountKeys = "could not count keys from badger by iteration"
WritecacheBadgerCouldNotFlushKeysCount = "could not flush keys count to badger"
WritecacheBadgerCouldNotDeleteKeysCount = "could not delete keys count from badger"
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree" WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree"
WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks" WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks"

View file

@ -1,6 +1,7 @@
package writecachebadger package writecachebadger
import ( import (
"encoding/binary"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -25,6 +26,8 @@ type cache struct {
closeCh chan struct{} closeCh chan struct{}
// wg is a wait group for flush workers. // wg is a wait group for flush workers.
wg sync.WaitGroup wg sync.WaitGroup
// initCounterWG is necessary to wait for asynchronous initialization.
initCounterWG sync.WaitGroup
// store contains underlying database. // store contains underlying database.
store store
} }
@ -100,8 +103,29 @@ func (c *cache) Init() error {
return nil return nil
} }
// flushKeysCount persists the current DB keys counter under keyCountPrefix.
//
// Nothing is written when the database is already closed (Close may happen
// after resetting mode) or when the counter is not ready to be flushed.
// The counter is encoded as 8 little-endian bytes. A failed write is only
// logged; it is not returned to the caller.
func (c *cache) flushKeysCount() {
	if c.db.IsClosed() || !c.objCounters.isReadyToFlush() {
		return
	}

	encoded := make([]byte, 8)
	binary.LittleEndian.PutUint64(encoded, c.objCounters.DB())

	err := c.putRaw(keyCountPrefix[:], encoded)
	if err != nil {
		c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err))
	}
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error { func (c *cache) Close() error {
c.cancelInitCountByCloseAndWait()
c.flushKeysCount()
// We cannot lock mutex for the whole operation duration // We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock() c.modeMtx.Lock()
@ -125,5 +149,13 @@ func (c *cache) Close() error {
} }
} }
c.metrics.Close() c.metrics.Close()
return nil return nil
} }
// cancelInitCountByCloseAndWait signals the asynchronous keys counting to stop
// because Close has been invoked, then blocks until the goroutine tracked by
// initCounterWG has finished.
func (c *cache) cancelInitCountByCloseAndWait() {
	dbCounter := &c.objCounters.cDB
	dbCounter.cancelByClose()

	c.initCounterWG.Wait()
}

View file

@ -14,6 +14,7 @@ import (
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
// Delete removes object from write-cache. // Delete removes object from write-cache.
@ -68,3 +69,33 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
// deleteRawKey deletes the record stored under the raw key within a single
// badger update transaction.
//
// A missing key is reported as apistatus.ObjectNotFound wrapped as a logic
// error. A record whose value is empty is left in place and nil is returned.
// After a successful delete the operation is written to the storage log and
// the DB counter is decremented. The resulting error is passed through
// metaerr.Wrap.
func (c *cache) deleteRawKey(key []byte) error {
	remove := func(tx *badger.Txn) error {
		item, err := tx.Get(key)
		switch {
		case err == badger.ErrKeyNotFound:
			return logicerr.Wrap(new(apistatus.ObjectNotFound))
		case err != nil:
			return err
		}

		// Records with an empty value are not touched.
		if item.ValueSize() <= 0 {
			return nil
		}

		if err := tx.Delete(key); err != nil {
			return err
		}

		storagelog.Write(c.log,
			zap.String("key", string(key)),
			storagelog.StorageTypeField(wcStorageType),
			storagelog.OpField("db DELETE"),
		)
		c.objCounters.DecDB()
		return nil
	}

	return metaerr.Wrap(c.db.Update(remove))
}

View file

@ -93,3 +93,14 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
return value, metaerr.Wrap(err) return value, metaerr.Wrap(err)
} }
// getRawKey reads the raw value stored under key from the write-cache database
// by delegating to Get.
//
// Any error from Get is returned unchanged together with a nil value.
// NOTE(review): presumably Get reports a missing key as
// apistatus.ObjectNotFound — verify against Get.
func (c *cache) getRawKey(key []byte) ([]byte, error) {
	raw, err := Get(c.db, key)
	if err == nil {
		return raw, nil
	}
	return nil, err
}

View file

@ -22,6 +22,8 @@ func (c *cache) SetMode(m mode.Mode) error {
)) ))
defer span.End() defer span.End()
c.cancelInitCountBySetModeAndWait()
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
@ -32,6 +34,13 @@ func (c *cache) SetMode(m mode.Mode) error {
return err return err
} }
// cancelInitCountBySetModeAndWait signals the asynchronous keys counting to
// stop because SetMode has been invoked, then blocks until the goroutine
// tracked by initCounterWG has finished.
func (c *cache) cancelInitCountBySetModeAndWait() {
	dbCounter := &c.objCounters.cDB
	dbCounter.cancelBySetMode()

	c.initCounterWG.Wait()
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken. // setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error { func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
var err error var err error

View file

@ -34,6 +34,9 @@ type options struct {
metrics writecache.Metrics metrics writecache.Metrics
// gcInterval is the interval duration to run the GC cycle. // gcInterval is the interval duration to run the GC cycle.
gcInterval time.Duration gcInterval time.Duration
// recountKeys indicates if keys must be forcefully counted only by DB iteration. This flag should be
// used if badger contains invalid keys count record.
recountKeys bool
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -108,3 +111,10 @@ func WithGCInterval(d time.Duration) Option {
o.gcInterval = d o.gcInterval = d
} }
} }
// WithRecountKeys returns an Option that stores recount in the recountKeys
// field of the options.
func WithRecountKeys(recount bool) Option {
	setFlag := func(opts *options) {
		opts.recountKeys = recount
	}
	return setFlag
}

View file

@ -8,6 +8,7 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@ -80,3 +81,25 @@ func (c *cache) put(obj objectInfo) error {
} }
return err return err
} }
// putRaw stores a raw key/value pair in the write-cache database.
//
// It returns writecache.ErrOutOfSpace without writing when the estimated cache
// size, as adjusted by incSizeDB, exceeds maxCacheSize. After a successful
// write the operation is written to the storage log and IncDB is called on the
// object counters. Nothing is handed to the flush workers here.
func (c *cache) putRaw(key, value []byte) error {
	if projected := c.incSizeDB(c.estimateCacheSize()); projected > c.maxCacheSize {
		return writecache.ErrOutOfSpace
	}

	if err := c.db.Update(func(txn *badger.Txn) error {
		return txn.Set(key, value)
	}); err != nil {
		return err
	}

	storagelog.Write(c.log,
		storagelog.StorageTypeField(wcStorageType),
		storagelog.OpField("db PUT"),
	)
	c.objCounters.IncDB()
	return nil
}

View file

@ -1,11 +1,18 @@
package writecachebadger package writecachebadger
import ( import (
"fmt" "encoding/binary"
"math" "errors"
"sync/atomic" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"go.uber.org/atomic"
"go.uber.org/zap"
)
var (
errCountCancelled = errors.New("counting was cancelled")
) )
func (c *cache) estimateCacheSize() uint64 { func (c *cache) estimateCacheSize() uint64 {
aarifullin marked this conversation as resolved
Review

The file size on disk is still used here, but bbolt uses the number of objects. But if we use a counter of objects, it is not clear how asynchronous counting will help us.

The file size on disk is still used here, but bbolt uses the number of objects. But if we use a counter of objects, it is not clear how asynchronous counting will help us.
Review

You're right, we don't need to use DB() if we want to roughly estimate the size of the DB data.

But we cannot use EstimateSize if we need the exact number of keys in the DB:

// EstimateSize can be used to get rough estimate of data size for a given prefix.

So we won't be able to get the exact number of keys using EstimateSize, and a rough approximation of the key count may be misleading if we use it for metrics or other purposes.

cc @fyrchik @ale64bit

You're right, we don't need to use `DB()` if we want to roughly estimate the size of the DB data. But we cannot use `EstimateSize` if we need the exact number of keys in the DB: ```golang // EstimateSize can be used to get rough estimate of data size for a given prefix. ``` So we won't be able to get the exact number of keys using `EstimateSize`, and a rough approximation of the key count may be misleading if we use it for metrics or other purposes. cc @fyrchik @ale64bit
Review

But such a rough approximation with size rounding is used for bbolt.

For example, imagine that you are a user and you see that writecache is not being used. You open the metrics and see that the limit has not been reached. Or vice versa.

But such a rough approximation with size rounding is used for bbolt. For example, imagine that you are a user and you see that writecache is not being used. You open the metrics and see that the limit has not been reached. Or vice versa.
Review

We are probably talking about different things. Let me clarify:

DB() gives the exact number of keys currently saved in badger.
We cannot use EstimateSize() to calculate the number of keys: keys = EstimateSize() / EstimateKeySize() - this is incorrect

We are probably talking about different things. Let me clarify: `DB()` gives the exact number of keys currently saved in badger. We cannot use `EstimateSize()` to calculate the number of keys: `keys = EstimateSize() / EstimateKeySize()` - this is incorrect
@ -18,40 +25,180 @@ func (c *cache) incSizeDB(sz uint64) uint64 {
return sz + c.maxObjectSize return sz + c.maxObjectSize
} }
// countAsync is a counter whose initial value is computed asynchronously.
//
// It keeps the numeric value together with two flags, each guarded by its own
// RWMutex, and two cancellation channels, each paired with a sync.Once so that
// the channel is closed at most once.
type countAsync struct {
	// value is the current counter value.
	value atomic.Uint64

	// mutMtx guards mutable.
	mutMtx sync.RWMutex
	// mutable tells whether value may be incremented/decremented.
	mutable bool

	// initMtx guards init.
	initMtx sync.RWMutex
	// init tells whether value has been finally initialized.
	init bool

	// modeDoneOnce guarantees that modeCh is closed only once,
	// even if SetMode is invoked many times.
	modeDoneOnce sync.Once
	// modeCh is closed to indicate that SetMode has been invoked
	// and counting must be cancelled.
	modeCh chan struct{}

	// closeDoneOnce guarantees that closeCh is closed only once,
	// even if Close is invoked many times.
	closeDoneOnce sync.Once
	// closeCh is closed to indicate that Close has been invoked
	// and counting must be cancelled.
	closeCh chan struct{}
}
// cancelByClose closes closeCh. The close is performed through closeDoneOnce,
// so repeated calls are safe and only the first one closes the channel.
func (c *countAsync) cancelByClose() {
	closeChannel := func() {
		close(c.closeCh)
	}
	c.closeDoneOnce.Do(closeChannel)
}
// cancelBySetMode closes modeCh. The close is performed through modeDoneOnce,
// so repeated calls are safe and only the first one closes the channel.
func (c *countAsync) cancelBySetMode() {
	closeChannel := func() {
		close(c.modeCh)
	}
	c.modeDoneOnce.Do(closeChannel)
}
// addAndSetAsInit adds value to the counter and then marks the counter as
// initialized. The init flag is written under initMtx.
func (c *countAsync) addAndSetAsInit(value uint64) {
	c.value.Add(value)

	c.initMtx.Lock()
	c.init = true
	c.initMtx.Unlock()
}
// isInitialized reports whether the counter has been marked as initialized.
// The init flag is read under the initMtx read lock.
func (c *countAsync) isInitialized() bool {
	c.initMtx.RLock()
	ready := c.init
	c.initMtx.RUnlock()

	return ready
}
// isMutable reports whether the counter has been marked as mutable.
// The mutable flag is read under the mutMtx read lock.
func (c *countAsync) isMutable() bool {
	c.mutMtx.RLock()
	allowed := c.mutable
	c.mutMtx.RUnlock()

	return allowed
}
// setAsMutable marks the counter as mutable. The flag is written under mutMtx.
func (c *countAsync) setAsMutable() {
	c.mutMtx.Lock()
	c.mutable = true
	c.mutMtx.Unlock()
}
// reset returns the counter to its starting state: the value is zeroed, both
// cancellation channels and their sync.Once guards are replaced with fresh
// ones, and the init and mutable flags are cleared under their mutexes.
//
// NOTE(review): the channels and Once values are replaced without holding any
// lock; presumably callers guarantee that no cancel call runs concurrently
// with reset — confirm.
func (c *countAsync) reset() {
	c.value.Store(0)

	c.modeCh = make(chan struct{})
	c.modeDoneOnce = sync.Once{}
	c.closeCh = make(chan struct{})
	c.closeDoneOnce = sync.Once{}

	// initMtx stays held while mutMtx is taken and released.
	c.initMtx.Lock()
	defer c.initMtx.Unlock()
	c.init = false

	c.mutMtx.Lock()
	c.mutable = false
	c.mutMtx.Unlock()
}
type counters struct { type counters struct {
cDB atomic.Uint64 cDB countAsync
} }
func (x *counters) IncDB() { func (x *counters) IncDB() {
x.cDB.Add(1) if x.cDB.isMutable() {
x.cDB.value.Inc()
}
} }
func (x *counters) DecDB() { func (x *counters) DecDB() {
x.cDB.Add(math.MaxUint64) if x.cDB.isMutable() {
x.cDB.value.Dec()
}
} }
func (x *counters) DB() uint64 { func (x *counters) DB() uint64 {
return x.cDB.Load() if x.cDB.isInitialized() {
return x.cDB.value.Load()
}
return 0
}
func (x *counters) isReadyToFlush() bool {
return x.cDB.isInitialized()
} }
func (c *cache) initCounters() error { func (c *cache) initCounters() error {
var inDB uint64 c.objCounters.cDB.reset()
err := c.db.View(func(tx *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := tx.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
inDB++
}
return nil
})
if err != nil {
return fmt.Errorf("could not read write-cache DB counter: %w", err)
}
c.objCounters.cDB.Store(inDB) c.initCounterWG.Add(1)
c.metrics.SetActualCounters(inDB, 0) go func() {
defer func() {
c.initCounterWG.Done()
}()
var inDB uint64
if !c.options.recountKeys {
k := keyCountPrefix
cDB, err := c.getRawKey(k[:])
if err == nil {
c.objCounters.cDB.setAsMutable()
inDB = binary.LittleEndian.Uint64(cDB)
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if !c.mode.ReadOnly() {
if err = c.deleteRawKey(k[:]); err != nil {
c.log.Error(logs.WritecacheBadgerCouldNotDeleteKeysCount, zap.Error(err))
}
}
return
}
}
// Recount keys by iteration if recounting is forced or the "keys_count" record could not be read
if err := c.db.View(func(tx *badger.Txn) error {
c.objCounters.cDB.setAsMutable()
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := tx.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
select {
case <-c.objCounters.cDB.closeCh:
return errCountCancelled
case <-c.objCounters.cDB.modeCh:
return errCountCancelled
default:
inDB++
}
}
return nil
}); err != nil {
c.log.Error(logs.WritecacheBadgerCouldNotCountKeys, zap.Error(err))
return
}
c.metrics.SetActualCounters(inDB, 0)
c.objCounters.cDB.addAndSetAsInit(inDB)
}()
return nil return nil
} }

View file

@ -21,6 +21,8 @@ type store struct {
db *badger.DB db *badger.DB
} }
// keyCountPrefix is the badger key under which the keys counter is persisted.
var keyCountPrefix = []byte("keys_count")
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
func (k internalKey) address() oid.Address { func (k internalKey) address() oid.Address {