writecache: Improve flushing scheme for badger #641
go.mod
@@ -11,6 +11,7 @@ require (
	git.frostfs.info/TrueCloudLab/tzhash v1.8.0
	github.com/cheggaaa/pb v1.0.29
	github.com/chzyer/readline v1.5.1
	github.com/dgraph-io/ristretto v0.1.1
	github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
	github.com/google/uuid v1.3.0
	github.com/hashicorp/golang-lru/v2 v2.0.4
@@ -42,7 +43,6 @@ require (
)

require (
	github.com/dgraph-io/ristretto v0.1.1 // indirect
	github.com/dustin/go-humanize v1.0.0 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/glog v1.1.0 // indirect
@@ -296,6 +296,7 @@ const (
	WritecacheCantParseAddress = "can't parse address"
	WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
	WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
	WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
	BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
	BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza"
	BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza"
@@ -21,6 +21,10 @@ type cache struct {
	// flushCh is a channel with objects to flush.
	flushCh chan *objectSDK.Object
	// scheduled4Flush contains objects scheduled for flush via flushCh
	// helps to avoid multiple flushing of one object
	scheduled4Flush map[oid.Address]struct{}
	scheduled4FlushMtx sync.RWMutex
	// closeCh is close channel, protected by modeMtx.
	closeCh chan struct{}
	// wg is a wait group for flush workers.
@@ -47,8 +51,9 @@ const (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
	c := &cache{
		flushCh: make(chan *objectSDK.Object),
		mode:    mode.ReadWrite,
		flushCh:         make(chan *objectSDK.Object),
		mode:            mode.ReadWrite,
		scheduled4Flush: map[oid.Address]struct{}{},

		options: options{
			log: &logger.Logger{Logger: zap.NewNop()},
@@ -18,7 +18,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"github.com/dgraph-io/badger/v4"
	"github.com/mr-tron/base58"
	"github.com/dgraph-io/ristretto/z"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
@@ -35,6 +35,65 @@ const (
	defaultFlushInterval = time.Second
)

type collector struct {
	cache     *cache
	scheduled int
	processed int
	cancel    func()
}

func (c *collector) Send(buf *z.Buffer) error {
	list, err := badger.BufferToKVList(buf)
	if err != nil {
		return err
	}
	for _, kv := range list.Kv {
		select {
		case <-c.cache.closeCh:
			c.cancel()
			return nil
		default:
		}
		if kv.StreamDone {
			return nil
		}
		if c.scheduled >= flushBatchSize {
			c.cancel()
fyrchik commented:
Flush batch size was there in `bbolt`, because long `View` transactions prevented the database from reclaiming space and caused it to grow in size.
In badger we may not need this.
acid-ant commented:
The reason I decided to keep it is to allow changing the mode of the write-cache. If the write-cache is under pressure and for some reason we need to change the mode, that will only be possible once all objects have been scheduled. It may take some time, and the external call may fail by timeout.
Is that acceptable for us? Or is it better to interrupt at some point?
			return nil
		}
		if got, want := len(kv.Key), len(internalKey{}); got != want {
			c.cache.log.Debug(
fyrchik commented:
`len(internalKey{})`?
acid-ant commented:
Yeah, this is better, fixed.
				fmt.Sprintf("not expected db key len: got %d, want %d", got, want))
			continue
		}
		c.processed++
		obj := objectSDK.New()
		val := bytes.Clone(kv.Value)
		if err = obj.Unmarshal(val); err != nil {
			continue
fyrchik commented:
`val := bytes.Clone` (or `slice.Copy`)?
acid-ant commented:
Updated, I've chosen append because badger uses it in `CopyValue`.
		}
		addr := objectCore.AddressOf(obj)
		c.cache.scheduled4FlushMtx.RLock()
		_, ok := c.cache.scheduled4Flush[addr]
fyrchik commented:
Correct me if I am wrong: the purpose of this map is to prevent flushing the same object twice?
acid-ant commented:
Right. It is possible that a background routine which stores the data hangs, and on the next iteration the same object gets scheduled again. When using a stream it is impossible to start from a particular key, there is no such API. Also, the retrieved keys are sorted lexicographically, not in the order they were put into the db.
		c.cache.scheduled4FlushMtx.RUnlock()
		if ok {
			c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr))
			continue
		}
		c.cache.scheduled4FlushMtx.Lock()
		c.cache.scheduled4Flush[addr] = struct{}{}
		c.cache.scheduled4FlushMtx.Unlock()
		c.scheduled++
		select {
		case c.cache.flushCh <- obj:
		case <-c.cache.closeCh:
			c.cancel()
			return nil
		}
	}
	return nil
fyrchik commented:
1. If we store garbage data, it should be at least `Warn` or `Error` (and probably increase the shard error counter).
2. Let's use the common scheme with `log.Error(logs.Message, zap.Error(err))`.
fyrchik commented:
Actually, if we increase the error counter, there is no need to log.
acid-ant commented:
Agree, removed the log entry.
}

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
	for i := 0; i < c.workersCount; i++ {
@@ -62,17 +121,12 @@ func (c *cache) runFlushLoop() {
}

func (c *cache) flushSmallObjects() {
	var lastKey internalKey
	var m []objectInfo
	for {
		select {
		case <-c.closeCh:
			return
		default:
		}

		m = m[:0]

		c.modeMtx.RLock()
		if c.readOnly() {
			c.modeMtx.RUnlock()
@@ -86,61 +140,24 @@ func (c *cache) flushSmallObjects() {
			c.modeMtx.RUnlock()
			return
		}

		_ = c.db.View(func(tx *badger.Txn) error {
			it := tx.NewIterator(badger.DefaultIteratorOptions)
			defer it.Close()
			if len(lastKey) == 0 {
				it.Rewind()
			} else {
				it.Seek(lastKey[:])
				if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) {
					it.Next()
				}
			}
			for ; it.Valid() && len(m) < flushBatchSize; it.Next() {
				if got, want := int(it.Item().KeySize()), len(lastKey); got != want {
					return fmt.Errorf("invalid db key len: got %d, want %d", got, want)
				}
				it.Item().KeyCopy(lastKey[:])
				value, err := it.Item().ValueCopy(nil)
				if err != nil {
					return err
				}
				m = append(m, objectInfo{
					addr: lastKey.address(),
					data: value,
				})
			}
			return nil
		})

		var count int
		for i := range m {
			obj := objectSDK.New()
			if err := obj.Unmarshal(m[i].data); err != nil {
				continue
			}

			count++
			select {
			case c.flushCh <- obj:
			case <-c.closeCh:
				c.modeMtx.RUnlock()
				return
			}
		ctx, cancel := context.WithCancel(context.TODO())
		coll := collector{
			cache:  c,
			cancel: cancel,
fyrchik commented:
`Background`? Or provide a context to `flushSmallObjects`?
acid-ant commented:
I think it is better to switch to the global context in a separate task #642. It requires a lot of refactoring, because we need to change the signature of the `Init` method.
		}

		if count == 0 {
			c.modeMtx.RUnlock()
		stream := c.db.NewStream()
		// All calls to Send are done by a single goroutine
		stream.Send = coll.Send
		if err := stream.Orchestrate(ctx); err != nil {
			c.log.Debug(fmt.Sprintf(
fyrchik commented:
`can expect` or `expects`?
acid-ant commented:
All calls to `Send` are done by a single goroutine.
				"error during flushing object from wc: %s", err))
		}
fyrchik commented:
Does it exit after all objects in the database have been streamed through?
acid-ant commented:
Yes, as opposed to `Subscribe`.
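For readers unfamiliar with the badger Stream API discussed here, below is a minimal, self-contained sketch (not part of the PR; the in-memory options and keys are made up for illustration) showing that `Stream.Orchestrate` performs a single pass over the current keyspace and returns once everything has been delivered to `Send`, unlike `Subscribe`, which keeps running until its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// Throwaway in-memory database, for illustration only.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Put a couple of keys so the stream has something to deliver.
	_ = db.Update(func(txn *badger.Txn) error {
		_ = txn.Set([]byte("a"), []byte("1"))
		return txn.Set([]byte("b"), []byte("2"))
	})

	stream := db.NewStream()
	// All calls to Send are made by a single goroutine.
	stream.Send = func(buf *z.Buffer) error {
		list, err := badger.BufferToKVList(buf)
		if err != nil {
			return err
		}
		for _, kv := range list.Kv {
			if kv.StreamDone {
				continue
			}
			fmt.Printf("streamed key %q\n", kv.Key)
		}
		return nil
	}

	// Orchestrate is a one-shot pass: it returns after the whole keyspace
	// has been streamed, whereas Subscribe blocks until the context is
	// cancelled and only reports future updates.
	if err := stream.Orchestrate(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```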
		c.modeMtx.RUnlock()
		if coll.scheduled == 0 {
dstepanov-yadro commented:
Please explain this line. I don't understand why the flush ends if no objects are scheduled.
acid-ant commented:
It is possible that a few objects still exist in the db while already being scheduled for flush. To avoid iterating over already scheduled objects again, we need to check the scheduled counter, not the processed one. This loop is run by a timer every second.
			break
		}

		c.modeMtx.RUnlock()

		c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
			zap.Int("count", count),
			zap.String("start", base58.Encode(lastKey[:])))
			zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed))
	}
}
fyrchik commented:
GC interval is 1 minute by default currently. It may be beneficial to run it after each flush cycle instead. What do you think? @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
acid-ant commented:
If you mean badger's GC, I think that will be useful here if we scheduled some objects for deletion. We're flushing each second, could this be a problem?
dstepanov-yadro commented:
I don't think there must be any correlation between badger GC and flush.
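For context on the GC discussion above: badger's value log garbage collection has to be triggered explicitly via `DB.RunValueLogGC`. The sketch below is not part of the PR; the package name, function name, interval, and 0.5 discard ratio are illustrative assumptions showing the usual pattern of running GC on a timer and looping until badger reports there is nothing left to rewrite.

```go
package writecachegc

import (
	"context"
	"time"

	"github.com/dgraph-io/badger/v4"
)

// runValueLogGC periodically triggers badger's value log GC until the
// context is cancelled. Interval and discard ratio are illustrative only.
func runValueLogGC(ctx context.Context, db *badger.DB, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// Keep rewriting value log files while badger can reclaim space;
			// a non-nil error (typically badger.ErrNoRewrite) means there is
			// nothing left to collect in this cycle.
			for {
				if err := db.RunValueLogGC(0.5); err != nil {
					break
				}
			}
		}
	}
}
```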
@@ -167,13 +184,14 @@ func (c *cache) workerFlushSmall() {
			return
		}

		addr := objectCore.AddressOf(obj)
		err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
		if err != nil {
			// Error is handled in flushObject.
			continue
		if err == nil {
			c.deleteFromDB([]internalKey{addr2key(addr)})
		}

		c.deleteFromDB([]internalKey{addr2key(objectCore.AddressOf(obj))})
		c.scheduled4FlushMtx.Lock()
		delete(c.scheduled4Flush, addr)
		c.scheduled4FlushMtx.Unlock()
	}
}
If it is a set, it is better to use `struct{}`, not `any`, as `struct{}` has zero size and thus could be optimized by the compiler.

Thanks, updated.
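A tiny illustration of that point (illustrative code, not from the PR; `map[string]struct{}` stands in for the `map[oid.Address]struct{}` used in the diff):

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// struct{} has zero size, so set values cost nothing beyond the map entry.
	fmt.Println(unsafe.Sizeof(struct{}{})) // prints 0

	// A set keyed by address, using string here instead of oid.Address for brevity.
	seen := make(map[string]struct{})
	seen["addr-1"] = struct{}{}

	if _, ok := seen["addr-1"]; ok {
		fmt.Println("already scheduled")
	}
}
```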