From 4dff9555f15145768608b130b752b4788955a4c6 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Wed, 23 Aug 2023 10:05:40 +0300 Subject: [PATCH] [#568] writecache: Improve flushing scheme for badger Signed-off-by: Anton Nikiforov --- go.mod | 2 +- internal/logs/logs.go | 1 + .../writecachebadger/cachebadger.go | 9 +- .../writecache/writecachebadger/flush.go | 142 ++++++++++-------- 4 files changed, 89 insertions(+), 65 deletions(-) diff --git a/go.mod b/go.mod index cbe20e39f..114c95adb 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 + github.com/dgraph-io/ristretto v0.1.1 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/google/uuid v1.3.0 github.com/hashicorp/golang-lru/v2 v2.0.4 @@ -42,7 +43,6 @@ require ( ) require ( - github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.1.0 // indirect diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 95961cd47..6ceee4f17 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -296,6 +296,7 @@ const ( WritecacheCantParseAddress = "can't parse address" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" WritecacheDBValueLogGCRunCompleted = "value log GC run completed" + WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza = "could not read payload range from opened blobovnicza" BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza = "could not read payload range from active blobovnicza" diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index 837e76a0b..28aa7e766 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -21,6 +21,10 @@ type cache struct { // flushCh is a channel with objects to flush. flushCh chan *objectSDK.Object + // scheduled4Flush contains objects scheduled for flush via flushCh + // helps to avoid multiple flushing of one object + scheduled4Flush map[oid.Address]struct{} + scheduled4FlushMtx sync.RWMutex // closeCh is close channel, protected by modeMtx. closeCh chan struct{} // wg is a wait group for flush workers. @@ -47,8 +51,9 @@ const ( // New creates new writecache instance. func New(opts ...Option) writecache.Cache { c := &cache{ - flushCh: make(chan *objectSDK.Object), - mode: mode.ReadWrite, + flushCh: make(chan *objectSDK.Object), + mode: mode.ReadWrite, + scheduled4Flush: map[oid.Address]struct{}{}, options: options{ log: &logger.Logger{Logger: zap.NewNop()}, diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go index 7f41920ba..ab6910a80 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -18,7 +18,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/dgraph-io/badger/v4" - "github.com/mr-tron/base58" + "github.com/dgraph-io/ristretto/z" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -35,6 +35,65 @@ const ( defaultFlushInterval = time.Second ) +type collector struct { + cache *cache + scheduled int + processed int + cancel func() +} + +func (c *collector) Send(buf *z.Buffer) error { + list, err := badger.BufferToKVList(buf) + if err != nil { + return err + } + for _, kv := range list.Kv { + select { + case <-c.cache.closeCh: + c.cancel() + return nil + default: + } + if kv.StreamDone { + return nil + } + if c.scheduled >= flushBatchSize { + c.cancel() + return nil + } + if got, want := len(kv.Key), len(internalKey{}); got != want { + c.cache.log.Debug( + fmt.Sprintf("not expected db key len: got %d, want %d", got, want)) + continue + } + c.processed++ + obj := objectSDK.New() + val := bytes.Clone(kv.Value) + if err = obj.Unmarshal(val); err != nil { + continue + } + addr := objectCore.AddressOf(obj) + c.cache.scheduled4FlushMtx.RLock() + _, ok := c.cache.scheduled4Flush[addr] + c.cache.scheduled4FlushMtx.RUnlock() + if ok { + c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr)) + continue + } + c.cache.scheduled4FlushMtx.Lock() + c.cache.scheduled4Flush[addr] = struct{}{} + c.cache.scheduled4FlushMtx.Unlock() + c.scheduled++ + select { + case c.cache.flushCh <- obj: + case <-c.cache.closeCh: + c.cancel() + return nil + } + } + return nil +} + // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop() { for i := 0; i < c.workersCount; i++ { @@ -62,17 +121,12 @@ func (c *cache) runFlushLoop() { } func (c *cache) flushSmallObjects() { - var lastKey internalKey - var m []objectInfo for { select { case <-c.closeCh: return default: } - - m = m[:0] - c.modeMtx.RLock() if c.readOnly() { c.modeMtx.RUnlock() @@ -86,61 +140,24 @@ func (c *cache) flushSmallObjects() { c.modeMtx.RUnlock() return } - - _ = c.db.View(func(tx *badger.Txn) error { - it := tx.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - if len(lastKey) == 0 { - it.Rewind() - } else { - it.Seek(lastKey[:]) - if it.Valid() && bytes.Equal(it.Item().Key(), lastKey[:]) { - it.Next() - } - } - for ; it.Valid() && len(m) < flushBatchSize; it.Next() { - if got, want := int(it.Item().KeySize()), len(lastKey); got != want { - return fmt.Errorf("invalid db key len: got %d, want %d", got, want) - } - it.Item().KeyCopy(lastKey[:]) - value, err := it.Item().ValueCopy(nil) - if err != nil { - return err - } - m = append(m, objectInfo{ - addr: lastKey.address(), - data: value, - }) - } - return nil - }) - - var count int - for i := range m { - obj := objectSDK.New() - if err := obj.Unmarshal(m[i].data); err != nil { - continue - } - - count++ - select { - case c.flushCh <- obj: - case <-c.closeCh: - c.modeMtx.RUnlock() - return - } + ctx, cancel := context.WithCancel(context.TODO()) + coll := collector{ + cache: c, + cancel: cancel, } - - if count == 0 { - c.modeMtx.RUnlock() + stream := c.db.NewStream() + // All calls to Send are done by a single goroutine + stream.Send = coll.Send + if err := stream.Orchestrate(ctx); err != nil { + c.log.Debug(fmt.Sprintf( + "error during flushing object from wc: %s", err)) + } + c.modeMtx.RUnlock() + if coll.scheduled == 0 { break } - - c.modeMtx.RUnlock() - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, - zap.Int("count", count), - zap.String("start", base58.Encode(lastKey[:]))) + zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed)) } } @@ -167,13 +184,14 @@ func (c *cache) workerFlushSmall() { return } + addr := objectCore.AddressOf(obj) err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) - if err != nil { - // Error is handled in flushObject. - continue + if err == nil { + c.deleteFromDB([]internalKey{addr2key(addr)}) } - - c.deleteFromDB([]internalKey{addr2key(objectCore.AddressOf(obj))}) + c.scheduled4FlushMtx.Lock() + delete(c.scheduled4Flush, addr) + c.scheduled4FlushMtx.Unlock() } }