diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
index 363ee844..9d54cd5c 100644
--- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
+++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
@@ -30,8 +30,8 @@ type cache struct {
 
 	// flushCh is a channel with objects to flush.
 	flushCh chan objectInfo
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
@@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 		return metaerr.Wrap(err)
 	}
 
-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }
 
 // Init runs necessary services.
 func (c *cache) Init() error {
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
 	return nil
 }
 
@@ -123,8 +121,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -134,7 +133,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()
 
-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush.go b/pkg/local_object_storage/writecache/writecachebbolt/flush.go
index fd322e9f..393776dd 100644
--- a/pkg/local_object_storage/writecache/writecachebbolt/flush.go
+++ b/pkg/local_object_storage/writecache/writecachebbolt/flush.go
@@ -36,20 +36,10 @@ const (
 )
 
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	ch := c.closeCh
-	c.wg.Add(1)
-	go func() {
-		<-ch
-		cancel()
-		c.wg.Done()
-	}()
-
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}
 
 	c.wg.Add(1)
@@ -68,20 +58,20 @@
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }
 
-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	var lastKey []byte
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -137,7 +127,7 @@
 			count++
 			select {
 			case c.flushCh <- m[i]:
-			case <-c.closeCh:
+			case <-ctx.Done():
 				c.modeMtx.RUnlock()
 				return
 			}
@@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
 			_ = c.flushFSTree(ctx, true)
 
 			c.modeMtx.RUnlock()
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 }
 
 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()
 
 	var objInfo objectInfo
@@ -236,11 +226,11 @@
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 
-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
		if err != nil {
			// Error is handled in flushObject.
			continue
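
Note: the patch above replaces the dedicated close channel (closeCh) with context cancellation for stopping the write-cache background workers, and nils the cancel func under modeMtx in Close. The following is a minimal, standalone sketch of that shutdown pattern, not part of the patch; the names (worker, Init, Close, the ticker interval) are hypothetical and only illustrate the technique.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	wg     sync.WaitGroup
	cancel func() // set in Init, called and cleared in Close
}

func (w *worker) Init() {
	// Cancellation replaces closing a dedicated channel: every background
	// goroutine selects on ctx.Done() instead of <-closeCh.
	ctx, cancel := context.WithCancel(context.Background())
	w.cancel = cancel

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		tick := time.NewTicker(100 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				fmt.Println("flush tick")
			case <-ctx.Done(): // stop requested by Close
				return
			}
		}
	}()
}

func (w *worker) Close() {
	// Cancel once and clear the func so a repeated Close is a no-op,
	// mirroring the nil check on c.cancel in the patch.
	if w.cancel != nil {
		w.cancel()
		w.cancel = nil
	}
	w.wg.Wait() // wait for all workers to observe cancellation
}

func main() {
	var w worker
	w.Init()
	time.Sleep(300 * time.Millisecond)
	w.Close()
}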