writecache: Remove usage of close channel in bbolt and badger #688
5 changed files with 49 additions and 65 deletions
@@ -26,12 +26,12 @@ type cache struct {
 	// helps to avoid multiple flushing of one object
 	scheduled4Flush    map[oid.Address]struct{}
 	scheduled4FlushMtx sync.RWMutex
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.
 	store
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 }

 // wcStorageType is used for write-cache operations logging.
@@ -89,11 +89,6 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 	if err != nil {
 		return metaerr.Wrap(err)
 	}
-
-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }

@@ -101,8 +96,10 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 func (c *cache) Init() error {
 	c.log.Info(logs.WritecacheBadgerInitExperimental)
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
-	c.runGCLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
+	c.runGCLoop(ctx)
 	return nil
 }

@@ -111,8 +108,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -122,7 +120,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()

-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()

fyrchik commented: What about setting it to nil right after we called it?
acid-ant commented: Good catch, updated.
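For orientation, here is a minimal, self-contained sketch of the pattern both backends converge on in this PR: Init derives a context and stores its cancel function, background workers select on ctx.Done() instead of a close channel, and Close calls the stored cancel under modeMtx and nils it so a repeated Close is a no-op. The demoCache type and run worker below are illustrative stand-ins, not code from the repository.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoCache stands in for the write-cache; it only demonstrates how a stored
// cancel function replaces the former closeCh close channel.
type demoCache struct {
	modeMtx sync.Mutex
	cancel  func()
	wg      sync.WaitGroup
}

func (c *demoCache) Init() {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	c.wg.Add(1)
	go c.run(ctx)
}

func (c *demoCache) run(ctx context.Context) {
	defer c.wg.Done()
	t := time.NewTicker(50 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done(): // was: case <-c.closeCh:
			return
		case <-t.C:
			// periodic flush work would go here
		}
	}
}

func (c *demoCache) Close() {
	c.modeMtx.Lock()
	if c.cancel != nil {
		c.cancel()
		c.cancel = nil // nil it right after calling, so a second Close is a no-op
	}
	c.modeMtx.Unlock()
	c.wg.Wait() // wait for workers outside the lock, as the diff's comment explains
}

func main() {
	c := &demoCache{}
	c.Init()
	c.Close()
	c.Close() // safe: cancel was already nilled
	fmt.Println("workers stopped")
}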
@@ -39,18 +39,16 @@ type collector struct {
 	cache     *cache
 	scheduled int
 	processed int
-	cancel    func()
 }

-func (c *collector) Send(buf *z.Buffer) error {
+func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) error {
 	list, err := badger.BufferToKVList(buf)
 	if err != nil {
 		return err
 	}
 	for _, kv := range list.Kv {
 		select {
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		default:
 		}

fyrchik commented: Why this change?
aarifullin commented: Agree. I also don't understand the intention to remove these fields and pass them to the `send` method. Could you explain, please?
acid-ant commented: That was the previous implementation when I tried to use context from main. Also, the idea was to reduce the number of entities. Let's revert these changes.
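The compromise reached in that thread, in miniature: long-lived counters stay on the collector, while the per-run context and cancel are not stored as fields but are captured by a closure at the point where the callback is installed. The stream type below is a hypothetical stand-in with the same shape as badger's Stream.Send callback field; it is not the badger API.

package main

import (
	"context"
	"fmt"
)

// stream is a hypothetical stand-in: a struct whose Send callback field is
// assigned by the caller, similar in shape to badger's Stream.Send.
type stream struct {
	Send func(items []string) error
}

// collector keeps only the state that must survive across Send calls.
type collector struct {
	scheduled int
}

// send gets the per-run ctx and cancel as arguments rather than struct fields.
// cancel is available so the callback can end the run early, as the diff does
// once flushBatchSize items have been scheduled.
func (c *collector) send(ctx context.Context, cancel func(), items []string) error {
	for range items {
		select {
		case <-ctx.Done():
			return nil // a cancelled run stops scheduling more items
		default:
		}
		c.scheduled++
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coll := &collector{}
	s := &stream{}
	// The closure captures ctx and cancel, so the collector needs no extra fields.
	s.Send = func(items []string) error {
		return coll.send(ctx, cancel, items)
	}

	_ = s.Send([]string{"a", "b"})
	cancel()                                  // after cancellation...
	_ = s.Send([]string{"c", "d"})            // ...this call schedules nothing
	fmt.Println("scheduled:", coll.scheduled) // prints 2
}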
@@ -58,7 +56,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			return nil
 		}
 		if c.scheduled >= flushBatchSize {
-			c.cancel()
+			cancel()
 			return nil
 		}
 		if got, want := len(kv.Key), len(internalKey{}); got != want {
@@ -90,8 +88,7 @@ func (c *collector) Send(buf *z.Buffer) error {
 			data: val,
 			obj:  obj,
 		}:
-		case <-c.cache.closeCh:
-			c.cancel()
+		case <-ctx.Done():
 			return nil
 		}
 	}
@@ -99,10 +96,10 @@ func (c *collector) Send(buf *z.Buffer) error {
 }

 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}

 	c.wg.Add(1)
@@ -115,19 +112,19 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }

-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -144,14 +141,15 @@ func (c *cache) flushSmallObjects() {
 			c.modeMtx.RUnlock()
 			return
 		}
-		ctx, cancel := context.WithCancel(context.TODO())
+		ctx, cancel := context.WithCancel(ctx)
 		coll := collector{
 			cache: c,
-			cancel: cancel,
 		}
 		stream := c.db.NewStream()
 		// All calls to Send are done by a single goroutine
-		stream.Send = coll.Send
+		stream.Send = func(buf *z.Buffer) error {
+			return coll.send(ctx, cancel, buf)
+		}
 		if err := stream.Orchestrate(ctx); err != nil {
 			c.log.Debug(fmt.Sprintf(
 				"error during flushing object from wc: %s", err))

fyrchik commented: Do we need this `cancel` argument? Why not cancel the context after `Orchestrate()`?
acid-ant commented: We need to interrupt at the moment when more than `flushBatchSize` items have been scheduled for flush. That is why I use `cancel` inside `Send`.
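That exchange is the reason `cancel` must be reachable from inside the callback. A hedged sketch of the flow, assuming the badger v4 stream API already used in the diff (`NewStream`, `Send`, `Orchestrate`, `BufferToKVList`); `flushBatch` and `batchSize` are illustrative names, not frostfs-node code, the import paths are the usual ones for badger v4 and may differ from the versions pinned here, and the `context.Canceled` handling is an assumption about how Orchestrate reports a cancelled run.

package writecachedemo

import (
	"context"
	"errors"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
)

// flushBatch scans the database via a badger stream but stops as soon as
// batchSize keys have been scheduled, by cancelling the derived context
// from inside the Send callback.
func flushBatch(parent context.Context, db *badger.DB, batchSize int) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	scheduled := 0
	stream := db.NewStream()
	// All calls to Send are done by a single goroutine, so a plain counter is enough.
	stream.Send = func(buf *z.Buffer) error {
		list, err := badger.BufferToKVList(buf)
		if err != nil {
			return err
		}
		for range list.Kv {
			scheduled++
			if scheduled >= batchSize {
				cancel() // Orchestrate observes ctx.Done() and winds the stream down
				return nil
			}
		}
		return nil
	}

	// A cancelled context is the expected "batch is full" outcome here,
	// so it is not treated as a failure (assumption: Orchestrate surfaces ctx.Err()).
	if err := stream.Orchestrate(ctx); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}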
@@ -176,7 +174,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
 }

 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()

 	var objInfo objectInfo
@@ -184,11 +182,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}

-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err == nil {
 			c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
 		}
@@ -1,12 +1,13 @@
 package writecachebadger

 import (
+	"context"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 )

-func (c *cache) runGCLoop() {
+func (c *cache) runGCLoop(ctx context.Context) {
 	c.wg.Add(1)

 	go func() {
@@ -17,7 +18,7 @@ func (c *cache) runGCLoop() {

 		for {
 			select {
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			case <-t.C:
 				// This serves to synchronize the c.db field when changing mode as well.
@@ -30,8 +30,8 @@ type cache struct {

 	// flushCh is a channel with objects to flush.
 	flushCh chan objectInfo
-	// closeCh is close channel, protected by modeMtx.
-	closeCh chan struct{}
+	// cancel is cancel function, protected by modeMtx in Close.
+	cancel func()
 	// wg is a wait group for flush workers.
 	wg sync.WaitGroup
 	// store contains underlying database.

Review comment: `Init` is not protected by modeMtx. Comment or behaviour should be fixed.
acid-ant commented: Description updated.
@@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
 		return metaerr.Wrap(err)
 	}

-	// Opening after Close is done during maintenance mode,
-	// thus we need to create a channel here.
-	c.closeCh = make(chan struct{})
-
 	return metaerr.Wrap(c.initCounters())
 }

 // Init runs necessary services.
 func (c *cache) Init() error {
 	c.metrics.SetMode(c.mode)
-	c.runFlushLoop()
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	c.runFlushLoop(ctx)
 	return nil
 }
@@ -123,8 +121,9 @@ func (c *cache) Close() error {
 	// We cannot lock mutex for the whole operation duration
 	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
 	c.modeMtx.Lock()
-	if c.closeCh != nil {
-		close(c.closeCh)
+	if c.cancel != nil {
+		c.cancel()
+		c.cancel = nil
 	}
 	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
 	c.modeMtx.Unlock()
@@ -134,7 +133,6 @@ func (c *cache) Close() error {
 	c.modeMtx.Lock()
 	defer c.modeMtx.Unlock()

-	c.closeCh = nil
 	var err error
 	if c.db != nil {
 		err = c.db.Close()
@@ -36,20 +36,10 @@ const (
 )

 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	ch := c.closeCh
-	c.wg.Add(1)
-	go func() {
-		<-ch
-		cancel()
-		c.wg.Done()
-	}()
-
+func (c *cache) runFlushLoop(ctx context.Context) {
 	for i := 0; i < c.workersCount; i++ {
 		c.wg.Add(1)
-		go c.workerFlushSmall()
+		go c.workerFlushSmall(ctx)
 	}

 	c.wg.Add(1)
@@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
 		for {
 			select {
 			case <-tt.C:
-				c.flushSmallObjects()
+				c.flushSmallObjects(ctx)
 				tt.Reset(defaultFlushInterval)
-			case <-c.closeCh:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}()
 }

-func (c *cache) flushSmallObjects() {
+func (c *cache) flushSmallObjects(ctx context.Context) {
 	var lastKey []byte
 	for {
 		select {
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		default:
 		}
@@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
 			count++
 			select {
 			case c.flushCh <- m[i]:
-			case <-c.closeCh:
+			case <-ctx.Done():
 				c.modeMtx.RUnlock()
 				return
 			}
@@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
 			_ = c.flushFSTree(ctx, true)

 			c.modeMtx.RUnlock()
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 }

 // workerFlushSmall writes small objects to the main storage.
-func (c *cache) workerFlushSmall() {
+func (c *cache) workerFlushSmall(ctx context.Context) {
 	defer c.wg.Done()

 	var objInfo objectInfo
@@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
 		// Give priority to direct put.
 		select {
 		case objInfo = <-c.flushCh:
-		case <-c.closeCh:
+		case <-ctx.Done():
 			return
 		}

-		err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
+		err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
 		if err != nil {
 			// Error is handled in flushObject.
 			continue
Follow-up on the `// cancel is cancel function, protected by modeMtx in Close.` comment:
Not protected by modeMtx here, as the comment says.
Protected in Close, as it was for closeCh too. Thought this comment is about this.
Ok, it seems closeCh had this problem too.
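For reference, one way the behaviour (rather than the comment) could be made to match is to touch `cancel` only under `modeMtx`, including in `Init`. This is a hypothetical variant reusing the `demoCache` stand-in from the earlier sketch, not a change made by this PR.

// initGuarded is a hypothetical variant of Init that takes modeMtx while
// storing the cancel function, so "protected by modeMtx" holds for every
// access, not only for Close.
func (c *demoCache) initGuarded() {
	ctx, cancel := context.WithCancel(context.Background())
	c.modeMtx.Lock()
	c.cancel = cancel
	c.modeMtx.Unlock()
	c.wg.Add(1)
	go c.run(ctx)
}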