writecache: Improve flushing scheme for badger #641

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/568-imprv-badger-flush into master 2023-08-30 17:22:32 +00:00
4 changed files with 89 additions and 65 deletions

go.mod

@@ -11,6 +11,7 @@ require (
     git.frostfs.info/TrueCloudLab/tzhash v1.8.0
     github.com/cheggaaa/pb v1.0.29
     github.com/chzyer/readline v1.5.1
+    github.com/dgraph-io/ristretto v0.1.1
     github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
     github.com/google/uuid v1.3.0
     github.com/hashicorp/golang-lru/v2 v2.0.4
@@ -42,7 +43,6 @@
 )

 require (
-    github.com/dgraph-io/ristretto v0.1.1 // indirect
     github.com/dustin/go-humanize v1.0.0 // indirect
     github.com/gogo/protobuf v1.3.2 // indirect
     github.com/golang/glog v1.1.0 // indirect


@@ -296,6 +296,7 @@ const (
     WritecacheCantParseAddress = "can't parse address"
     WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
     WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
+    WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
     BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
     BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza"
     BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza"


@@ -21,6 +21,10 @@ type cache struct {
     // flushCh is a channel with objects to flush.
     flushCh chan *objectSDK.Object
+    // scheduled4Flush contains objects scheduled for flush via flushCh;
+    // it helps to avoid flushing the same object multiple times.
+    scheduled4Flush    map[oid.Address]struct{}
+    scheduled4FlushMtx sync.RWMutex
     // closeCh is a close channel, protected by modeMtx.
     closeCh chan struct{}
     // wg is a wait group for flush workers.
     wg sync.WaitGroup
@@ -49,6 +53,7 @@ func New(opts ...Option) writecache.Cache {
     c := &cache{
         flushCh:         make(chan *objectSDK.Object),
         mode:            mode.ReadWrite,
+        scheduled4Flush: map[oid.Address]struct{}{},
         options: options{
             log: &logger.Logger{Logger: zap.NewNop()},
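
Taken together, the two hunks above wire a dedup set into the cache: the stream collector marks an address in scheduled4Flush before pushing the object to flushCh, and the flush worker clears the mark once its attempt finishes. A minimal, self-contained sketch of the pattern (hypothetical scheduler type with plain string addresses, not the PR's actual types):

package main

import (
    "fmt"
    "sync"
)

// scheduler mimics the cache's scheduled4Flush bookkeeping: a set of
// addresses guarded by a mutex, plus a channel of work for flush workers.
type scheduler struct {
    mtx       sync.Mutex
    scheduled map[string]struct{}
    work      chan string
}

// trySchedule enqueues addr unless it is already scheduled and reports
// whether this call actually enqueued it.
func (s *scheduler) trySchedule(addr string) bool {
    s.mtx.Lock()
    if _, ok := s.scheduled[addr]; ok {
        s.mtx.Unlock()
        return false // a duplicate: some earlier pass already queued it
    }
    s.scheduled[addr] = struct{}{}
    s.mtx.Unlock()
    s.work <- addr
    return true
}

// done clears the mark so the address becomes schedulable again.
func (s *scheduler) done(addr string) {
    s.mtx.Lock()
    delete(s.scheduled, addr)
    s.mtx.Unlock()
}

func main() {
    s := &scheduler{scheduled: map[string]struct{}{}, work: make(chan string, 1)}
    fmt.Println(s.trySchedule("addr-1")) // true: first attempt enqueues
    fmt.Println(s.trySchedule("addr-1")) // false: duplicate is skipped
    <-s.work                             // a worker picks the object up
    s.done("addr-1")
    fmt.Println(s.trySchedule("addr-1")) // true: can be scheduled again
}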


@@ -18,7 +18,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     "github.com/dgraph-io/badger/v4"
-    "github.com/mr-tron/base58"
+    "github.com/dgraph-io/ristretto/z"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
     "go.uber.org/zap"
@@ -35,6 +35,65 @@ const (
     defaultFlushInterval = time.Second
 )

+type collector struct {
+    cache     *cache
+    scheduled int
+    processed int
+    cancel    func()
+}
+
+func (c *collector) Send(buf *z.Buffer) error {
+    list, err := badger.BufferToKVList(buf)
+    if err != nil {
+        return err
+    }
+    for _, kv := range list.Kv {
+        select {
+        case <-c.cache.closeCh:
+            c.cancel()
+            return nil
+        default:
+        }
+        if kv.StreamDone {
+            return nil
+        }
+        if c.scheduled >= flushBatchSize {
+            c.cancel()
+            return nil
+        }
+        if got, want := len(kv.Key), len(internalKey{}); got != want {
+            c.cache.log.Debug(
+                fmt.Sprintf("unexpected db key len: got %d, want %d", got, want))
+            continue
+        }
+        c.processed++
+        obj := objectSDK.New()
+        val := bytes.Clone(kv.Value)
+        if err = obj.Unmarshal(val); err != nil {
+            continue
+        }
+        addr := objectCore.AddressOf(obj)
+        c.cache.scheduled4FlushMtx.RLock()
+        _, ok := c.cache.scheduled4Flush[addr]
+        c.cache.scheduled4FlushMtx.RUnlock()
+        if ok {
+            c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr))
+            continue
+        }
+        c.cache.scheduled4FlushMtx.Lock()
+        c.cache.scheduled4Flush[addr] = struct{}{}
+        c.cache.scheduled4FlushMtx.Unlock()
+        c.scheduled++
+        select {
+        case c.cache.flushCh <- obj:
+        case <-c.cache.closeCh:
+            c.cancel()
+            return nil
+        }
+    }
+    return nil
+}
+
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
 func (c *cache) runFlushLoop() {
     for i := 0; i < c.workersCount; i++ {
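
For context: badger's Stream framework partitions the key space, reads it with several internal goroutines, and hands the results to the user-supplied Send callback; as a comment later in this diff notes, all calls to Send come from a single goroutine, which is why the collector's counters need no locking. A stripped-down usage sketch against a throwaway in-memory DB (badger v4 assumed; keys and values are made up):

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/dgraph-io/badger/v4"
    "github.com/dgraph-io/ristretto/z"
)

func main() {
    db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Store a few keys to stream back.
    err = db.Update(func(txn *badger.Txn) error {
        for i := 0; i < 3; i++ {
            if err := txn.Set([]byte(fmt.Sprintf("obj-%d", i)), []byte("payload")); err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        log.Fatal(err)
    }

    stream := db.NewStream()
    // Send receives each batch as a serialized z.Buffer; decode it back
    // into a key-value list, exactly as the PR's collector does.
    stream.Send = func(buf *z.Buffer) error {
        list, err := badger.BufferToKVList(buf)
        if err != nil {
            return err
        }
        for _, kv := range list.Kv {
            if kv.StreamDone {
                continue
            }
            fmt.Printf("%s => %s\n", kv.Key, kv.Value)
        }
        return nil
    }
    if err := stream.Orchestrate(context.Background()); err != nil {
        log.Fatal(err)
    }
}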
@@ -62,17 +121,12 @@
 }

 func (c *cache) flushSmallObjects() {
-    var lastKey internalKey
-    var m []objectInfo
     for {
         select {
         case <-c.closeCh:
             return
         default:
         }

-        m = m[:0]
-
         c.modeMtx.RLock()
         if c.readOnly() {
             c.modeMtx.RUnlock()
@@ -86,61 +140,24 @@
             c.modeMtx.RUnlock()
             return
         }

-        _ = c.db.View(func(tx *badger.Txn) error {
-            it := tx.NewIterator(badger.DefaultIteratorOptions)
-            defer it.Close()
-            if len(lastKey) == 0 {
-                it.Rewind()
-            } else {
-                it.Seek(lastKey[:])
-                if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) {
-                    it.Next()
-                }
-            }
-
-            for ; it.Valid() && len(m) < flushBatchSize; it.Next() {
-                if got, want := int(it.Item().KeySize()), len(lastKey); got != want {
-                    return fmt.Errorf("invalid db key len: got %d, want %d", got, want)
-                }
-                it.Item().KeyCopy(lastKey[:])
-                value, err := it.Item().ValueCopy(nil)
-                if err != nil {
-                    return err
-                }
-                m = append(m, objectInfo{
-                    addr: lastKey.address(),
-                    data: value,
-                })
-            }
-            return nil
-        })
-
-        var count int
-        for i := range m {
-            obj := objectSDK.New()
-            if err := obj.Unmarshal(m[i].data); err != nil {
-                continue
-            }
-            count++
-            select {
-            case c.flushCh <- obj:
-            case <-c.closeCh:
-                c.modeMtx.RUnlock()
-                return
-            }
-        }
-        if count == 0 {
-            c.modeMtx.RUnlock()
+        ctx, cancel := context.WithCancel(context.TODO())
+        coll := collector{
+            cache:  c,
+            cancel: cancel,
+        }
+        stream := c.db.NewStream()
+        // All calls to Send are done by a single goroutine.
+        stream.Send = coll.Send
+        if err := stream.Orchestrate(ctx); err != nil {
+            c.log.Debug(fmt.Sprintf(
+                "error during flushing objects from writecache: %s", err))
+        }
+        c.modeMtx.RUnlock()
+        if coll.scheduled == 0 {
             break
         }
-
-        c.modeMtx.RUnlock()
         c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
-            zap.Int("count", count),
-            zap.String("start", base58.Encode(lastKey[:])))
+            zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed))
     }
 }
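
One subtlety: the collector cancels the context both on shutdown and whenever a batch reaches flushBatchSize, and a cancelled context makes Orchestrate return early with an error (presumably context.Canceled), so the debug line above also fires on the ordinary batch-full path. A hypothetical refinement, not part of this PR, that would filter the expected cancellation (assumes an errors import and the surrounding variables of flushSmallObjects):

    if err := stream.Orchestrate(ctx); err != nil && !errors.Is(err, context.Canceled) {
        // Only genuinely unexpected stream failures reach the log.
        c.log.Debug(fmt.Sprintf(
            "error during flushing objects from writecache: %s", err))
    }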
@@ -167,13 +184,14 @@ func (c *cache) workerFlushSmall() {
             return
         }

+        addr := objectCore.AddressOf(obj)
         err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
-        if err != nil {
-            // Error is handled in flushObject.
-            continue
+        if err == nil {
+            c.deleteFromDB([]internalKey{addr2key(addr)})
         }
-
-        c.deleteFromDB([]internalKey{addr2key(objectCore.AddressOf(obj))})
+        c.scheduled4FlushMtx.Lock()
+        delete(c.scheduled4Flush, addr)
+        c.scheduled4FlushMtx.Unlock()
     }
 }
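
The worker-side change is the other half of the scheme: the DB record is deleted only when the flush succeeded, while the scheduled4Flush mark is always cleared, so a failed object stays in the cache and becomes eligible for the next stream pass. A toy model of that control flow (hypothetical stand-ins for flushObject and the cache state; single-threaded for brevity):

package main

import (
    "errors"
    "fmt"
)

var db = map[string]string{"addr-1": "blob"}      // cached records
var scheduled = map[string]struct{}{"addr-1": {}} // marks set by the collector

// flushObject is a stand-in for the real flush; fail toggles the outcome.
func flushObject(addr string, fail bool) error {
    if fail {
        return errors.New("blobstor unavailable")
    }
    return nil
}

func worker(addr string, fail bool) {
    if err := flushObject(addr, fail); err == nil {
        delete(db, addr) // drop the cache record only on success
    }
    // Always clear the mark: either the object is gone from the cache,
    // or the next stream pass must be able to reschedule it.
    delete(scheduled, addr)
}

func main() {
    worker("addr-1", true)
    fmt.Println(len(db), len(scheduled)) // 1 0: still cached, reschedulable
    worker("addr-1", false)
    fmt.Println(len(db), len(scheduled)) // 0 0: flushed and removed
}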