[#726] writecache: Fix small object flush for BBolt
Do not marshal object twice. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
bd5bf8b1a9
commit
997ac7cd8d
2 changed files with 9 additions and 9 deletions
|
@ -29,7 +29,7 @@ type cache struct {
|
||||||
compressFlags map[string]struct{}
|
compressFlags map[string]struct{}
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan *objectSDK.Object
|
flushCh chan objectInfo
|
||||||
// closeCh is close channel, protected by modeMtx.
|
// closeCh is close channel, protected by modeMtx.
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
|
@ -62,7 +62,7 @@ var (
|
||||||
// New creates new writecache instance.
|
// New creates new writecache instance.
|
||||||
func New(opts ...Option) writecache.Cache {
|
func New(opts ...Option) writecache.Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
flushCh: make(chan *objectSDK.Object),
|
flushCh: make(chan objectInfo),
|
||||||
mode: mode.ReadWrite,
|
mode: mode.ReadWrite,
|
||||||
|
|
||||||
compressFlags: make(map[string]struct{}),
|
compressFlags: make(map[string]struct{}),
|
||||||
|
|
|
@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() {
|
||||||
|
|
||||||
func (c *cache) flushSmallObjects() {
|
func (c *cache) flushSmallObjects() {
|
||||||
var lastKey []byte
|
var lastKey []byte
|
||||||
var m []objectInfo
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
|
@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
m = m[:0]
|
var m []objectInfo
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
|
@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() {
|
||||||
if err := obj.Unmarshal(m[i].data); err != nil {
|
if err := obj.Unmarshal(m[i].data); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
m[i].obj = obj
|
||||||
|
|
||||||
count++
|
count++
|
||||||
select {
|
select {
|
||||||
case c.flushCh <- obj:
|
case c.flushCh <- m[i]:
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
return
|
return
|
||||||
|
@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
func (c *cache) workerFlushSmall() {
|
func (c *cache) workerFlushSmall() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
|
|
||||||
var obj *objectSDK.Object
|
var objInfo objectInfo
|
||||||
for {
|
for {
|
||||||
// Give priority to direct put.
|
// Give priority to direct put.
|
||||||
select {
|
select {
|
||||||
case obj = <-c.flushCh:
|
case objInfo = <-c.flushCh:
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
|
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Error is handled in flushObject.
|
// Error is handled in flushObject.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString())
|
c.deleteFromDB(objInfo.addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue