writecache: Fix small object flush #726

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/fix_flush_small_objects into master 2024-09-04 19:51:03 +00:00
4 changed files with 21 additions and 18 deletions

View file

@ -21,7 +21,7 @@ type cache struct {
modeMtx sync.RWMutex modeMtx sync.RWMutex
// flushCh is a channel with objects to flush. // flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object flushCh chan objectInfo
// scheduled4Flush contains objects scheduled for flush via flushCh // scheduled4Flush contains objects scheduled for flush via flushCh
// helps to avoid multiple flushing of one object // helps to avoid multiple flushing of one object
scheduled4Flush map[oid.Address]struct{} scheduled4Flush map[oid.Address]struct{}
@ -52,7 +52,7 @@ const (
// New creates new writecache instance. // New creates new writecache instance.
func New(opts ...Option) writecache.Cache { func New(opts ...Option) writecache.Cache {
c := &cache{ c := &cache{
flushCh: make(chan *objectSDK.Object), flushCh: make(chan objectInfo),
mode: mode.ReadWrite, mode: mode.ReadWrite,
scheduled4Flush: map[oid.Address]struct{}{}, scheduled4Flush: map[oid.Address]struct{}{},

View file

@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error {
c.cache.scheduled4FlushMtx.Unlock() c.cache.scheduled4FlushMtx.Unlock()
c.scheduled++ c.scheduled++
select { select {
case c.cache.flushCh <- obj: case c.cache.flushCh <- objectInfo{
addr: addr,
data: val,
obj: obj,
}:
case <-c.cache.closeCh: case <-c.cache.closeCh:
c.cancel() c.cancel()
return nil return nil
@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
func (c *cache) workerFlushSmall() { func (c *cache) workerFlushSmall() {
defer c.wg.Done() defer c.wg.Done()
var obj *objectSDK.Object var objInfo objectInfo
for { for {
// Give priority to direct put. // Give priority to direct put.
select { select {
case obj = <-c.flushCh: case objInfo = <-c.flushCh:
case <-c.closeCh: case <-c.closeCh:
return return
} }
addr := objectCore.AddressOf(obj) err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
if err == nil { if err == nil {
c.deleteFromDB([]internalKey{addr2key(addr)}) c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
} }
c.scheduled4FlushMtx.Lock() c.scheduled4FlushMtx.Lock()
delete(c.scheduled4Flush, addr) delete(c.scheduled4Flush, objInfo.addr)
c.scheduled4FlushMtx.Unlock() c.scheduled4FlushMtx.Unlock()
} }
} }

View file

@ -29,7 +29,7 @@ type cache struct {
compressFlags map[string]struct{} compressFlags map[string]struct{}
// flushCh is a channel with objects to flush. // flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object flushCh chan objectInfo
// closeCh is close channel, protected by modeMtx. // closeCh is close channel, protected by modeMtx.
closeCh chan struct{} closeCh chan struct{}
// wg is a wait group for flush workers. // wg is a wait group for flush workers.
@ -62,7 +62,7 @@ var (
// New creates new writecache instance. // New creates new writecache instance.
func New(opts ...Option) writecache.Cache { func New(opts ...Option) writecache.Cache {
c := &cache{ c := &cache{
flushCh: make(chan *objectSDK.Object), flushCh: make(chan objectInfo),
mode: mode.ReadWrite, mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}), compressFlags: make(map[string]struct{}),

View file

@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() {
func (c *cache) flushSmallObjects() { func (c *cache) flushSmallObjects() {
var lastKey []byte var lastKey []byte
var m []objectInfo
for { for {
select { select {
case <-c.closeCh: case <-c.closeCh:
@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() {
default: default:
} }
m = m[:0] var m []objectInfo
c.modeMtx.RLock() c.modeMtx.RLock()
if c.readOnly() { if c.readOnly() {
@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() {
if err := obj.Unmarshal(m[i].data); err != nil { if err := obj.Unmarshal(m[i].data); err != nil {
continue continue
} }
m[i].obj = obj
count++ count++
select { select {
case c.flushCh <- obj: case c.flushCh <- m[i]:
case <-c.closeCh: case <-c.closeCh:
c.modeMtx.RUnlock() c.modeMtx.RUnlock()
return return
@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
func (c *cache) workerFlushSmall() { func (c *cache) workerFlushSmall() {
defer c.wg.Done() defer c.wg.Done()
var obj *objectSDK.Object var objInfo objectInfo
for { for {
// Give priority to direct put. // Give priority to direct put.
select { select {
case obj = <-c.flushCh: case objInfo = <-c.flushCh:
case <-c.closeCh: case <-c.closeCh:
return return
} }
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB) err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err != nil { if err != nil {
// Error is handled in flushObject. // Error is handled in flushObject.
continue continue
} }
c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString()) c.deleteFromDB(objInfo.addr)
} }
} }