[#726] writecache: Fix small object flush for Badger
All checks were successful
DCO action / DCO (pull_request) Successful in 2m25s
Vulncheck / Vulncheck (pull_request) Successful in 2m41s
Build / Build Components (1.21) (pull_request) Successful in 3m2s
Build / Build Components (1.20) (pull_request) Successful in 3m17s
Tests and linters / Staticcheck (pull_request) Successful in 4m26s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m9s
Tests and linters / Tests (1.21) (pull_request) Successful in 18m15s
Tests and linters / Lint (pull_request) Successful in 20m21s
Tests and linters / Tests with -race (pull_request) Successful in 18m2s
All checks were successful
DCO action / DCO (pull_request) Successful in 2m25s
Vulncheck / Vulncheck (pull_request) Successful in 2m41s
Build / Build Components (1.21) (pull_request) Successful in 3m2s
Build / Build Components (1.20) (pull_request) Successful in 3m17s
Tests and linters / Staticcheck (pull_request) Successful in 4m26s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m9s
Tests and linters / Tests (1.21) (pull_request) Successful in 18m15s
Tests and linters / Lint (pull_request) Successful in 20m21s
Tests and linters / Tests with -race (pull_request) Successful in 18m2s
Do not marshal object twice. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
997ac7cd8d
commit
d07afd803c
2 changed files with 12 additions and 9 deletions
|
@ -21,7 +21,7 @@ type cache struct {
|
||||||
modeMtx sync.RWMutex
|
modeMtx sync.RWMutex
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan *objectSDK.Object
|
flushCh chan objectInfo
|
||||||
// scheduled4Flush contains objects scheduled for flush via flushCh
|
// scheduled4Flush contains objects scheduled for flush via flushCh
|
||||||
// helps to avoid multiple flushing of one object
|
// helps to avoid multiple flushing of one object
|
||||||
scheduled4Flush map[oid.Address]struct{}
|
scheduled4Flush map[oid.Address]struct{}
|
||||||
|
@ -52,7 +52,7 @@ const (
|
||||||
// New creates new writecache instance.
|
// New creates new writecache instance.
|
||||||
func New(opts ...Option) writecache.Cache {
|
func New(opts ...Option) writecache.Cache {
|
||||||
c := &cache{
|
c := &cache{
|
||||||
flushCh: make(chan *objectSDK.Object),
|
flushCh: make(chan objectInfo),
|
||||||
mode: mode.ReadWrite,
|
mode: mode.ReadWrite,
|
||||||
scheduled4Flush: map[oid.Address]struct{}{},
|
scheduled4Flush: map[oid.Address]struct{}{},
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error {
|
||||||
c.cache.scheduled4FlushMtx.Unlock()
|
c.cache.scheduled4FlushMtx.Unlock()
|
||||||
c.scheduled++
|
c.scheduled++
|
||||||
select {
|
select {
|
||||||
case c.cache.flushCh <- obj:
|
case c.cache.flushCh <- objectInfo{
|
||||||
|
addr: addr,
|
||||||
|
data: val,
|
||||||
|
obj: obj,
|
||||||
|
}:
|
||||||
case <-c.cache.closeCh:
|
case <-c.cache.closeCh:
|
||||||
c.cancel()
|
c.cancel()
|
||||||
return nil
|
return nil
|
||||||
|
@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||||
func (c *cache) workerFlushSmall() {
|
func (c *cache) workerFlushSmall() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
|
|
||||||
var obj *objectSDK.Object
|
var objInfo objectInfo
|
||||||
for {
|
for {
|
||||||
// Give priority to direct put.
|
// Give priority to direct put.
|
||||||
select {
|
select {
|
||||||
case obj = <-c.flushCh:
|
case objInfo = <-c.flushCh:
|
||||||
case <-c.closeCh:
|
case <-c.closeCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
addr := objectCore.AddressOf(obj)
|
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
||||||
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.deleteFromDB([]internalKey{addr2key(addr)})
|
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
|
||||||
}
|
}
|
||||||
c.scheduled4FlushMtx.Lock()
|
c.scheduled4FlushMtx.Lock()
|
||||||
delete(c.scheduled4Flush, addr)
|
delete(c.scheduled4Flush, objInfo.addr)
|
||||||
c.scheduled4FlushMtx.Unlock()
|
c.scheduled4FlushMtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue