[#xx] writecache: Fix race condition when reporting cache size metrics
Some checks failed
DCO action / DCO (pull_request) Failing after 34s
Vulncheck / Vulncheck (pull_request) Successful in 1m0s
Build / Build Components (pull_request) Successful in 1m30s
Tests and linters / Run gofumpt (pull_request) Successful in 1m22s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m40s
Tests and linters / Staticcheck (pull_request) Successful in 2m30s
Tests and linters / Tests (pull_request) Successful in 2m33s
Tests and linters / gopls check (pull_request) Successful in 2m54s
Tests and linters / Tests with -race (pull_request) Successful in 3m9s
Tests and linters / Lint (pull_request) Successful in 3m26s
Some checks failed
DCO action / DCO (pull_request) Failing after 34s
Vulncheck / Vulncheck (pull_request) Successful in 1m0s
Build / Build Components (pull_request) Successful in 1m30s
Tests and linters / Run gofumpt (pull_request) Successful in 1m22s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m40s
Tests and linters / Staticcheck (pull_request) Successful in 2m30s
Tests and linters / Tests (pull_request) Successful in 2m33s
Tests and linters / gopls check (pull_request) Successful in 2m54s
Tests and linters / Tests with -race (pull_request) Successful in 3m9s
Tests and linters / Lint (pull_request) Successful in 3m26s
There is a race condition when multiple cache operations try to report the cache size metrics simultaneously. Consider the following example:
- the initial total size of objects stored in the cache is 2
- worker X deletes an object and reads the cache size, which is 1
- worker Y deletes an object and reads the cache size, which is 0
- worker Y reports the cache size it learnt, which is 0
- worker X reports the cache size it learnt, which is 1
As a result, the observed cache size is 1 (i.e. one object remains in the cache), which is incorrect because the actual cache size is 0. To fix this, a separate worker for reporting the cache size metric has been created. All operations should use a queue (a buffered channel) to request the reporter worker to report the metrics. Currently, all queue writes are non-blocking. Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
fe0cf86dc6
commit
f3c141de41
3 changed files with 44 additions and 2 deletions
|
@ -30,6 +30,9 @@ type cache struct {
|
||||||
fsTree *fstree.FSTree
|
fsTree *fstree.FSTree
|
||||||
// counter contains atomic counters for the number of objects stored in cache.
|
// counter contains atomic counters for the number of objects stored in cache.
|
||||||
counter *fstree.SimpleCounter
|
counter *fstree.SimpleCounter
|
||||||
|
|
||||||
|
sizeMetricsReporterQueue chan struct{}
|
||||||
|
sizeMetricsReporterStopped chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// wcStorageType is used for write-cache operations logging.
|
// wcStorageType is used for write-cache operations logging.
|
||||||
|
@ -43,6 +46,8 @@ type objectInfo struct {
|
||||||
const (
|
const (
|
||||||
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
|
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
|
||||||
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
||||||
|
|
||||||
|
sizeMetricsReporterQueueSize = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
var dummyCanceler context.CancelFunc = func() {}
|
var dummyCanceler context.CancelFunc = func() {}
|
||||||
|
@ -94,10 +99,17 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
c.initSizeMetricsReporter()
|
||||||
c.initCounters()
|
c.initCounters()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cache) initSizeMetricsReporter() {
|
||||||
|
c.sizeMetricsReporterQueue = make(chan struct{}, sizeMetricsReporterQueueSize)
|
||||||
|
c.sizeMetricsReporterStopped = make(chan struct{})
|
||||||
|
c.startCacheSizeReporter()
|
||||||
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init(ctx context.Context) error {
|
func (c *cache) Init(ctx context.Context) error {
|
||||||
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
||||||
|
@ -123,6 +135,9 @@ func (c *cache) Close(ctx context.Context) error {
|
||||||
|
|
||||||
c.wg.Wait()
|
c.wg.Wait()
|
||||||
|
|
||||||
|
// The reporter should be stopped after all flush workers are stopped.
|
||||||
|
c.stopCacheSizeReporter()
|
||||||
|
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
defer c.modeMtx.Unlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -58,3 +58,28 @@ func (metricsStub) Flush(bool, StorageType) {}
|
||||||
func (metricsStub) Evict(StorageType) {}
|
func (metricsStub) Evict(StorageType) {}
|
||||||
|
|
||||||
func (metricsStub) Close() {}
|
func (metricsStub) Close() {}
|
||||||
|
|
||||||
|
func (c *cache) startCacheSizeReporter() {
|
||||||
|
go func() {
|
||||||
|
defer close(c.sizeMetricsReporterStopped)
|
||||||
|
|
||||||
|
for range c.sizeMetricsReporterQueue {
|
||||||
|
count, size := c.counter.CountSize()
|
||||||
|
c.metrics.SetActualCounters(count)
|
||||||
|
c.metrics.SetEstimateSize(size)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) stopCacheSizeReporter() {
|
||||||
|
if c.sizeMetricsReporterQueue == nil {
|
||||||
|
// Underlying storage was not initialized.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
close(c.sizeMetricsReporterQueue)
|
||||||
|
<-c.sizeMetricsReporterStopped
|
||||||
|
|
||||||
|
c.sizeMetricsReporterQueue = nil
|
||||||
|
c.sizeMetricsReporterStopped = nil
|
||||||
|
}
|
||||||
|
|
|
@ -2,8 +2,10 @@ package writecache
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
||||||
count, size := c.counter.CountSize()
|
count, size := c.counter.CountSize()
|
||||||
c.metrics.SetEstimateSize(size)
|
select {
|
||||||
c.metrics.SetActualCounters(count)
|
case c.sizeMetricsReporterQueue <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return count, size
|
return count, size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue