Compare commits


2 commits

Author SHA1 Message Date

0f4633d65d [#642] writecache: Remove usage of close channel in bbolt
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-17 13:25:43 +03:00
All checks were successful:
- DCO action / DCO (pull_request): successful in 6m29s
- Vulncheck / Vulncheck (pull_request): successful in 8m44s
- Build / Build Components (1.21) (pull_request): successful in 8m43s
- Build / Build Components (1.20) (pull_request): successful in 9m5s
- Tests and linters / Staticcheck (pull_request): successful in 2m59s
- Tests and linters / Lint (pull_request): successful in 3m42s
- Tests and linters / Tests (1.21) (pull_request): successful in 3m45s
- Tests and linters / Tests (1.20) (pull_request): successful in 29m53s
- Tests and linters / Tests with -race (pull_request): successful in 34m0s

0b7fd293f1 [#642] writecache: Remove usage of close channel in badger
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-17 13:25:43 +03:00
5 changed files with 49 additions and 65 deletions
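Both commits make the same change in their respective backends: the cache's closeCh field, closed in Close() to broadcast shutdown, is replaced by a context created in Init() whose cancel function is stored on the cache and invoked in Close(). The sketch below contrasts the two shutdown idioms; it is illustrative code, not taken from the repository.

    package main

    import "context"

    // Before: shutdown is broadcast by closing a channel; every goroutine
    // selecting on <-closeCh unblocks at once, but a closed channel cannot
    // be reused, so reopening requires making a new one.
    type withChannel struct{ closeCh chan struct{} }

    func (w *withChannel) open() { w.closeCh = make(chan struct{}) }
    func (w *withChannel) stop() { close(w.closeCh) }

    // After: a cancellable context carries the same broadcast via ctx.Done(),
    // and each start derives a fresh context, so nothing needs resetting.
    type withContext struct{ cancel context.CancelFunc }

    func (w *withContext) start() context.Context {
        ctx, cancel := context.WithCancel(context.Background())
        w.cancel = cancel
        return ctx
    }

    func (w *withContext) stop() {
        if w.cancel != nil {
            w.cancel()
            w.cancel = nil
        }
    }

    func main() {
        w := &withContext{}
        ctx := w.start()
        w.stop()
        <-ctx.Done() // already closed: cancellation has been observed
    }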

View file

@@ -26,12 +26,12 @@ type cache struct {
 	// helps to avoid multiple flushing of one object
 	scheduled4Flush    map[oid.Address]struct{}
 	scheduled4FlushMtx sync.RWMutex
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
 	store
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 }

 // wcStorageType is used for write-cache operations logging.
@@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 	if err != nil {
 		return metaerr.Wrap(err)
 	}
-
-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }
@@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {

 func (c *cache) Init() error {
 	c.log.Info(logs.WritecacheBadgerInitExperimental)
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
-	c.runGCLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
+	c.runGCLoop(ctx)
 	return nil
 }
@@ -111,8 +108,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -122,7 +120,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()

-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
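The comment in Close() carries the key constraint: modeMtx cannot be held across the whole shutdown because background workers take it themselves. A minimal sketch of that ordering, with illustrative names (cacheLike is not a repository type):

    package main

    import (
        "context"
        "sync"
    )

    type cacheLike struct {
        mu     sync.RWMutex
        cancel context.CancelFunc
        wg     sync.WaitGroup
    }

    func (c *cacheLike) close() {
        // Signal cancellation under the lock; the nil check keeps a second
        // Close from calling a stale cancel func.
        c.mu.Lock()
        if c.cancel != nil {
            c.cancel()
            c.cancel = nil
        }
        c.mu.Unlock()

        // Wait outside the lock: workers acquire c.mu on their way out, so
        // holding it across wg.Wait() would deadlock.
        c.wg.Wait()
    }

    func main() {
        c := &cacheLike{}
        ctx, cancel := context.WithCancel(context.Background())
        c.cancel = cancel
        c.wg.Add(1)
        go func() {
            defer c.wg.Done()
            <-ctx.Done()
            c.mu.RLock() // worker touches the mutex before exiting
            c.mu.RUnlock()
        }()
        c.close()
    }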

View file

@@ -39,18 +39,16 @@ type collector struct {
 	cache     *cache
 	scheduled int
 	processed int
-	cancel    func()
 }

-func (c *collector) Send(buf *z.Buffer) error {
+func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error {
 	list, err := badger.BufferToKVList(buf)
 	if err != nil {
 		return err
 	}
 	for _, kv := range list.Kv {
 		select {
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		default:
 		}
@@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			return nil
 		}
 		if c.scheduled >= flushBatchSize {
-			c.cancel()
+			cancel()
 			return nil
 		}
 		if got, want := len(kv.Key), len(internalKey{}); got != want {
@@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			data: val,
 			obj:  obj,
 		}:
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		}
 	}
@@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error {
 }

 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}

 	c.wg.Add(1)
@@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }

-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() {
 			c.modeMtx.RUnlock()
 			return
 		}
-		ctx, cancel := context.WithCancel(context.TODO())
+		ctx, cancel := context.WithCancel(ctx)
 		coll := collector{
 			cache: c,
-			cancel: cancel,
 		}
 		stream := c.db.NewStream()
 		// All calls to Send are done by a single goroutine
-		stream.Send = coll.Send
+		stream.Send = func(buf *z.Buffer) error {
+			return coll.send(ctx, cancel, buf)
+		}
 		if err := stream.Orchestrate(ctx); err != nil {
 			c.log.Debug(fmt.Sprintf(
 				"error during flushing object from wc: %s", err))
@@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
 }

 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()

 	var objInfo objectInfo
@@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}

-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err == nil {
 			c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
 		}
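A design note on the collector change: cancel used to live on the struct, but badger's stream.Send callback has the fixed signature func(*z.Buffer) error, so the new code threads ctx and cancel through a closure instead. Here is a self-contained sketch of that shape, with illustrative names (counter and emit are not repository code):

    package main

    import (
        "context"
        "fmt"
    )

    // Instead of storing cancel on the struct, send takes ctx and cancel as
    // parameters, and a closure adapts it to a fixed callback signature.
    type counter struct{ scheduled int }

    func (c *counter) send(ctx context.Context, cancel context.CancelFunc, batch []byte) error {
        select {
        case <-ctx.Done(): // stop promptly once cancelled
            return nil
        default:
        }
        c.scheduled += len(batch)
        if c.scheduled >= 128 { // enough work scheduled: stop the producer early
            cancel()
        }
        return nil
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        c := &counter{}
        // The producer expects func([]byte) error; the closure supplies ctx and cancel.
        emit := func(batch []byte) error { return c.send(ctx, cancel, batch) }
        for i := 0; i < 3; i++ {
            _ = emit(make([]byte, 64))
        }
        fmt.Println(c.scheduled, ctx.Err()) // 128 context canceled
    }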

View file

@@ -1,12 +1,13 @@
 package writecachebadger

 import (
+	"context"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 )

-func (c *cache) runGCLoop() {
+func (c *cache) runGCLoop(ctx context.Context) {
 	c.wg.Add(1)

 	go func() {
@@ -17,7 +18,7 @@ func (c *cache) runGCLoop() {

 		for {
 			select {
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			case <-t.C:
 				// This serves to synchronize the c.db field when changing mode as well.
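The GC loop now follows the standard ticker-plus-context shape: block on both channels and let cancellation take priority over further ticks. A reduced, runnable sketch of the same pattern (runLoop is an illustrative name, not repository code):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // runLoop is a wg-tracked goroutine that does periodic work until its
    // context is cancelled, mirroring the runGCLoop shape above.
    func runLoop(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, work func()) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            t := time.NewTicker(interval)
            defer t.Stop()
            for {
                select {
                case <-ctx.Done():
                    return
                case <-t.C:
                    work()
                }
            }
        }()
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        var wg sync.WaitGroup
        runLoop(ctx, &wg, 10*time.Millisecond, func() { fmt.Println("collect") })
        time.Sleep(35 * time.Millisecond)
        cancel()
        wg.Wait()
    }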

View file

@@ -30,8 +30,8 @@ type cache struct {
 	// flushCh is a channel with objects to flush.
 	flushCh chan objectInfo
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
@@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 		return metaerr.Wrap(err)
 	}

-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }

 // Init runs necessary services.
 func (c *cache) Init() error {
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
 	return nil
 }
@@ -123,8 +121,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -134,7 +133,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()

-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
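Note what disappears from Open here: the old code had to re-create closeCh on reopen because a closed channel cannot be reused, and Open-after-Close happens in maintenance mode. With a context, Init derives a fresh one each time, so Open carries no shutdown state. A reduced lifecycle sketch (lifecycle is an illustrative type, not repository code):

    package main

    import "context"

    type lifecycle struct {
        cancel context.CancelFunc
    }

    // init derives a fresh context every time, so reopen-after-close needs
    // no special handling the way a closed channel did.
    func (l *lifecycle) init(run func(context.Context)) {
        ctx, cancel := context.WithCancel(context.Background())
        l.cancel = cancel
        go run(ctx)
    }

    func (l *lifecycle) close() {
        if l.cancel != nil {
            l.cancel()
            l.cancel = nil
        }
    }

    func main() {
        var l lifecycle
        l.init(func(ctx context.Context) { <-ctx.Done() })
        l.close()
        l.init(func(ctx context.Context) { <-ctx.Done() }) // reopen works unchanged
        l.close()
    }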

View file

@@ -36,20 +36,10 @@ const (
 )

 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	ch := c.closeCh
-	c.wg.Add(1)
-	go func() {
-		<-ch
-		cancel()
-		c.wg.Done()
-	}()
-
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}

 	c.wg.Add(1)
@@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }

-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	var lastKey []byte
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
 			count++
 			select {
 			case c.flushCh <- m[i]:
-			case <-c.closeCh:
+			case <-ctx.Done():
 				c.modeMtx.RUnlock()
 				return
 			}
@@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
 			_ = c.flushFSTree(ctx, true)

 			c.modeMtx.RUnlock()
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 }

 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()

 	var objInfo objectInfo
@@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}

-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err != nil {
 			// Error is handled in flushObject.
 			continue
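The bbolt runFlushLoop shows the clearest payoff: previously it built its own context and dedicated a wg-tracked goroutine solely to translate closeCh into cancel(), as in the lines deleted above:

    ctx, cancel := context.WithCancel(context.Background())

    ch := c.closeCh
    c.wg.Add(1)
    go func() {
        <-ch      // wait for Close to close the channel
        cancel()  // then forward the signal as a context cancellation
        c.wg.Done()
    }()

With the context handed in from Init, the workers select on ctx.Done() directly, so the bridge goroutine and its wait-group bookkeeping go away entirely.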