[#642] writecache: Remove usage of close channel in badger

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-09-19 08:43:13 +03:00 committed by Evgenii Stratonikov
parent b0cf100427
commit c0b86f2d93
3 changed files with 30 additions and 34 deletions

View file

@ -39,18 +39,16 @@ type collector struct {
cache *cache
scheduled int
processed int
cancel func()
}
func (c *collector) Send(buf *z.Buffer) error {
func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error {
list, err := badger.BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.Kv {
select {
case <-c.cache.closeCh:
c.cancel()
case <-ctx.Done():
return nil
default:
}
@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error {
return nil
}
if c.scheduled >= flushBatchSize {
c.cancel()
cancel()
return nil
}
if got, want := len(kv.Key), len(internalKey{}); got != want {
@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error {
data: val,
obj: obj,
}:
case <-c.cache.closeCh:
c.cancel()
case <-ctx.Done():
return nil
}
}
@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error {
}
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
func (c *cache) runFlushLoop(ctx context.Context) {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall()
go c.workerFlushSmall(ctx)
}
c.wg.Add(1)
@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushSmallObjects()
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
case <-ctx.Done():
return
}
}
}()
}
func (c *cache) flushSmallObjects() {
func (c *cache) flushSmallObjects(ctx context.Context) {
for {
select {
case <-c.closeCh:
case <-ctx.Done():
return
default:
}
@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() {
c.modeMtx.RUnlock()
return
}
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)
coll := collector{
cache: c,
cancel: cancel,
cache: c,
}
stream := c.db.NewStream()
// All calls to Send are done by a single goroutine
stream.Send = coll.Send
stream.Send = func(buf *z.Buffer) error {
return coll.send(ctx, cancel, buf)
}
if err := stream.Orchestrate(ctx); err != nil {
c.log.Debug(fmt.Sprintf(
"error during flushing object from wc: %s", err))
@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
}
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-c.closeCh:
case <-ctx.Done():
return
}
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err == nil {
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
}