[#561] writecache: Make badger keys counting async #613
9 changed files with 291 additions and 23 deletions
|
@ -286,6 +286,9 @@ const (
|
|||
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
|
||||
WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache"
|
||||
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
|
||||
WritecacheBadgerCouldNotCountKeys = "could not count keys from badger by iteration"
|
||||
WritecacheBadgerCouldNotFlushKeysCount = "could not flush keys count to badger"
|
||||
WritecacheBadgerCouldNotDeleteKeysCount = "could not delete keys count from badger"
|
||||
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
|
||||
WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree"
|
||||
WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks"
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -25,6 +26,8 @@ type cache struct {
|
|||
closeCh chan struct{}
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
// initCounterWG is necessary to wait for asynchronous initialization.
|
||||
initCounterWG sync.WaitGroup
|
||||
// store contains underlying database.
|
||||
store
|
||||
}
|
||||
|
@ -100,8 +103,29 @@ func (c *cache) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// flushKeysCount writes the keys count to db if it should be flushed down.
|
||||
func (c *cache) flushKeysCount() {
|
||||
// Close may happen after resetting mode
|
||||
if c.db.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
if c.objCounters.isReadyToFlush() {
|
||||
k := keyCountPrefix
|
||||
v := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(v, c.objCounters.DB())
|
||||
if err := c.putRaw(k[:], v); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
c.cancelInitCountByCloseAndWait()
|
||||
|
||||
c.flushKeysCount()
|
||||
|
||||
// We cannot lock mutex for the whole operation duration
|
||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||
c.modeMtx.Lock()
|
||||
|
@ -125,5 +149,13 @@ func (c *cache) Close() error {
|
|||
}
|
||||
}
|
||||
c.metrics.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cancelInitCountByCloseAndWait cancels the counting process if Close is invoked
|
||||
// and waits for correct finish of goroutine within initCount.
|
||||
func (c *cache) cancelInitCountByCloseAndWait() {
|
||||
c.objCounters.cDB.cancelByClose()
|
||||
c.initCounterWG.Wait()
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Delete removes object from write-cache.
|
||||
|
@ -68,3 +69,33 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// deleteRawKey removes object from write-cache by raw-represented key.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||
func (c *cache) deleteRawKey(key []byte) error {
|
||||
err := c.db.Update(func(tx *badger.Txn) error {
|
||||
it, err := tx.Get(key)
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return err
|
||||
}
|
||||
if it.ValueSize() > 0 {
|
||||
err := tx.Delete(key)
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
zap.String("key", string(key)),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
c.objCounters.DecDB()
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
|
|
@ -93,3 +93,14 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
|||
|
||||
return value, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// getRawKey returns raw-data from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||
func (c *cache) getRawKey(key []byte) ([]byte, error) {
|
||||
value, err := Get(c.db, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
))
|
||||
defer span.End()
|
||||
|
||||
c.cancelInitCountBySetModeAndWait()
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
|
@ -32,6 +34,13 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// cancelInitCountBySetModeAndWait cancels the counting process if SetMode is invoked
|
||||
// and waits for correct finish of goroutine within initCount.
|
||||
func (c *cache) cancelInitCountBySetModeAndWait() {
|
||||
c.objCounters.cDB.cancelBySetMode()
|
||||
c.initCounterWG.Wait()
|
||||
}
|
||||
|
||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||
var err error
|
||||
|
|
|
@ -34,6 +34,9 @@ type options struct {
|
|||
metrics writecache.Metrics
|
||||
// gcInterval is the interval duration to run the GC cycle.
|
||||
gcInterval time.Duration
|
||||
// recountKeys indicates if keys must be forcefully counted only by DB iteration. This flag should be
|
||||
// used if badger contains invalid keys count record.
|
||||
recountKeys bool
|
||||
}
|
||||
|
||||
// WithLogger sets logger.
|
||||
|
@ -108,3 +111,10 @@ func WithGCInterval(d time.Duration) Option {
|
|||
o.gcInterval = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithRecountKeys sets the recount keys flag.
|
||||
func WithRecountKeys(recount bool) Option {
|
||||
return func(o *options) {
|
||||
o.recountKeys = recount
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -80,3 +81,25 @@ func (c *cache) put(obj objectInfo) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// putRaw persists raw-represent key and value to the write-cache
|
||||
// database and pushes to the flush workers queue.
|
||||
func (c *cache) putRaw(key, value []byte) error {
|
||||
cacheSize := c.estimateCacheSize()
|
||||
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
||||
return writecache.ErrOutOfSpace
|
||||
}
|
||||
|
||||
err := c.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Set(key, value)
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db PUT"),
|
||||
)
|
||||
c.objCounters.IncDB()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,11 +1,18 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync/atomic"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
errCountCancelled = errors.New("counting was cancelled")
|
||||
)
|
||||
|
||||
func (c *cache) estimateCacheSize() uint64 {
|
||||
aarifullin marked this conversation as resolved
|
||||
|
@ -18,40 +25,180 @@ func (c *cache) incSizeDB(sz uint64) uint64 {
|
|||
return sz + c.maxObjectSize
|
||||
}
|
||||
|
||||
type countAsync struct {
|
||||
value atomic.Uint64
|
||||
|
||||
// mutable checks whether the value can be incremented/decremented.
|
||||
mutable bool
|
||||
|
||||
// mutMtx is mutex for mutable flag.
|
||||
mutMtx sync.RWMutex
|
||||
|
||||
// init checks whether the value is finally initialized.
|
||||
init bool
|
||||
|
||||
// initMtx is mutex for init flag.
|
||||
initMtx sync.RWMutex
|
||||
|
||||
// modeCh is channel to indicate SetMode has been invoked and
|
||||
// counting must be cancelled.
|
||||
modeCh chan struct{}
|
||||
|
||||
// modeDoneOnce guarantees that modeCh has been closed only once.
|
||||
// This may happen if SetMode has been invoked many times.
|
||||
modeDoneOnce sync.Once
|
||||
|
||||
// closeCh is channel to indicate Close has been invoked and
|
||||
// counting must be cancelled.
|
||||
closeCh chan struct{}
|
||||
|
||||
// closeDoneOnce guarantees that modeCh has been closed only once.
|
||||
// This may happen if Close has been invoked many times.
|
||||
closeDoneOnce sync.Once
|
||||
}
|
||||
|
||||
func (c *countAsync) cancelByClose() {
|
||||
c.closeDoneOnce.Do(func() {
|
||||
close(c.closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *countAsync) cancelBySetMode() {
|
||||
c.modeDoneOnce.Do(func() {
|
||||
close(c.modeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *countAsync) addAndSetAsInit(value uint64) {
|
||||
c.value.Add(value)
|
||||
|
||||
c.initMtx.Lock()
|
||||
defer c.initMtx.Unlock()
|
||||
|
||||
c.init = true
|
||||
}
|
||||
|
||||
func (c *countAsync) isInitialized() bool {
|
||||
c.initMtx.RLock()
|
||||
defer c.initMtx.RUnlock()
|
||||
|
||||
return c.init
|
||||
}
|
||||
|
||||
func (c *countAsync) isMutable() bool {
|
||||
c.mutMtx.RLock()
|
||||
defer c.mutMtx.RUnlock()
|
||||
|
||||
return c.mutable
|
||||
}
|
||||
|
||||
func (c *countAsync) setAsMutable() {
|
||||
c.mutMtx.Lock()
|
||||
defer c.mutMtx.Unlock()
|
||||
|
||||
c.mutable = true
|
||||
}
|
||||
|
||||
func (c *countAsync) reset() {
|
||||
c.value.Store(0)
|
||||
|
||||
c.closeCh = make(chan struct{})
|
||||
c.closeDoneOnce = sync.Once{}
|
||||
|
||||
c.modeCh = make(chan struct{})
|
||||
c.modeDoneOnce = sync.Once{}
|
||||
|
||||
c.initMtx.Lock()
|
||||
defer c.initMtx.Unlock()
|
||||
c.init = false
|
||||
|
||||
c.mutMtx.Lock()
|
||||
defer c.mutMtx.Unlock()
|
||||
c.mutable = false
|
||||
}
|
||||
|
||||
type counters struct {
|
||||
cDB atomic.Uint64
|
||||
cDB countAsync
|
||||
}
|
||||
|
||||
func (x *counters) IncDB() {
|
||||
x.cDB.Add(1)
|
||||
if x.cDB.isMutable() {
|
||||
x.cDB.value.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func (x *counters) DecDB() {
|
||||
x.cDB.Add(math.MaxUint64)
|
||||
if x.cDB.isMutable() {
|
||||
x.cDB.value.Dec()
|
||||
}
|
||||
}
|
||||
|
||||
func (x *counters) DB() uint64 {
|
||||
return x.cDB.Load()
|
||||
if x.cDB.isInitialized() {
|
||||
return x.cDB.value.Load()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *counters) isReadyToFlush() bool {
|
||||
return x.cDB.isInitialized()
|
||||
}
|
||||
|
||||
func (c *cache) initCounters() error {
|
||||
var inDB uint64
|
||||
err := c.db.View(func(tx *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
inDB++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
||||
}
|
||||
c.objCounters.cDB.reset()
|
||||
|
||||
c.objCounters.cDB.Store(inDB)
|
||||
c.metrics.SetActualCounters(inDB, 0)
|
||||
c.initCounterWG.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
c.initCounterWG.Done()
|
||||
}()
|
||||
|
||||
var inDB uint64
|
||||
|
||||
if !c.options.recountKeys {
|
||||
k := keyCountPrefix
|
||||
cDB, err := c.getRawKey(k[:])
|
||||
if err == nil {
|
||||
c.objCounters.cDB.setAsMutable()
|
||||
inDB = binary.LittleEndian.Uint64(cDB)
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
if !c.mode.ReadOnly() {
|
||||
if err = c.deleteRawKey(k[:]); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotDeleteKeysCount, zap.Error(err))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Recount keys if "key_count" has not been found
|
||||
if err := c.db.View(func(tx *badger.Txn) error {
|
||||
c.objCounters.cDB.setAsMutable()
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
select {
|
||||
case <-c.objCounters.cDB.closeCh:
|
||||
return errCountCancelled
|
||||
case <-c.objCounters.cDB.modeCh:
|
||||
return errCountCancelled
|
||||
default:
|
||||
inDB++
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotCountKeys, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.metrics.SetActualCounters(inDB, 0)
|
||||
c.objCounters.cDB.addAndSetAsInit(inDB)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ type store struct {
|
|||
db *badger.DB
|
||||
}
|
||||
|
||||
var keyCountPrefix = []byte("keys_count")
|
||||
|
||||
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
|
||||
|
||||
func (k internalKey) address() oid.Address {
|
||||
|
|
Loading…
Reference in a new issue
The file size on disk is still used here, but bbolt uses the number of objects. But if we use a counter of objects, it is not clear how asynchronous counting will help us.
You're right, we don't need to use
DB()
if we want roughly estimate size of db data.But we cannot use
EstimateSize
if need the exact number of keys in DB:So, we won't be able to count the exact number using
EstimateSize
and the rough approximation for keys number may mislead if we use it for metrics or other purposescc @fyrchik @ale64bit
But such rough approximation with size rounding is used for bbolt.
For example, imagine that you are a user and you see that writecache is not being used. You open the metrics and see that the limit has not been reached. Or vice versa.
Probably, we talk about different things. Let me clarify:
DB()
gives the exact number of keys currently saved in badger.We cannot use
EstimateSize()
to calculate the number of keys:keys = EstimateSize() / EstimateKeySize()
- this is incorrect