writecache: Remove usage of close channel in bbolt and badger #688

Merged
fyrchik merged 2 commits from acid-ant/frostfs-node:feature/642-wc-flush-with-ctx-from-main into master 2023-10-24 15:57:52 +00:00
5 changed files with 49 additions and 65 deletions

View file

@ -26,12 +26,12 @@ type cache struct {
// helps to avoid multiple flushing of one object
scheduled4Flush map[oid.Address]struct{}
scheduled4FlushMtx sync.RWMutex
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
store
// cancel is cancel function, protected by modeMtx in Close.
cancel func()
}
// wcStorageType is used for write-cache operations logging.
@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
if err != nil {
return metaerr.Wrap(err)
}
// Opening after Close is done during maintenance mode,
// thus we need to create a channel here.
c.closeCh = make(chan struct{})
return metaerr.Wrap(c.initCounters())
}
@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
func (c *cache) Init() error {
c.log.Info(logs.WritecacheBadgerInitExperimental)
c.metrics.SetMode(c.mode)
c.runFlushLoop()
c.runGCLoop()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
fyrchik marked this conversation as resolved Outdated

Not protected by `modeMtx` here as the comment says.

Protected in `Close`, as it was for `closeCh` too. I thought this comment was about that.

Ok, it seems `closeCh` had this problem too.
c.runFlushLoop(ctx)
c.runGCLoop(ctx)
return nil
}
@ -111,8 +108,9 @@ func (c *cache) Close() error {
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
if c.closeCh != nil {
close(c.closeCh)
if c.cancel != nil {
c.cancel()
c.cancel = nil
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
@ -122,7 +120,6 @@ func (c *cache) Close() error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
fyrchik marked this conversation as resolved Outdated

What about setting it to nil right after we called it?

Good catch, updated.
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()
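
A minimal, self-contained sketch of the `cancel` lifecycle discussed in the two threads above (simplified stand-in types, not the actual frostfs-node code): `Init` stores the cancel func without taking `modeMtx`, while `Close` calls it and immediately nils it under the mutex, then waits for the workers outside the lock.

```go
package main

import (
	"context"
	"sync"
)

// cache is a stand-in for the write-cache type; only the fields relevant
// to the discussion are kept.
type cache struct {
	modeMtx sync.Mutex
	cancel  func() // protected by modeMtx in Close
	wg      sync.WaitGroup
}

// Init stores the cancel func without taking modeMtx — the point raised
// in the first comment above.
func (c *cache) Init() {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	c.wg.Add(1)
	go func() { // stand-in for the flush/GC loops
		defer c.wg.Done()
		<-ctx.Done()
	}()
}

// Close calls cancel and immediately sets it to nil under modeMtx, so a
// repeated Close is a no-op; wg.Wait() runs outside the lock because the
// workers may need modeMtx to finish.
func (c *cache) Close() {
	c.modeMtx.Lock()
	if c.cancel != nil {
		c.cancel()
		c.cancel = nil
	}
	c.modeMtx.Unlock()
	c.wg.Wait()
}

func main() {
	c := &cache{}
	c.Init()
	c.Close()
	c.Close() // safe: cancel is already nil
}
```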

View file

@ -39,18 +39,16 @@ type collector struct {
cache *cache

Why this change?

Agree. I also don't understand the intention to remove these fields and pass them to the `send` method. Could you explain, please?

That was the previous implementation, when I tried to use the context from main. Also, the idea was to reduce the number of entities. Let's revert these changes.
scheduled int
processed int
cancel func()
}
func (c *collector) Send(buf *z.Buffer) error {
func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error {
list, err := badger.BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.Kv {
select {
case <-c.cache.closeCh:
c.cancel()
case <-ctx.Done():
return nil
default:
}
@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error {
return nil
}
if c.scheduled >= flushBatchSize {
c.cancel()
cancel()
return nil
}
if got, want := len(kv.Key), len(internalKey{}); got != want {
@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error {
data: val,
obj: obj,
}:
case <-c.cache.closeCh:
c.cancel()
case <-ctx.Done():
return nil
}
}
@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error {
}
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
func (c *cache) runFlushLoop(ctx context.Context) {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall()
go c.workerFlushSmall(ctx)
}
c.wg.Add(1)
@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushSmallObjects()
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
case <-ctx.Done():
return
}
}
}()
}
func (c *cache) flushSmallObjects() {
func (c *cache) flushSmallObjects(ctx context.Context) {
for {
select {
case <-c.closeCh:
case <-ctx.Done():
return
default:
}
@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() {
c.modeMtx.RUnlock()
return
}
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)

Do we need this `cancel` argument? Why not cancel the context after `Orchestrate()`?

We need to interrupt as soon as more than `flushBatchSize` items are scheduled for flushing. That is why I use `cancel` inside `Send`.
coll := collector{
cache: c,
cancel: cancel,
cache: c,
}
stream := c.db.NewStream()
// All calls to Send are done by a single goroutine
stream.Send = coll.Send
stream.Send = func(buf *z.Buffer) error {
return coll.send(ctx, cancel, buf)
}
if err := stream.Orchestrate(ctx); err != nil {
c.log.Debug(fmt.Sprintf(
"error during flushing object from wc: %s", err))
@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
}
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-c.closeCh:
case <-ctx.Done():
return
}
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err == nil {
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
}
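
To illustrate the two threads above (why `cancel` is passed into `send` as a parameter rather than kept as a collector field, and why it is called once `flushBatchSize` objects are scheduled), here is a self-contained sketch. It uses a hypothetical stdlib-only `stream` type in place of badger's stream, so the names and signatures are simplified assumptions.

```go
package main

import (
	"context"
	"fmt"
)

const flushBatchSize = 3

// stream is a stand-in for badger's Stream: it keeps calling Send until
// the context passed to Orchestrate is cancelled.
type stream struct {
	Send func(item int) error
}

func (s *stream) Orchestrate(ctx context.Context) error {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := s.Send(i); err != nil {
			return err
		}
	}
}

// collector schedules objects for flushing; cancel is a parameter of send
// instead of a struct field, as discussed in the review.
type collector struct{ scheduled int }

func (c *collector) send(ctx context.Context, cancel func(), item int) error {
	select {
	case <-ctx.Done(): // flush context cancelled (e.g. Close was called)
		return nil
	default:
	}
	if c.scheduled >= flushBatchSize {
		cancel() // enough objects scheduled for this pass: stop Orchestrate early
		return nil
	}
	c.scheduled++
	fmt.Println("scheduled item", item)
	return nil
}

func main() {
	// In the real code this context is derived from the flush context:
	// ctx, cancel := context.WithCancel(ctx).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coll := &collector{}
	s := &stream{}
	s.Send = func(item int) error { return coll.send(ctx, cancel, item) }
	_ = s.Orchestrate(ctx) // returns once send calls cancel
}
```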

View file

@ -1,12 +1,13 @@
package writecachebadger
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
)
func (c *cache) runGCLoop() {
func (c *cache) runGCLoop(ctx context.Context) {
c.wg.Add(1)
go func() {
@ -17,7 +18,7 @@ func (c *cache) runGCLoop() {
for {
select {
case <-c.closeCh:
case <-ctx.Done():
return
case <-t.C:
// This serves to synchronize the c.db field when changing mode as well.

View file

@ -30,8 +30,8 @@ type cache struct {
// flushCh is a channel with objects to flush.
flushCh chan objectInfo
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// cancel is cancel function, protected by modeMtx in Close.

`Init` is not protected by `modeMtx`. The comment or the behaviour should be fixed.

Description updated.
cancel func()
// wg is a wait group for flush workers.
wg sync.WaitGroup
// store contains underlying database.
@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
return metaerr.Wrap(err)
}
// Opening after Close is done during maintenance mode,
// thus we need to create a channel here.
c.closeCh = make(chan struct{})
return metaerr.Wrap(c.initCounters())
}
// Init runs necessary services.
func (c *cache) Init() error {
c.metrics.SetMode(c.mode)
c.runFlushLoop()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.runFlushLoop(ctx)
return nil
}
@ -123,8 +121,9 @@ func (c *cache) Close() error {
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
if c.closeCh != nil {
close(c.closeCh)
if c.cancel != nil {
c.cancel()
c.cancel = nil
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
@ -134,7 +133,6 @@ func (c *cache) Close() error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()

View file

@ -36,20 +36,10 @@ const (
)
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
ctx, cancel := context.WithCancel(context.Background())
ch := c.closeCh
c.wg.Add(1)
go func() {
<-ch
cancel()
c.wg.Done()
}()
func (c *cache) runFlushLoop(ctx context.Context) {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.workerFlushSmall()
go c.workerFlushSmall(ctx)
}
c.wg.Add(1)
@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushSmallObjects()
c.flushSmallObjects(ctx)
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
case <-ctx.Done():
return
}
}
}()
}
func (c *cache) flushSmallObjects() {
func (c *cache) flushSmallObjects(ctx context.Context) {
var lastKey []byte
for {
select {
case <-c.closeCh:
case <-ctx.Done():
return
default:
}
@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
count++
select {
case c.flushCh <- m[i]:
case <-c.closeCh:
case <-ctx.Done():
c.modeMtx.RUnlock()
return
}
@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
_ = c.flushFSTree(ctx, true)
c.modeMtx.RUnlock()
case <-c.closeCh:
case <-ctx.Done():
return
}
}
@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
}
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
func (c *cache) workerFlushSmall(ctx context.Context) {
defer c.wg.Done()
var objInfo objectInfo
@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
// Give priority to direct put.
select {
case objInfo = <-c.flushCh:
case <-c.closeCh:
case <-ctx.Done():
return
}
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue