diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index d5da7763..be7046d0 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -26,12 +26,12 @@ type cache struct { // helps to avoid multiple flushing of one object scheduled4Flush map[oid.Address]struct{} scheduled4FlushMtx sync.RWMutex - // closeCh is close channel, protected by modeMtx. - closeCh chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup // store contains underlying database. store + // cancel is cancel function, protected by modeMtx in Close. + cancel func() } // wcStorageType is used for write-cache operations logging. @@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error { if err != nil { return metaerr.Wrap(err) } - - // Opening after Close is done during maintenance mode, - // thus we need to create a channel here. - c.closeCh = make(chan struct{}) - return metaerr.Wrap(c.initCounters()) } @@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error { func (c *cache) Init() error { c.log.Info(logs.WritecacheBadgerInitExperimental) c.metrics.SetMode(c.mode) - c.runFlushLoop() - c.runGCLoop() + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.runFlushLoop(ctx) + c.runGCLoop(ctx) return nil } @@ -111,8 +108,9 @@ func (c *cache) Close() error { // We cannot lock mutex for the whole operation duration // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. c.modeMtx.Lock() - if c.closeCh != nil { - close(c.closeCh) + if c.cancel != nil { + c.cancel() + c.cancel = nil } c.mode = mode.DegradedReadOnly // prevent new operations from being processed c.modeMtx.Unlock() @@ -122,7 +120,6 @@ func (c *cache) Close() error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - c.closeCh = nil var err error if c.db != nil { err = c.db.Close() diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go index e0333eff..1ee6bbb6 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -39,18 +39,16 @@ type collector struct { cache *cache scheduled int processed int - cancel func() } -func (c *collector) Send(buf *z.Buffer) error { +func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error { list, err := badger.BufferToKVList(buf) if err != nil { return err } for _, kv := range list.Kv { select { - case <-c.cache.closeCh: - c.cancel() + case <-ctx.Done(): return nil default: } @@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error { return nil } if c.scheduled >= flushBatchSize { - c.cancel() + cancel() return nil } if got, want := len(kv.Key), len(internalKey{}); got != want { @@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error { data: val, obj: obj, }: - case <-c.cache.closeCh: - c.cancel() + case <-ctx.Done(): return nil } } @@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error { } // runFlushLoop starts background workers which periodically flush objects to the blobstor. -func (c *cache) runFlushLoop() { +func (c *cache) runFlushLoop(ctx context.Context) { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) - go c.workerFlushSmall() + go c.workerFlushSmall(ctx) } c.wg.Add(1) @@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() { for { select { case <-tt.C: - c.flushSmallObjects() + c.flushSmallObjects(ctx) tt.Reset(defaultFlushInterval) - case <-c.closeCh: + case <-ctx.Done(): return } } }() } -func (c *cache) flushSmallObjects() { +func (c *cache) flushSmallObjects(ctx context.Context) { for { select { - case <-c.closeCh: + case <-ctx.Done(): return default: } @@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() { c.modeMtx.RUnlock() return } - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(ctx) coll := collector{ - cache: c, - cancel: cancel, + cache: c, } stream := c.db.NewStream() // All calls to Send are done by a single goroutine - stream.Send = coll.Send + stream.Send = func(buf *z.Buffer) error { + return coll.send(ctx, cancel, buf) + } if err := stream.Orchestrate(ctx); err != nil { c.log.Debug(fmt.Sprintf( "error during flushing object from wc: %s", err)) @@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) { } // workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall() { +func (c *cache) workerFlushSmall(ctx context.Context) { defer c.wg.Done() var objInfo objectInfo @@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() { // Give priority to direct put. select { case objInfo = <-c.flushCh: - case <-c.closeCh: + case <-ctx.Done(): return } - err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB) + err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB) if err == nil { c.deleteFromDB([]internalKey{addr2key(objInfo.addr)}) } diff --git a/pkg/local_object_storage/writecache/writecachebadger/gc.go b/pkg/local_object_storage/writecache/writecachebadger/gc.go index 8432a9c0..b856efc3 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/gc.go +++ b/pkg/local_object_storage/writecache/writecachebadger/gc.go @@ -1,12 +1,13 @@ package writecachebadger import ( + "context" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" ) -func (c *cache) runGCLoop() { +func (c *cache) runGCLoop(ctx context.Context) { c.wg.Add(1) go func() { @@ -17,7 +18,7 @@ func (c *cache) runGCLoop() { for { select { - case <-c.closeCh: + case <-ctx.Done(): return case <-t.C: // This serves to synchronize the c.db field when changing mode as well.