writecache: Fix small object flush #726
4 changed files with 21 additions and 18 deletions
|
@ -21,7 +21,7 @@ type cache struct {
|
|||
modeMtx sync.RWMutex
|
||||
|
||||
// flushCh is a channel with objects to flush.
|
||||
flushCh chan *objectSDK.Object
|
||||
flushCh chan objectInfo
|
||||
// scheduled4Flush contains objects scheduled for flush via flushCh
|
||||
// helps to avoid multiple flushing of one object
|
||||
scheduled4Flush map[oid.Address]struct{}
|
||||
|
@ -52,7 +52,7 @@ const (
|
|||
// New creates new writecache instance.
|
||||
func New(opts ...Option) writecache.Cache {
|
||||
c := &cache{
|
||||
flushCh: make(chan *objectSDK.Object),
|
||||
flushCh: make(chan objectInfo),
|
||||
mode: mode.ReadWrite,
|
||||
scheduled4Flush: map[oid.Address]struct{}{},
|
||||
|
||||
|
|
|
@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error {
|
|||
c.cache.scheduled4FlushMtx.Unlock()
|
||||
c.scheduled++
|
||||
select {
|
||||
case c.cache.flushCh <- obj:
|
||||
case c.cache.flushCh <- objectInfo{
|
||||
addr: addr,
|
||||
data: val,
|
||||
obj: obj,
|
||||
}:
|
||||
case <-c.cache.closeCh:
|
||||
c.cancel()
|
||||
return nil
|
||||
|
@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
|
|||
func (c *cache) workerFlushSmall() {
|
||||
defer c.wg.Done()
|
||||
|
||||
var obj *objectSDK.Object
|
||||
var objInfo objectInfo
|
||||
for {
|
||||
// Give priority to direct put.
|
||||
select {
|
||||
case obj = <-c.flushCh:
|
||||
case objInfo = <-c.flushCh:
|
||||
case <-c.closeCh:
|
||||
return
|
||||
}
|
||||
|
||||
addr := objectCore.AddressOf(obj)
|
||||
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
|
||||
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
||||
if err == nil {
|
||||
c.deleteFromDB([]internalKey{addr2key(addr)})
|
||||
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
|
||||
}
|
||||
c.scheduled4FlushMtx.Lock()
|
||||
delete(c.scheduled4Flush, addr)
|
||||
delete(c.scheduled4Flush, objInfo.addr)
|
||||
c.scheduled4FlushMtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ type cache struct {
|
|||
compressFlags map[string]struct{}
|
||||
|
||||
// flushCh is a channel with objects to flush.
|
||||
flushCh chan *objectSDK.Object
|
||||
flushCh chan objectInfo
|
||||
// closeCh is close channel, protected by modeMtx.
|
||||
closeCh chan struct{}
|
||||
// wg is a wait group for flush workers.
|
||||
|
@ -62,7 +62,7 @@ var (
|
|||
// New creates new writecache instance.
|
||||
func New(opts ...Option) writecache.Cache {
|
||||
c := &cache{
|
||||
flushCh: make(chan *objectSDK.Object),
|
||||
flushCh: make(chan objectInfo),
|
||||
mode: mode.ReadWrite,
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
|
|
|
@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() {
|
|||
|
||||
func (c *cache) flushSmallObjects() {
|
||||
var lastKey []byte
|
||||
var m []objectInfo
|
||||
for {
|
||||
select {
|
||||
case <-c.closeCh:
|
||||
|
@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() {
|
|||
default:
|
||||
}
|
||||
|
||||
m = m[:0]
|
||||
var m []objectInfo
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if c.readOnly() {
|
||||
|
@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() {
|
|||
if err := obj.Unmarshal(m[i].data); err != nil {
|
||||
continue
|
||||
}
|
||||
m[i].obj = obj
|
||||
|
||||
count++
|
||||
select {
|
||||
case c.flushCh <- obj:
|
||||
case c.flushCh <- m[i]:
|
||||
case <-c.closeCh:
|
||||
c.modeMtx.RUnlock()
|
||||
return
|
||||
|
@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
|||
func (c *cache) workerFlushSmall() {
|
||||
defer c.wg.Done()
|
||||
|
||||
var obj *objectSDK.Object
|
||||
var objInfo objectInfo
|
||||
for {
|
||||
// Give priority to direct put.
|
||||
select {
|
||||
case obj = <-c.flushCh:
|
||||
case objInfo = <-c.flushCh:
|
||||
case <-c.closeCh:
|
||||
return
|
||||
}
|
||||
|
||||
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
|
||||
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
||||
if err != nil {
|
||||
// Error is handled in flushObject.
|
||||
continue
|
||||
}
|
||||
|
||||
c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString())
|
||||
c.deleteFromDB(objInfo.addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue