Compare commits
1 commit
master
...
feature/56
Author | SHA1 | Date | |
---|---|---|---|
a537d0a62f |
9 changed files with 291 additions and 23 deletions
|
@ -286,6 +286,9 @@ const (
|
|||
ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
|
||||
WritecacheBadgerInitExperimental = "initializing badger-backed experimental writecache"
|
||||
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
|
||||
WritecacheBadgerCouldNotCountKeys = "could not count keys from badger by iteration"
|
||||
WritecacheBadgerCouldNotFlushKeysCount = "could not flush keys count to badger"
|
||||
WritecacheBadgerCouldNotDeleteKeysCount = "could not delete keys count from badger"
|
||||
WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
|
||||
WritecacheFillingFlushMarksForObjectsInFSTree = "filling flush marks for objects in FSTree"
|
||||
WritecacheFinishedUpdatingFSTreeFlushMarks = "finished updating FSTree flush marks"
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -25,6 +26,8 @@ type cache struct {
|
|||
closeCh chan struct{}
|
||||
// wg is a wait group for flush workers.
|
||||
wg sync.WaitGroup
|
||||
// initCounterWG is necessary to wait for asynchronous initialization.
|
||||
initCounterWG sync.WaitGroup
|
||||
// store contains underlying database.
|
||||
store
|
||||
}
|
||||
|
@ -100,8 +103,29 @@ func (c *cache) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// flushKeysCount writes the keys count to db if it should be flushed down.
|
||||
func (c *cache) flushKeysCount() {
|
||||
// Close may happen after resetting mode
|
||||
if c.db.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
if c.objCounters.isReadyToFlush() {
|
||||
k := keyCountPrefix
|
||||
v := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(v, c.objCounters.DB())
|
||||
if err := c.putRaw(k[:], v); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotFlushKeysCount, zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
c.cancelInitCountByCloseAndWait()
|
||||
|
||||
c.flushKeysCount()
|
||||
|
||||
// We cannot lock mutex for the whole operation duration
|
||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||
c.modeMtx.Lock()
|
||||
|
@ -125,5 +149,13 @@ func (c *cache) Close() error {
|
|||
}
|
||||
}
|
||||
c.metrics.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cancelInitCountByCloseAndWait cancels the counting process if Close is invoked
|
||||
// and waits for correct finish of goroutine within initCount.
|
||||
func (c *cache) cancelInitCountByCloseAndWait() {
|
||||
c.objCounters.cDB.cancelByClose()
|
||||
c.initCounterWG.Wait()
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Delete removes object from write-cache.
|
||||
|
@ -68,3 +69,33 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// deleteRawKey removes object from write-cache by raw-represented key.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||
func (c *cache) deleteRawKey(key []byte) error {
|
||||
err := c.db.Update(func(tx *badger.Txn) error {
|
||||
it, err := tx.Get(key)
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return err
|
||||
}
|
||||
if it.ValueSize() > 0 {
|
||||
err := tx.Delete(key)
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
zap.String("key", string(key)),
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db DELETE"),
|
||||
)
|
||||
c.objCounters.DecDB()
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
|
|
@ -93,3 +93,14 @@ func Get(db *badger.DB, key []byte) ([]byte, error) {
|
|||
|
||||
return value, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// getRawKey returns raw-data from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||
func (c *cache) getRawKey(key []byte) ([]byte, error) {
|
||||
value, err := Get(c.db, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
))
|
||||
defer span.End()
|
||||
|
||||
c.cancelInitCountBySetModeAndWait()
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
|
@ -32,6 +34,13 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// cancelInitCountBySetModeAndWait cancels the counting process if SetMode is invoked
|
||||
// and waits for correct finish of goroutine within initCount.
|
||||
func (c *cache) cancelInitCountBySetModeAndWait() {
|
||||
c.objCounters.cDB.cancelBySetMode()
|
||||
c.initCounterWG.Wait()
|
||||
}
|
||||
|
||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||
var err error
|
||||
|
|
|
@ -34,6 +34,9 @@ type options struct {
|
|||
metrics writecache.Metrics
|
||||
// gcInterval is the interval duration to run the GC cycle.
|
||||
gcInterval time.Duration
|
||||
// recountKeys indicates if keys must be forcefully counted only by DB iteration. This flag should be
|
||||
// used if badger contains invalid keys count record.
|
||||
recountKeys bool
|
||||
}
|
||||
|
||||
// WithLogger sets logger.
|
||||
|
@ -108,3 +111,10 @@ func WithGCInterval(d time.Duration) Option {
|
|||
o.gcInterval = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithRecountKeys sets the recount keys flag.
|
||||
func WithRecountKeys(recount bool) Option {
|
||||
return func(o *options) {
|
||||
o.recountKeys = recount
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -80,3 +81,25 @@ func (c *cache) put(obj objectInfo) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// putRaw persists raw-represent key and value to the write-cache
|
||||
// database and pushes to the flush workers queue.
|
||||
func (c *cache) putRaw(key, value []byte) error {
|
||||
cacheSize := c.estimateCacheSize()
|
||||
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
||||
return writecache.ErrOutOfSpace
|
||||
}
|
||||
|
||||
err := c.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Set(key, value)
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
storagelog.Write(c.log,
|
||||
storagelog.StorageTypeField(wcStorageType),
|
||||
storagelog.OpField("db PUT"),
|
||||
)
|
||||
c.objCounters.IncDB()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,11 +1,18 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync/atomic"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
errCountCancelled = errors.New("counting was cancelled")
|
||||
)
|
||||
|
||||
func (c *cache) estimateCacheSize() uint64 {
|
||||
|
@ -18,40 +25,180 @@ func (c *cache) incSizeDB(sz uint64) uint64 {
|
|||
return sz + c.maxObjectSize
|
||||
}
|
||||
|
||||
type countAsync struct {
|
||||
value atomic.Uint64
|
||||
|
||||
// mutable checks whether the value can be incremented/decremented.
|
||||
mutable bool
|
||||
|
||||
// mutMtx is mutex for mutable flag.
|
||||
mutMtx sync.RWMutex
|
||||
|
||||
// init checks whether the value is finally initialized.
|
||||
init bool
|
||||
|
||||
// initMtx is mutex for init flag.
|
||||
initMtx sync.RWMutex
|
||||
|
||||
// modeCh is channel to indicate SetMode has been invoked and
|
||||
// counting must be cancelled.
|
||||
modeCh chan struct{}
|
||||
|
||||
// modeDoneOnce guarantees that modeCh has been closed only once.
|
||||
// This may happen if SetMode has been invoked many times.
|
||||
modeDoneOnce sync.Once
|
||||
|
||||
// closeCh is channel to indicate Close has been invoked and
|
||||
// counting must be cancelled.
|
||||
closeCh chan struct{}
|
||||
|
||||
// closeDoneOnce guarantees that modeCh has been closed only once.
|
||||
// This may happen if Close has been invoked many times.
|
||||
closeDoneOnce sync.Once
|
||||
}
|
||||
|
||||
func (c *countAsync) cancelByClose() {
|
||||
c.closeDoneOnce.Do(func() {
|
||||
close(c.closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *countAsync) cancelBySetMode() {
|
||||
c.modeDoneOnce.Do(func() {
|
||||
close(c.modeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *countAsync) addAndSetAsInit(value uint64) {
|
||||
c.value.Add(value)
|
||||
|
||||
c.initMtx.Lock()
|
||||
defer c.initMtx.Unlock()
|
||||
|
||||
c.init = true
|
||||
}
|
||||
|
||||
func (c *countAsync) isInitialized() bool {
|
||||
c.initMtx.RLock()
|
||||
defer c.initMtx.RUnlock()
|
||||
|
||||
return c.init
|
||||
}
|
||||
|
||||
func (c *countAsync) isMutable() bool {
|
||||
c.mutMtx.RLock()
|
||||
defer c.mutMtx.RUnlock()
|
||||
|
||||
return c.mutable
|
||||
}
|
||||
|
||||
func (c *countAsync) setAsMutable() {
|
||||
c.mutMtx.Lock()
|
||||
defer c.mutMtx.Unlock()
|
||||
|
||||
c.mutable = true
|
||||
}
|
||||
|
||||
func (c *countAsync) reset() {
|
||||
c.value.Store(0)
|
||||
|
||||
c.closeCh = make(chan struct{})
|
||||
c.closeDoneOnce = sync.Once{}
|
||||
|
||||
c.modeCh = make(chan struct{})
|
||||
c.modeDoneOnce = sync.Once{}
|
||||
|
||||
c.initMtx.Lock()
|
||||
defer c.initMtx.Unlock()
|
||||
c.init = false
|
||||
|
||||
c.mutMtx.Lock()
|
||||
defer c.mutMtx.Unlock()
|
||||
c.mutable = false
|
||||
}
|
||||
|
||||
type counters struct {
|
||||
cDB atomic.Uint64
|
||||
cDB countAsync
|
||||
}
|
||||
|
||||
func (x *counters) IncDB() {
|
||||
x.cDB.Add(1)
|
||||
if x.cDB.isMutable() {
|
||||
x.cDB.value.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func (x *counters) DecDB() {
|
||||
x.cDB.Add(math.MaxUint64)
|
||||
if x.cDB.isMutable() {
|
||||
x.cDB.value.Dec()
|
||||
}
|
||||
}
|
||||
|
||||
func (x *counters) DB() uint64 {
|
||||
return x.cDB.Load()
|
||||
if x.cDB.isInitialized() {
|
||||
return x.cDB.value.Load()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *counters) isReadyToFlush() bool {
|
||||
return x.cDB.isInitialized()
|
||||
}
|
||||
|
||||
func (c *cache) initCounters() error {
|
||||
var inDB uint64
|
||||
err := c.db.View(func(tx *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
inDB++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
||||
}
|
||||
c.objCounters.cDB.reset()
|
||||
|
||||
c.objCounters.cDB.Store(inDB)
|
||||
c.metrics.SetActualCounters(inDB, 0)
|
||||
c.initCounterWG.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
c.initCounterWG.Done()
|
||||
}()
|
||||
|
||||
var inDB uint64
|
||||
|
||||
if !c.options.recountKeys {
|
||||
k := keyCountPrefix
|
||||
cDB, err := c.getRawKey(k[:])
|
||||
if err == nil {
|
||||
c.objCounters.cDB.setAsMutable()
|
||||
inDB = binary.LittleEndian.Uint64(cDB)
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
if !c.mode.ReadOnly() {
|
||||
if err = c.deleteRawKey(k[:]); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotDeleteKeysCount, zap.Error(err))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Recount keys if "key_count" has not been found
|
||||
if err := c.db.View(func(tx *badger.Txn) error {
|
||||
c.objCounters.cDB.setAsMutable()
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
select {
|
||||
case <-c.objCounters.cDB.closeCh:
|
||||
return errCountCancelled
|
||||
case <-c.objCounters.cDB.modeCh:
|
||||
return errCountCancelled
|
||||
default:
|
||||
inDB++
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
c.log.Error(logs.WritecacheBadgerCouldNotCountKeys, zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.metrics.SetActualCounters(inDB, 0)
|
||||
c.objCounters.cDB.addAndSetAsInit(inDB)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ type store struct {
|
|||
db *badger.DB
|
||||
}
|
||||
|
||||
var keyCountPrefix = []byte("keys_count")
|
||||
|
||||
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
|
||||
|
||||
func (k internalKey) address() oid.Address {
|
||||
|
|
Loading…
Reference in a new issue