writecache: Fix small object flush #726

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/fix_flush_small_objects into master 2024-09-04 19:51:03 +00:00
4 changed files with 21 additions and 18 deletions

View file

@@ -21,7 +21,7 @@ type cache struct {
modeMtx sync.RWMutex
// flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object
flushCh chan objectInfo
// scheduled4Flush contains objects scheduled for flush via flushCh
// helps to avoid multiple flushing of one object
scheduled4Flush map[oid.Address]struct{}
@@ -52,7 +52,7 @@ const (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
c := &cache{
flushCh: make(chan *objectSDK.Object),
flushCh: make(chan objectInfo),
mode: mode.ReadWrite,
scheduled4Flush: map[oid.Address]struct{}{},

View file

@@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error {
c.cache.scheduled4FlushMtx.Unlock()
c.scheduled++
select {
case c.cache.flushCh <- obj:
case c.cache.flushCh <- objectInfo{
addr: addr,
data: val,
obj: obj,
}:
case <-c.cache.closeCh:
c.cancel()
return nil
@@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *objectSDK.Object
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case obj = <-c.flushCh:
case objInfo = <-c.flushCh:
case <-c.closeCh:
return
}
addr := objectCore.AddressOf(obj)
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err == nil {
c.deleteFromDB([]internalKey{addr2key(addr)})
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
}
c.scheduled4FlushMtx.Lock()
delete(c.scheduled4Flush, addr)
delete(c.scheduled4Flush, objInfo.addr)
c.scheduled4FlushMtx.Unlock()
}
}

View file

@@ -29,7 +29,7 @@ type cache struct {
compressFlags map[string]struct{}
// flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object
flushCh chan objectInfo
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
@@ -62,7 +62,7 @@ var (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
c := &cache{
flushCh: make(chan *objectSDK.Object),
flushCh: make(chan objectInfo),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),

View file

@@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() {
func (c *cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
select {
case <-c.closeCh:
@@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() {
default:
}
m = m[:0]
var m []objectInfo
c.modeMtx.RLock()
if c.readOnly() {
@@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() {
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
m[i].obj = obj
count++
select {
case c.flushCh <- obj:
case c.flushCh <- m[i]:
case <-c.closeCh:
fyrchik marked this conversation as resolved (Outdated)

At line 90 we do `m = m[:0]` — can this lead to problems?

IMO this optimization is more important than slice reuse, so we may drop line 90 if needed.
c.modeMtx.RUnlock()
return
@@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *objectSDK.Object
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case obj = <-c.flushCh:
case objInfo = <-c.flushCh:
case <-c.closeCh:
return
}
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString())
c.deleteFromDB(objInfo.addr)
}
}