Compare commits

...

1 commit

Author SHA1 Message Date
a537d0a62f [#561] writecache: Make badger keys counting async
Some checks failed
Vulncheck / Vulncheck (pull_request) Successful in 2m50s
Build / Build Components (1.21) (pull_request) Successful in 3m33s
DCO action / DCO (pull_request) Successful in 3m46s
Tests and linters / Staticcheck (pull_request) Successful in 4m19s
Tests and linters / Tests with -race (pull_request) Failing after 4m50s
Tests and linters / Tests (1.20) (pull_request) Successful in 5m51s
Tests and linters / Tests (1.21) (pull_request) Successful in 5m59s
Tests and linters / Lint (pull_request) Successful in 6m24s
Build / Build Components (1.20) (pull_request) Successful in 7m24s
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-31 13:14:17 +03:00
9 changed files with 291 additions and 23 deletions

View file

@@ -286,6 +286,9 @@ const (
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache"
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
WritecacheBadgerCouldNotCountKeys = "could not count keys from badger by iteration"
WritecacheBadgerCouldNotFlushKeysCount = "could not flush keys count to badger"
WritecacheBadgerCouldNotDeleteKeysCount = "could not delete keys count from badger"
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree"
WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks"

View file

@@ -1,6 +1,7 @@
package writecachebadger
import (
"encoding/binary"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -25,6 +26,8 @@ type cache struct {
closeCh chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
// initCounterWG is used to wait for the asynchronous counter initialization to finish.
initCounterWG sync.WaitGroup
// store contains underlying database.
store
}
@@ -100,8 +103,29 @@ func (c *cache) Init() error {
return nil
}
// flushKeysCount writes the keys count to the database if the count is ready to be flushed.
func (c *cache) flushKeysCount() {
// Close may happen after resetting mode, in which case the database is already closed
if c.db.IsClosed() {
return
}
if c.objCounters.isReadyToFlush() {
k := keyCountPrefix
v := make([]byte, 8)
binary.LittleEndian.PutUint64(v, c.objCounters.DB())
if err := c.putRaw(k[:], v); err != nil {
c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err))
}
}
}
// Close closes the db connection and stops services, flushing the keys count beforehand.
func (c *cache) Close() error {
c.cancelInitCountByCloseAndWait()
c.flushKeysCount()
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
@@ -125,5 +149,13 @@ func (c *cache) Close() error {
}
}
c.metrics.Close()
return nil
}
// cancelInitCountByCloseAndWait cancels the counting process when Close is invoked
// and waits for the goroutine started in initCounters to finish correctly.
func (c *cache) cancelInitCountByCloseAndWait() {
c.objCounters.cDB.cancelByClose()
c.initCounterWG.Wait()
}
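The persisted counter is just an 8-byte little-endian uint64 stored under the keys_count key (keyCountPrefix, declared in the last file below). A minimal standalone sketch of the encode/decode round-trip, using only the standard library; the count value here is illustrative:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode the in-memory counter the way flushKeysCount does:
	// an 8-byte little-endian value stored under "keys_count".
	count := uint64(42)
	v := make([]byte, 8)
	binary.LittleEndian.PutUint64(v, count)

	// Decode it the way initCounters does after reading the key back.
	fmt.Println(binary.LittleEndian.Uint64(v) == count) // true
}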

View file

@@ -14,6 +14,7 @@ import (
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Delete removes object from write-cache.
@@ -68,3 +69,33 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
return metaerr.Wrap(err)
}
// deleteRawKey removes an object from the write-cache by its raw-represented key.
//
// Returns an error of type apistatus.ObjectNotFound if the object is missing from the write-cache.
func (c *cache) deleteRawKey(key []byte) error {
err := c.db.Update(func(tx *badger.Txn) error {
it, err := tx.Get(key)
if err != nil {
if err == badger.ErrKeyNotFound {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return err
}
if it.ValueSize() > 0 {
err := tx.Delete(key)
if err == nil {
storagelog.Write(c.log,
zap.String("key", string(key)),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
c.objCounters.DecDB()
}
return err
}
return nil
})
return metaerr.Wrap(err)
}
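The lookup and the delete share a single read-write transaction, so DecDB fires only for keys that actually existed. A self-contained sketch of the same delete-if-present shape against an in-memory badger/v4 store; everything here is illustrative and not part of the patch:

package main

import (
	"fmt"

	"github.com/dgraph-io/badger/v4"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	key := []byte("keys_count")
	_ = db.Update(func(txn *badger.Txn) error { return txn.Set(key, []byte{1}) })

	// Delete only when the key is present, mirroring deleteRawKey:
	// a missing key surfaces as badger.ErrKeyNotFound from Get.
	err = db.Update(func(txn *badger.Txn) error {
		if _, err := txn.Get(key); err != nil {
			return err
		}
		return txn.Delete(key)
	})
	fmt.Println(err) // <nil>
}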

View file

@@ -93,3 +93,14 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
return value, metaerr.Wrap(err)
}
// getRawKey returns raw data from the write-cache by the given key.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing from the write-cache.
func (c *cache) getRawKey(key []byte) ([]byte, error) {
value, err := Get(c.db, key)
if err != nil {
return nil, err
}
return value, nil
}

View file

@@ -22,6 +22,8 @@ func (c *cache) SetMode(m mode.Mode) error {
))
defer span.End()
c.cancelInitCountBySetModeAndWait()
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
@@ -32,6 +34,13 @@ func (c *cache) SetMode(m mode.Mode) error {
return err
}
// cancelInitCountBySetModeAndWait cancels the counting process when SetMode is invoked
// and waits for the goroutine started in initCounters to finish correctly.
func (c *cache) cancelInitCountBySetModeAndWait() {
c.objCounters.cDB.cancelBySetMode()
c.initCounterWG.Wait()
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
var err error

View file

@@ -34,6 +34,9 @@ type options struct {
metrics writecache.Metrics
// gcInterval is the interval duration to run the GC cycle.
gcInterval time.Duration
// recountKeys indicates whether keys must be forcibly recounted by iterating the DB. This flag should be
// used if badger contains an invalid keys count record.
recountKeys bool
}
// WithLogger sets logger.
@@ -108,3 +111,10 @@ func WithGCInterval(d time.Duration) Option {
o.gcInterval = d
}
}
// WithRecountKeys sets the recount keys flag.
func WithRecountKeys(recount bool) Option {
return func(o *options) {
o.recountKeys = recount
}
}
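Like the other options in this file, the flag is wired in through the functional-options pattern. A small sketch of how the closures compose over the internal options struct; the helper below is hypothetical and only uses names visible in this diff:

package writecachebadger

import "time"

// exampleOptions is a hypothetical helper showing how the option
// closures mutate the internal options struct before construction.
func exampleOptions() options {
	var o options
	for _, opt := range []Option{
		WithGCInterval(10 * time.Minute), // existing option from this file
		WithRecountKeys(true),            // force the recount path on Init
	} {
		opt(&o)
	}
	return o
}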

View file

@@ -8,6 +8,7 @@ import (
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@@ -80,3 +81,25 @@ func (c *cache) put(obj objectInfo) error {
}
return err
}
// putRaw persists a raw-represented key and value to the write-cache
// database and pushes the record to the flush workers queue.
func (c *cache) putRaw(key, value []byte) error {
cacheSize := c.estimateCacheSize()
if c.maxCacheSize < c.incSizeDB(cacheSize) {
return writecache.ErrOutOfSpace
}
err := c.db.Update(func(txn *badger.Txn) error {
err := txn.Set(key, value)
return err
})
if err == nil {
storagelog.Write(c.log,
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"),
)
c.objCounters.IncDB()
}
return err
}
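The admission check above charges every put pessimistically at maxObjectSize (see incSizeDB in the next file). A tiny worked model of that guard, with plain values standing in for the cache fields:

package main

import "fmt"

// admits models the putRaw guard: reject when the current estimate
// plus one worst-case object would exceed the configured cap.
func admits(estimated, maxObjectSize, maxCacheSize uint64) bool {
	return estimated+maxObjectSize <= maxCacheSize
}

func main() {
	fmt.Println(admits(90, 5, 100)) // true: 95 fits under the cap
	fmt.Println(admits(98, 5, 100)) // false: 103 exceeds the cap
}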

View file

@@ -1,11 +1,18 @@
package writecachebadger
import (
"fmt"
"math"
"sync/atomic"
"encoding/binary"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/dgraph-io/badger/v4"
"go.uber.org/atomic"
"go.uber.org/zap"
)
var (
errCountCancelled = errors.New("counting was cancelled")
)
func (c *cache) estimateCacheSize() uint64 {
@@ -18,40 +25,180 @@ func (c *cache) incSizeDB(sz uint64) uint64 {
return sz + c.maxObjectSize
}
type countAsync struct {
value atomic.Uint64
// mutable indicates whether the value can be incremented/decremented.
mutable bool
// mutMtx is mutex for mutable flag.
mutMtx sync.RWMutex
// init indicates whether the value has been fully initialized.
init bool
// initMtx is mutex for init flag.
initMtx sync.RWMutex
// modeCh is a channel used to signal that SetMode has been invoked and
// counting must be cancelled.
modeCh chan struct{}
// modeDoneOnce guarantees that modeCh is closed only once,
// even if SetMode is invoked many times.
modeDoneOnce sync.Once
// closeCh is a channel used to signal that Close has been invoked and
// counting must be cancelled.
closeCh chan struct{}
// closeDoneOnce guarantees that closeCh is closed only once,
// even if Close is invoked many times.
closeDoneOnce sync.Once
}
func (c *countAsync) cancelByClose() {
c.closeDoneOnce.Do(func() {
close(c.closeCh)
})
}
func (c *countAsync) cancelBySetMode() {
c.modeDoneOnce.Do(func() {
close(c.modeCh)
})
}
func (c *countAsync) addAndSetAsInit(value uint64) {
c.value.Add(value)
c.initMtx.Lock()
defer c.initMtx.Unlock()
c.init = true
}
func (c *countAsync) isInitialized() bool {
c.initMtx.RLock()
defer c.initMtx.RUnlock()
return c.init
}
func (c *countAsync) isMutable() bool {
c.mutMtx.RLock()
defer c.mutMtx.RUnlock()
return c.mutable
}
func (c *countAsync) setAsMutable() {
c.mutMtx.Lock()
defer c.mutMtx.Unlock()
c.mutable = true
}
func (c *countAsync) reset() {
c.value.Store(0)
c.closeCh = make(chan struct{})
c.closeDoneOnce = sync.Once{}
c.modeCh = make(chan struct{})
c.modeDoneOnce = sync.Once{}
c.initMtx.Lock()
defer c.initMtx.Unlock()
c.init = false
c.mutMtx.Lock()
defer c.mutMtx.Unlock()
c.mutable = false
}
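Both channel/Once pairs implement the same close-once cancellation signal, safe to trigger from multiple callers. A stripped-down sketch of the pattern, independent of this package:

package main

import (
	"fmt"
	"sync"
)

// cancelOnce mirrors the modeCh/modeDoneOnce and closeCh/closeDoneOnce
// pairs above: the channel is closed at most once, no matter how many
// times cancellation is requested.
type cancelOnce struct {
	ch   chan struct{}
	once sync.Once
}

func (c *cancelOnce) cancel() { c.once.Do(func() { close(c.ch) }) }

func main() {
	c := &cancelOnce{ch: make(chan struct{})}
	c.cancel()
	c.cancel() // repeated calls are safe; no panic on double close
	<-c.ch
	fmt.Println("cancelled exactly once")
}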
type counters struct {
- cDB atomic.Uint64
cDB countAsync
}
func (x *counters) IncDB() {
- x.cDB.Add(1)
if x.cDB.isMutable() {
x.cDB.value.Inc()
}
}
func (x *counters) DecDB() {
- x.cDB.Add(math.MaxUint64)
if x.cDB.isMutable() {
x.cDB.value.Dec()
}
}
func (x *counters) DB() uint64 {
- return x.cDB.Load()
if x.cDB.isInitialized() {
return x.cDB.value.Load()
}
return 0
}
func (x *counters) isReadyToFlush() bool {
return x.cDB.isInitialized()
}
func (c *cache) initCounters() error {
- var inDB uint64
- err := c.db.View(func(tx *badger.Txn) error {
- opts := badger.DefaultIteratorOptions
- opts.PrefetchValues = false
- it := tx.NewIterator(opts)
- defer it.Close()
- for it.Rewind(); it.Valid(); it.Next() {
- inDB++
- }
- return nil
- })
- if err != nil {
- return fmt.Errorf("could not read write-cache DB counter: %w", err)
- }
c.objCounters.cDB.reset()
- c.objCounters.cDB.Store(inDB)
- c.metrics.SetActualCounters(inDB, 0)
c.initCounterWG.Add(1)
go func() {
defer func() {
c.initCounterWG.Done()
}()
var inDB uint64
if !c.options.recountKeys {
k := keyCountPrefix
cDB, err := c.getRawKey(k[:])
if err == nil {
c.objCounters.cDB.setAsMutable()
inDB = binary.LittleEndian.Uint64(cDB)
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if !c.mode.ReadOnly() {
if err = c.deleteRawKey(k[:]); err != nil {
c.log.Error(logs.WritecacheBadgerCouldNotDeleteKeysCount, zap.Error(err))
}
}
return
}
}
// Recount keys if the "keys_count" record has not been found
if err := c.db.View(func(tx *badger.Txn) error {
c.objCounters.cDB.setAsMutable()
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := tx.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
select {
case <-c.objCounters.cDB.closeCh:
return errCountCancelled
case <-c.objCounters.cDB.modeCh:
return errCountCancelled
default:
inDB++
}
}
return nil
}); err != nil {
c.log.Error(logs.WritecacheBadgerCouldNotCountKeys, zap.Error(err))
return
}
c.metrics.SetActualCounters(inDB, 0)
c.objCounters.cDB.addAndSetAsInit(inDB)
}()
return nil
}
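Inside the recount loop, cancellation is polled once per key with a non-blocking select over both channels. A minimal model of that loop, with a plain counter bound standing in for the badger iterator:

package main

import (
	"errors"
	"fmt"
)

var errCancelled = errors.New("counting was cancelled")

// countUntil mirrors the recount loop above: each step polls both
// cancellation channels and otherwise keeps counting.
func countUntil(n int, closeCh, modeCh <-chan struct{}) (uint64, error) {
	var count uint64
	for i := 0; i < n; i++ {
		select {
		case <-closeCh:
			return 0, errCancelled
		case <-modeCh:
			return 0, errCancelled
		default:
			count++
		}
	}
	return count, nil
}

func main() {
	closeCh, modeCh := make(chan struct{}), make(chan struct{})
	fmt.Println(countUntil(5, closeCh, modeCh)) // 5 <nil>
	close(closeCh)
	fmt.Println(countUntil(5, closeCh, modeCh)) // 0 counting was cancelled
}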

View file

@@ -21,6 +21,8 @@ type store struct {
db *badger.DB
}
var keyCountPrefix = []byte("keys_count")
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
func (k internalKey) address() oid.Address {