diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go index 11bdbe07..363ee844 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go @@ -29,7 +29,7 @@ type cache struct { compressFlags map[string]struct{} // flushCh is a channel with objects to flush. - flushCh chan *objectSDK.Object + flushCh chan objectInfo // closeCh is close channel, protected by modeMtx. closeCh chan struct{} // wg is a wait group for flush workers. @@ -62,7 +62,7 @@ var ( // New creates new writecache instance. func New(opts ...Option) writecache.Cache { c := &cache{ - flushCh: make(chan *objectSDK.Object), + flushCh: make(chan objectInfo), mode: mode.ReadWrite, compressFlags: make(map[string]struct{}), diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush.go b/pkg/local_object_storage/writecache/writecachebbolt/flush.go index c3b0f89b..89b26cd6 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush.go @@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() { func (c *cache) flushSmallObjects() { var lastKey []byte - var m []objectInfo for { select { case <-c.closeCh: @@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() { default: } - m = m[:0] + var m []objectInfo c.modeMtx.RLock() if c.readOnly() { @@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() { if err := obj.Unmarshal(m[i].data); err != nil { continue } + m[i].obj = obj count++ select { - case c.flushCh <- obj: + case c.flushCh <- m[i]: case <-c.closeCh: c.modeMtx.RUnlock() return @@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { func (c *cache) workerFlushSmall() { defer c.wg.Done() - var obj *objectSDK.Object + var objInfo objectInfo for { // Give priority to direct put. select { - case obj = <-c.flushCh: + case objInfo = <-c.flushCh: case <-c.closeCh: return } - err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) + err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB) if err != nil { // Error is handled in flushObject. continue } - c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString()) + c.deleteFromDB(objInfo.addr) } }