From d07afd803ccb1dc111e545d8ac27c37987b3831c Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 6 Oct 2023 11:31:40 +0300 Subject: [PATCH] [#726] writecache: Fix small object flush for Badger Do not marshal object twice. Signed-off-by: Dmitrii Stepanov --- .../writecache/writecachebadger/cachebadger.go | 4 ++-- .../writecache/writecachebadger/flush.go | 17 ++++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index 6dd4755b..d5da7763 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -21,7 +21,7 @@ type cache struct { modeMtx sync.RWMutex // flushCh is a channel with objects to flush. - flushCh chan *objectSDK.Object + flushCh chan objectInfo // scheduled4Flush contains objects scheduled for flush via flushCh // helps to avoid multiple flushing of one object scheduled4Flush map[oid.Address]struct{} @@ -52,7 +52,7 @@ const ( // New creates new writecache instance. func New(opts ...Option) writecache.Cache { c := &cache{ - flushCh: make(chan *objectSDK.Object), + flushCh: make(chan objectInfo), mode: mode.ReadWrite, scheduled4Flush: map[oid.Address]struct{}{}, diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go index ab6910a8..3f2bdb04 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error { c.cache.scheduled4FlushMtx.Unlock() c.scheduled++ select { - case c.cache.flushCh <- obj: + case c.cache.flushCh <- objectInfo{ + addr: addr, + data: val, + obj: obj, + }: case <-c.cache.closeCh: c.cancel() return nil @@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) { func (c *cache) workerFlushSmall() { defer c.wg.Done() - var obj *objectSDK.Object + var objInfo objectInfo for { // Give priority to direct put. select { - case obj = <-c.flushCh: + case objInfo = <-c.flushCh: case <-c.closeCh: return } - addr := objectCore.AddressOf(obj) - err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) + err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB) if err == nil { - c.deleteFromDB([]internalKey{addr2key(addr)}) + c.deleteFromDB([]internalKey{addr2key(objInfo.addr)}) } c.scheduled4FlushMtx.Lock() - delete(c.scheduled4Flush, addr) + delete(c.scheduled4Flush, objInfo.addr) c.scheduled4FlushMtx.Unlock() } }