forked from TrueCloudLab/frostfs-node
[#1201] writecache: Cancel background flush without lock
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
81070ada01
commit
4c7ff159ec
1 changed files with 10 additions and 7 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
@ -30,7 +31,7 @@ type cache struct {
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan objectInfo
|
flushCh chan objectInfo
|
||||||
// cancel is cancel function, protected by modeMtx in Close.
|
// cancel is cancel function, protected by modeMtx in Close.
|
||||||
cancel func()
|
cancel atomic.Value
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
// store contains underlying database.
|
// store contains underlying database.
|
||||||
|
@ -54,7 +55,10 @@ const (
|
||||||
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
||||||
)
|
)
|
||||||
|
|
||||||
var defaultBucket = []byte{0}
|
var (
|
||||||
|
defaultBucket = []byte{0}
|
||||||
|
dummyCanceler context.CancelFunc = func() {}
|
||||||
|
)
|
||||||
|
|
||||||
// New creates new writecache instance.
|
// New creates new writecache instance.
|
||||||
func New(opts ...Option) Cache {
|
func New(opts ...Option) Cache {
|
||||||
|
@ -114,20 +118,19 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c.cancel = cancel
|
c.cancel.Store(cancel)
|
||||||
c.runFlushLoop(ctx)
|
c.runFlushLoop(ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||||
func (c *cache) Close() error {
|
func (c *cache) Close() error {
|
||||||
|
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
|
||||||
|
cancelValue.(context.CancelFunc)()
|
||||||
|
}
|
||||||
// We cannot lock mutex for the whole operation duration
|
// We cannot lock mutex for the whole operation duration
|
||||||
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
if c.cancel != nil {
|
|
||||||
c.cancel()
|
|
||||||
c.cancel = nil
|
|
||||||
}
|
|
||||||
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
|
||||||
c.modeMtx.Unlock()
|
c.modeMtx.Unlock()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue