writecache: Improve flushing scheme for badger #641

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/568-imprv-badger-flush into master 2023-08-30 17:22:32 +00:00
4 changed files with 89 additions and 65 deletions

go.mod
View file

@ -11,6 +11,7 @@ require (
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1
github.com/dgraph-io/ristretto v0.1.1
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.4
@ -42,7 +43,6 @@ require (
)
require (
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect

View file

@ -296,6 +296,7 @@ const (
WritecacheCantParseAddress = "can't parse address"
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza"
BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza"

View file

@ -21,6 +21,10 @@ type cache struct {
// flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object
// scheduled4Flush contains objects scheduled for flush via flushCh
// helps to avoid multiple flushing of one object
scheduled4Flush map[oid.Address]struct{}
fyrchik marked this conversation as resolved

If it is a set, it is better to use `struct{}`, not `any`, as `struct{}` has zero size and thus could be optimized by the compiler.

Thanks, updated.
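A standalone sketch of the `struct{}`-as-set idiom discussed above (the `addrSet` name and the string keys are made up for the example; the cache itself keys the map by `oid.Address`):

```go
package main

import (
	"fmt"
	"unsafe"
)

// addrSet is a hypothetical set type: struct{} values occupy zero bytes,
// so only the keys contribute to the map's memory footprint.
type addrSet map[string]struct{}

func main() {
	s := addrSet{}
	s["obj-1"] = struct{}{} // add
	_, ok := s["obj-1"]     // membership test
	delete(s, "obj-1")      // remove
	fmt.Println(ok)         // true

	fmt.Println(unsafe.Sizeof(struct{}{}))      // 0
	fmt.Println(unsafe.Sizeof(any(struct{}{}))) // 16 on 64-bit: an interface value is two words
}
```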
scheduled4FlushMtx sync.RWMutex
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
@ -47,8 +51,9 @@ const (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
c := &cache{
flushCh: make(chan *objectSDK.Object),
mode: mode.ReadWrite,
flushCh: make(chan *objectSDK.Object),
mode: mode.ReadWrite,
scheduled4Flush: map[oid.Address]struct{}{},
options: options{
log: &logger.Logger{Logger: zap.NewNop()},

View file

@ -18,7 +18,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/dgraph-io/badger/v4"
"github.com/mr-tron/base58"
"github.com/dgraph-io/ristretto/z"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@ -35,6 +35,65 @@ const (
defaultFlushInterval = time.Second
)
type collector struct {
cache *cache
scheduled int
processed int
cancel func()
}
func (c *collector) Send(buf *z.Buffer) error {
list, err := badger.BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.Kv {
select {
case <-c.cache.closeCh:
c.cancel()
return nil
default:
}
if kv.StreamDone {
return nil
}
if c.scheduled >= flushBatchSize {
c.cancel()

Flush batch size was there in `bbolt`, because long `View` transactions made the database grow in size. In badger we may not need this.

The reason I decided to keep using it is to allow changing the write-cache mode. If the write-cache is under pressure and for some reason we need to change the mode, that will only be possible once all objects have been scheduled. It may take some time, and the external call may fail with a timeout. Is that acceptable for us, or is it better to interrupt after some time?
return nil
}
if got, want := len(kv.Key), len(internalKey{}); got != want {
c.cache.log.Debug(
fyrchik marked this conversation as resolved

`len(internalKey{})`?

Yeah, this is better, fixed.
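For context, `internalKey` is a fixed-size byte array type, so `len(internalKey{})` is a compile-time constant taken from the array type itself. A tiny illustration (the 64-byte length below is assumed for the example, not taken from the real type):

```go
package main

import "fmt"

// internalKey stands in for the package's fixed-size key type;
// the real array length is defined in the writecache package.
type internalKey [64]byte

func main() {
	// len of an array-typed expression is an untyped constant, so this is
	// resolved at compile time and internalKey{} costs nothing at runtime.
	const keyLen = len(internalKey{})
	fmt.Println(keyLen) // 64
}
```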
fmt.Sprintf("not expected db key len: got %d, want %d", got, want))
continue
}
c.processed++
obj := objectSDK.New()
val := bytes.Clone(kv.Value)
if err = obj.Unmarshal(val); err != nil {
continue
fyrchik marked this conversation as resolved

`val := bytes.Clone` (or `slice.Copy`)?

Updated, I've chosen `append` because badger uses it in `CopyValue`.
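The copy matters because, as I understand the badger stream API, the buffer backing `kv.Value` is reused once `Send` returns, so the value has to be duplicated before it escapes the callback. A minimal standalone sketch of the two copy idioms mentioned here:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// src stands in for kv.Value: a slice pointing into a buffer that
	// the caller owns and will overwrite later.
	src := []byte("payload")

	a := bytes.Clone(src)            // idiom 1: bytes.Clone (Go 1.20+)
	b := append([]byte(nil), src...) // idiom 2: append to a nil slice, the pattern badger's CopyValue uses

	src[0] = 'X' // overwrite the original buffer

	fmt.Printf("%s %s\n", a, b) // payload payload — both copies are unaffected
}
```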
}
addr := objectCore.AddressOf(obj)
c.cache.scheduled4FlushMtx.RLock()
_, ok := c.cache.scheduled4Flush[addr]
fyrchik marked this conversation as resolved

Correct me if I am wrong: the purpose of this map is to prevent flushing the same object twice?

Right. It is possible that a background routine storing the data hangs, and then on the next iteration the same object would be scheduled again. When using the stream there is no way to start from a particular key, no such API exists. Also, the retrieved keys are sorted lexicographically, not in the order they were put into the db.
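To make the intent of the map explicit, here is a self-contained sketch of the same guard wrapped into its own type (`scheduledSet`, `tryAdd` and `remove` are invented for the illustration; the cache keeps the map and mutex inline, keyed by `oid.Address`, with a read lock for the lookup and a write lock for the insert):

```go
package main

import (
	"fmt"
	"sync"
)

// scheduledSet answers "was this address already scheduled?" and records it if not.
type scheduledSet struct {
	mtx sync.Mutex
	m   map[string]struct{}
}

func newScheduledSet() *scheduledSet {
	return &scheduledSet{m: make(map[string]struct{})}
}

// tryAdd returns true if addr was not scheduled yet and marks it as scheduled.
func (s *scheduledSet) tryAdd(addr string) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if _, ok := s.m[addr]; ok {
		return false
	}
	s.m[addr] = struct{}{}
	return true
}

// remove is what the flush worker calls once the object has been handled.
func (s *scheduledSet) remove(addr string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	delete(s.m, addr)
}

func main() {
	set := newScheduledSet()
	fmt.Println(set.tryAdd("addr-1")) // true: schedule it
	fmt.Println(set.tryAdd("addr-1")) // false: already scheduled, skip
	set.remove("addr-1")
	fmt.Println(set.tryAdd("addr-1")) // true again after the worker finished
}
```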
c.cache.scheduled4FlushMtx.RUnlock()
if ok {
c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr))
continue
}
c.cache.scheduled4FlushMtx.Lock()
c.cache.scheduled4Flush[addr] = struct{}{}
c.cache.scheduled4FlushMtx.Unlock()
c.scheduled++
select {
case c.cache.flushCh <- obj:
case <-c.cache.closeCh:
c.cancel()
return nil
}
}
return nil
fyrchik marked this conversation as resolved

1. If we store garbage data, it should be at least `Warn` or `Error` (and probably increase the shard error counter).
2. Let's use the common scheme: `log.Error(logs.Message, zap.Error(err))`.

Actually, if we increase the error counter, there is no need to log.

Agree, removed the log entry.
}
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
@ -62,17 +121,12 @@ func (c *cache) runFlushLoop() {
}
func (c *cache) flushSmallObjects() {
var lastKey internalKey
var m []objectInfo
for {
select {
case <-c.closeCh:
return
default:
}
m = m[:0]
c.modeMtx.RLock()
if c.readOnly() {
c.modeMtx.RUnlock()
@ -86,61 +140,24 @@ func (c *cache) flushSmallObjects() {
c.modeMtx.RUnlock()
return
}
_ = c.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
if len(lastKey) == 0 {
it.Rewind()
} else {
it.Seek(lastKey[:])
if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) {
it.Next()
}
}
for ; it.Valid() && len(m) < flushBatchSize; it.Next() {
if got, want := int(it.Item().KeySize()), len(lastKey); got != want {
return fmt.Errorf("invalid db key len: got %d, want %d", got, want)
}
it.Item().KeyCopy(lastKey[:])
value, err := it.Item().ValueCopy(nil)
if err != nil {
return err
}
m = append(m, objectInfo{
addr: lastKey.address(),
data: value,
})
}
return nil
})
var count int
for i := range m {
obj := objectSDK.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
count++
select {
case c.flushCh <- obj:
case <-c.closeCh:
c.modeMtx.RUnlock()
return
}
ctx, cancel := context.WithCancel(context.TODO())
coll := collector{
cache: c,
cancel: cancel,
fyrchik marked this conversation as resolved

`Background`? Or provide context to `flushSmallObjects`?

I think it is better to switch to a global context in a separate task #642. It requires a lot of refactoring, because we need to change the signature of the `Init` method.
}
if count == 0 {
c.modeMtx.RUnlock()
stream := c.db.NewStream()
// All calls to Send are done by a single goroutine
stream.Send = coll.Send
if err := stream.Orchestrate(ctx); err != nil {
c.log.Debug(fmt.Sprintf(
fyrchik marked this conversation as resolved

`can expect` or `expects`?

All calls to `Send` are done by a single goroutine.
"error during flushing object from wc: %s", err))
}
fyrchik marked this conversation as resolved

Does it exit after all objects in the database were streamed through?

Yes, as opposed to `Subscribe`.
c.modeMtx.RUnlock()
if coll.scheduled == 0 {
dstepanov-yadro marked this conversation as resolved

Please explain this line. I don't understand why the flush ends if no objects are scheduled.

It is possible that a few objects still exist in the db while already being scheduled for flush. To avoid iterating over already scheduled objects again, we check the scheduled counter, not the processed one. For example, if five objects remain in the db but all of them are already in `scheduled4Flush`, `processed` is 5 while `scheduled` is 0, so there is nothing new to do. This loop runs on a timer every second.
break
}
c.modeMtx.RUnlock()
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count),
zap.String("start", base58.Encode(lastKey[:])))
zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed))
}
}
fyrchik marked this conversation as resolved

GC interval is 1 minute by default currently. It may be beneficial to run it after each flush cycle instead. What do you think? @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers

If you mean badger's GC, I think it will be useful here once we have scheduled some objects for deletion. We're flushing every second, could that be a problem?

I don't think there has to be any correlation between badger GC and flush.
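For reference, badger's value log space is only reclaimed when `RunValueLogGC` is called explicitly, so some loop has to drive it. A minimal sketch of such a loop (the interval, discard ratio and function name are placeholders, not the values used by this project):

```go
package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger/v4"
)

// runValueLogGC periodically asks badger to rewrite value log files in which
// a large share of the data is stale. ErrNoRewrite just means there was
// nothing worth collecting this round.
func runValueLogGC(db *badger.DB, interval time.Duration, stop <-chan struct{}) {
	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		select {
		case <-stop:
			return
		case <-t.C:
			// Keep collecting until badger reports nothing to rewrite;
			// each successful call rewrites at most one value log file.
			for {
				if err := db.RunValueLogGC(0.5); err != nil {
					if err != badger.ErrNoRewrite {
						log.Printf("value log GC: %v", err)
					}
					break
				}
			}
		}
	}
}
```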
@ -167,13 +184,14 @@ func (c *cache) workerFlushSmall() {
return
}
addr := objectCore.AddressOf(obj)
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
if err == nil {
c.deleteFromDB([]internalKey{addr2key(addr)})
}
c.deleteFromDB([]internalKey{addr2key(objectCore.AddressOf(obj))})
c.scheduled4FlushMtx.Lock()
delete(c.scheduled4Flush, addr)
c.scheduled4FlushMtx.Unlock()
}
}