[#642] writecache: Remove usage of close channel in bbolt

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-09-19 08:46:19 +03:00 committed by Evgenii Stratonikov
parent c0b86f2d93
commit 559ad58ab1
2 changed files with 19 additions and 31 deletions

View file

@ -36,20 +36,10 @@ const (
)
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
ctx, cancel := context.WithCancel(context.Background())
ch := c.closeCh
c.wg.Add(1)
go func() {
<-ch
cancel()
c.wg.Done()
}()
func (c *cache) runFlushLoop(ctx context.Context) {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall()
go c.workerFlushSmall(ctx)
}
c.wg.Add(1)
@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushSmallObjects()
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
case <-ctx.Done():
return
}
}
}()
}
func (c *cache) flushSmallObjects() {
func (c *cache) flushSmallObjects(ctx context.Context) {
var lastKey []byte
for {
select {
case <-c.closeCh:
case <-ctx.Done():
return
default:
}
@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
count++
select {
case c.flushCh <- m[i]:
case <-c.closeCh:
case <-ctx.Done():
c.modeMtx.RUnlock()
return
}
@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
_ = c.flushFSTree(ctx, true)
c.modeMtx.RUnlock()
case <-c.closeCh:
case <-ctx.Done():
return
}
}
@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
}
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-c.closeCh:
case <-ctx.Done():
return
}
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue