forked from TrueCloudLab/frostfs-node
[#642] writecache: Remove usage of close channel in bbolt
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
0b7fd293f1
commit
0f4633d65d
2 changed files with 19 additions and 31 deletions
|
@ -30,8 +30,8 @@ type cache struct {
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan objectInfo
|
flushCh chan objectInfo
|
||||||
// closeCh is close channel, protected by modeMtx.
|
// cancel is cancel function, protected by modeMtx in Close.
|
||||||
closeCh chan struct{}
|
cancel func()
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
// store contains underlying database.
|
// store contains underlying database.
|
||||||
|
@ -104,17 +104,15 @@ func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Opening after Close is done during maintenance mode,
|
|
||||||
// thus we need to create a channel here.
|
|
||||||
c.closeCh = make(chan struct{})
|
|
||||||
|
|
||||||
return metaerr.Wrap(c.initCounters())
|
return metaerr.Wrap(c.initCounters())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
c.metrics.SetMode(c.mode)
|
c.metrics.SetMode(c.mode)
|
||||||
c.runFlushLoop()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
c.cancel = cancel
|
||||||
|
c.runFlushLoop(ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,8 +121,9 @@ func (c *cache) Close() error {
|
||||||
// We cannot lock mutex for the whole operation duration
|
// We cannot lock mutex for the whole operation duration
|
||||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
if c.closeCh != nil {
|
if c.cancel != nil {
|
||||||
close(c.closeCh)
|
c.cancel()
|
||||||
|
c.cancel = nil
|
||||||
}
|
}
|
||||||
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
||||||
c.modeMtx.Unlock()
|
c.modeMtx.Unlock()
|
||||||
|
@ -134,7 +133,6 @@ func (c *cache) Close() error {
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
defer c.modeMtx.Unlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
c.closeCh = nil
|
|
||||||
var err error
|
var err error
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
err = c.db.Close()
|
err = c.db.Close()
|
||||||
|
|
|
@ -36,20 +36,10 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop() {
|
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
ch := c.closeCh
|
|
||||||
c.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
<-ch
|
|
||||||
cancel()
|
|
||||||
c.wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go c.workerFlushSmall()
|
go c.workerFlushSmall(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
|
@ -68,20 +58,20 @@ func (c *cache) runFlushLoop() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tt.C:
|
case <-tt.C:
|
||||||
c.flushSmallObjects()
|
c.flushSmallObjects(ctx)
|
||||||
tt.Reset(defaultFlushInterval)
|
tt.Reset(defaultFlushInterval)
|
||||||
case <-c.closeCh:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flushSmallObjects() {
|
func (c *cache) flushSmallObjects(ctx context.Context) {
|
||||||
var lastKey []byte
|
var lastKey []byte
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.closeCh:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
@ -137,7 +127,7 @@ func (c *cache) flushSmallObjects() {
|
||||||
count++
|
count++
|
||||||
select {
|
select {
|
||||||
case c.flushCh <- m[i]:
|
case c.flushCh <- m[i]:
|
||||||
case <-c.closeCh:
|
case <-ctx.Done():
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -170,7 +160,7 @@ func (c *cache) workerFlushBig(ctx context.Context) {
|
||||||
_ = c.flushFSTree(ctx, true)
|
_ = c.flushFSTree(ctx, true)
|
||||||
|
|
||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||
case <-c.closeCh:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -228,7 +218,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// workerFlushSmall writes small objects to the main storage.
|
// workerFlushSmall writes small objects to the main storage.
|
||||||
func (c *cache) workerFlushSmall() {
|
func (c *cache) workerFlushSmall(ctx context.Context) {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
|
|
||||||
var objInfo objectInfo
|
var objInfo objectInfo
|
||||||
|
@ -236,11 +226,11 @@ func (c *cache) workerFlushSmall() {
|
||||||
// Give priority to direct put.
|
// Give priority to direct put.
|
||||||
select {
|
select {
|
||||||
case objInfo = <-c.flushCh:
|
case objInfo = <-c.flushCh:
|
||||||
case <-c.closeCh:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
err := c.flushObject(ctx, objInfo.obj, objInfo.data, writecache.StorageTypeDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Error is handled in flushObject.
|
// Error is handled in flushObject.
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in a new issue