From 0b7fd293f12fe67bb168b39aea0012bfb0738230 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 19 Sep 2023 08:43:13 +0300 Subject: [PATCH 1/2] [#642] writecache: Remove usage of close channel in badger Signed-off-by: Anton Nikiforov --- .../writecachebadger/cachebadger.go | 21 +++++----- .../writecache/writecachebadger/flush.go | 38 +++++++++---------- .../writecache/writecachebadger/gc.go | 5 ++- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index d5da77635..be7046d05 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -26,12 +26,12 @@ type cache struct { // helps to avoid multiple flushing of one object scheduled4Flush map[oid.Address]struct{} scheduled4FlushMtx sync.RWMutex - // closeCh is close channel, protected by modeMtx. - closeCh chan struct{} // wg is a wait group for flush workers. wg sync.WaitGroup // store contains underlying database. store + // cancel is cancel function, protected by modeMtx in Close. + cancel func() } // wcStorageType is used for write-cache operations logging. @@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error { if err != nil { return metaerr.Wrap(err) } - - // Opening after Close is done during maintenance mode, - // thus we need to create a channel here. - c.closeCh = make(chan struct{}) - return metaerr.Wrap(c.initCounters()) } @@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error { func (c *cache) Init() error { c.log.Info(logs.WritecacheBadgerInitExperimental) c.metrics.SetMode(c.mode) - c.runFlushLoop() - c.runGCLoop() + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.runFlushLoop(ctx) + c.runGCLoop(ctx) return nil } @@ -111,8 +108,9 @@ func (c *cache) Close() error { // We cannot lock mutex for the whole operation duration // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. 
c.modeMtx.Lock() - if c.closeCh != nil { - close(c.closeCh) + if c.cancel != nil { + c.cancel() + c.cancel = nil } c.mode = mode.DegradedReadOnly // prevent new operations from being processed c.modeMtx.Unlock() @@ -122,7 +120,6 @@ func (c *cache) Close() error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - c.closeCh = nil var err error if c.db != nil { err = c.db.Close() diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go index e0333eff7..1ee6bbb66 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -39,18 +39,16 @@ type collector struct { cache *cache scheduled int processed int - cancel func() } -func (c *collector) Send(buf *z.Buffer) error { +func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error { list, err := badger.BufferToKVList(buf) if err != nil { return err } for _, kv := range list.Kv { select { - case <-c.cache.closeCh: - c.cancel() + case <-ctx.Done(): return nil default: } @@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error { return nil } if c.scheduled >= flushBatchSize { - c.cancel() + cancel() return nil } if got, want := len(kv.Key), len(internalKey{}); got != want { @@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error { data: val, obj: obj, }: - case <-c.cache.closeCh: - c.cancel() + case <-ctx.Done(): return nil } } @@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error { } // runFlushLoop starts background workers which periodically flush objects to the blobstor. -func (c *cache) runFlushLoop() { +func (c *cache) runFlushLoop(ctx context.Context) { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) - go c.workerFlushSmall() + go c.workerFlushSmall(ctx) } c.wg.Add(1) @@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() { for { select { case <-tt.C: - c.flushSmallObjects() + c.flushSmallObjects(ctx) tt.Reset(defaultFlushInterval) - case <-c.closeCh: + case <-ctx.Done(): return } } }() } -func (c *cache) flushSmallObjects() { +func (c *cache) flushSmallObjects(ctx context.Context) { for { select { - case <-c.closeCh: + case <-ctx.Done(): return default: } @@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() { c.modeMtx.RUnlock() return } - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(ctx) coll := collector{ - cache: c, - cancel: cancel, + cache: c, } stream := c.db.NewStream() // All calls to Send are done by a single goroutine - stream.Send = coll.Send + stream.Send = func(buf *z.Buffer) error { + return coll.send(ctx, cancel, buf) + } if err := stream.Orchestrate(ctx); err != nil { c.log.Debug(fmt.Sprintf( "error during flushing object from wc: %s", err)) @@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) { } // workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall() { +func (c *cache) workerFlushSmall(ctx context.Context) { defer c.wg.Done() var objInfo objectInfo @@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() { // Give priority to direct put. 
select { case objInfo = <-c.flushCh: - case <-c.closeCh: + case <-ctx.Done(): return } - err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB) + err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB) if err == nil { c.deleteFromDB([]internalKey{addr2key(objInfo.addr)}) } diff --git a/pkg/local_object_storage/writecache/writecachebadger/gc.go b/pkg/local_object_storage/writecache/writecachebadger/gc.go index 8432a9c04..b856efc3d 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/gc.go +++ b/pkg/local_object_storage/writecache/writecachebadger/gc.go @@ -1,12 +1,13 @@ package writecachebadger import ( + "context" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" ) -func (c *cache) runGCLoop() { +func (c *cache) runGCLoop(ctx context.Context) { c.wg.Add(1) go func() { @@ -17,7 +18,7 @@ func (c *cache) runGCLoop() { for { select { - case <-c.closeCh: + case <-ctx.Done(): return case <-t.C: // This serves to synchronize the c.db field when changing mode as well. -- 2.45.3 From 0f4633d65d205d63050d2633d8a6d91ab1b590b2 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 19 Sep 2023 08:46:19 +0300 Subject: [PATCH 2/2] [#642] writecache: Remove usage of close channel in bbolt Signed-off-by: Anton Nikiforov --- .../writecache/writecachebbolt/cachebbolt.go | 18 +++++------ .../writecache/writecachebbolt/flush.go | 32 +++++++------------ 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go index 363ee8448..9d54cd5c2 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go @@ -30,8 +30,8 @@ type cache struct { // flushCh is a channel with objects to flush. flushCh chan objectInfo - // closeCh is close channel, protected by modeMtx. - closeCh chan struct{} + // cancel is cancel function, protected by modeMtx in Close. + cancel func() // wg is a wait group for flush workers. wg sync.WaitGroup // store contains underlying database. @@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error { return metaerr.Wrap(err) } - // Opening after Close is done during maintenance mode, - // thus we need to create a channel here. - c.closeCh = make(chan struct{}) - return metaerr.Wrap(c.initCounters()) } // Init runs necessary services. func (c *cache) Init() error { c.metrics.SetMode(c.mode) - c.runFlushLoop() + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.runFlushLoop(ctx) return nil } @@ -123,8 +121,9 @@ func (c *cache) Close() error { // We cannot lock mutex for the whole operation duration // because it is taken by some background workers, so `wg.Wait()` is done without modeMtx. 
c.modeMtx.Lock() - if c.closeCh != nil { - close(c.closeCh) + if c.cancel != nil { + c.cancel() + c.cancel = nil } c.mode = mode.DegradedReadOnly // prevent new operations from being processed c.modeMtx.Unlock() @@ -134,7 +133,6 @@ func (c *cache) Close() error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - c.closeCh = nil var err error if c.db != nil { err = c.db.Close() diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush.go b/pkg/local_object_storage/writecache/writecachebbolt/flush.go index fd322e9fa..393776dd1 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush.go @@ -36,20 +36,10 @@ const ( ) // runFlushLoop starts background workers which periodically flush objects to the blobstor. -func (c *cache) runFlushLoop() { - ctx, cancel := context.WithCancel(context.Background()) - - ch := c.closeCh - c.wg.Add(1) - go func() { - <-ch - cancel() - c.wg.Done() - }() - +func (c *cache) runFlushLoop(ctx context.Context) { for i := 0; i < c.workersCount; i++ { c.wg.Add(1) - go c.workerFlushSmall() + go c.workerFlushSmall(ctx) } c.wg.Add(1) @@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() { for { select { case <-tt.C: - c.flushSmallObjects() + c.flushSmallObjects(ctx) tt.Reset(defaultFlushInterval) - case <-c.closeCh: + case <-ctx.Done(): return } } }() } -func (c *cache) flushSmallObjects() { +func (c *cache) flushSmallObjects(ctx context.Context) { var lastKey []byte for { select { - case <-c.closeCh: + case <-ctx.Done(): return default: } @@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() { count++ select { case c.flushCh <- m[i]: - case <-c.closeCh: + case <-ctx.Done(): c.modeMtx.RUnlock() return } @@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) { _ = c.flushFSTree(ctx, true) c.modeMtx.RUnlock() - case <-c.closeCh: + case <-ctx.Done(): return } } @@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { } // workerFlushSmall writes small objects to the main storage. -func (c *cache) workerFlushSmall() { +func (c *cache) workerFlushSmall(ctx context.Context) { defer c.wg.Done() var objInfo objectInfo @@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() { // Give priority to direct put. select { case objInfo = <-c.flushCh: - case <-c.closeCh: + case <-ctx.Done(): return } - err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB) + err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB) if err != nil { // Error is handled in flushObject. continue -- 2.45.3
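
Note on the pattern (supplementary, not part of either patch): both changes replace a struct-owned close channel with a context.CancelFunc that Init stores on the struct and Close invokes, and then nils out, under modeMtx; background workers select on ctx.Done() instead of the channel, and Close waits on the WaitGroup outside the lock. The standalone sketch below illustrates that lifecycle in miniature; the service type and its method names are illustrative stand-ins, not the frostfs-node API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// service mirrors the cache struct in miniature: cancel replaces the
// former closeCh and is guarded by a mutex so that Close is safe to
// call concurrently and more than once.
type service struct {
	mtx    sync.Mutex
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// Init derives a context from Background, stores its cancel function,
// and hands the context to the background loop.
func (s *service) Init() {
	ctx, cancel := context.WithCancel(context.Background())
	s.mtx.Lock()
	s.cancel = cancel
	s.mtx.Unlock()

	s.wg.Add(1)
	go s.run(ctx)
}

// run is the background worker; ctx.Done() plays the role that
// <-closeCh played before the patches.
func (s *service) run(ctx context.Context) {
	defer s.wg.Done()
	t := time.NewTicker(100 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			fmt.Println("tick")
		}
	}
}

// Close cancels under the mutex and nils the field out, then waits
// for workers outside the lock, exactly as both Close implementations
// in the patches do.
func (s *service) Close() {
	s.mtx.Lock()
	if s.cancel != nil {
		s.cancel()
		s.cancel = nil
	}
	s.mtx.Unlock()
	s.wg.Wait()
}

func main() {
	s := &service{}
	s.Init()
	time.Sleep(250 * time.Millisecond)
	s.Close()
}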
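
The badger patch additionally stops storing the batch-limit cancel function on the collector: stream.Send is wrapped in a closure that passes ctx and cancel into send, so cancellation state lives on the stack of flushSmallObjects rather than on the struct. A simplified sketch of that closure technique follows; badger's Stream.Send is stood in by a plain callback, the key/value handling is omitted, and flushBatchSize is an arbitrary value here.

package main

import (
	"context"
	"fmt"
)

const flushBatchSize = 3

// collector mirrors the patched struct: cancel is no longer a field;
// it reaches send through the closure built in main.
type collector struct {
	scheduled int
}

// send checks ctx.Done() instead of a close channel and calls the
// caller-supplied cancel once the batch limit is reached, which would
// stop the surrounding stream orchestration early.
func (c *collector) send(ctx context.Context, cancel func(), item int) error {
	select {
	case <-ctx.Done():
		return nil
	default:
	}
	if c.scheduled >= flushBatchSize {
		cancel()
		return nil
	}
	c.scheduled++
	fmt.Println("scheduled", item)
	return nil
}

func main() {
	// The parent context stands in for the one Init threads down to
	// flushSmallObjects; deferring cancel avoids leaking the child
	// context when the batch limit is never reached.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coll := &collector{}
	// This closure plays the role of stream.Send in the patch: ctx and
	// cancel are captured rather than stored on the collector.
	send := func(item int) error { return coll.send(ctx, cancel, item) }

	for i := 0; i < 10; i++ {
		_ = send(i)
	}
}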