[#1648] writecache: Fix race condition when reporting cache size metrics
All checks were successful
DCO action / DCO (pull_request) Successful in 51s
Vulncheck / Vulncheck (pull_request) Successful in 1m1s
Build / Build Components (pull_request) Successful in 1m45s
Tests and linters / Lint (pull_request) Successful in 2m26s
Tests and linters / Run gofumpt (pull_request) Successful in 2m37s
Tests and linters / Tests (pull_request) Successful in 2m51s
Tests and linters / Staticcheck (pull_request) Successful in 3m13s
Tests and linters / Tests with -race (pull_request) Successful in 3m26s
Tests and linters / gopls check (pull_request) Successful in 4m13s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m13s
All checks were successful
DCO action / DCO (pull_request) Successful in 51s
Vulncheck / Vulncheck (pull_request) Successful in 1m1s
Build / Build Components (pull_request) Successful in 1m45s
Tests and linters / Lint (pull_request) Successful in 2m26s
Tests and linters / Run gofumpt (pull_request) Successful in 2m37s
Tests and linters / Tests (pull_request) Successful in 2m51s
Tests and linters / Staticcheck (pull_request) Successful in 3m13s
Tests and linters / Tests with -race (pull_request) Successful in 3m26s
Tests and linters / gopls check (pull_request) Successful in 4m13s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m13s
There is a race condition when multiple cache operations try to report the cache size metrics simultaneously. Consider the following example: - the initial total size of objects stored in the cache is 2 - worker X deletes an object and reads the cache size, which is 1 - worker Y deletes an object and reads the cache size, which is 0 - worker Y reports the cache size it learnt, which is 0 - worker X reports the cache size it learnt, which is 1 As a result, the observed cache size is 1 (i.e. one object remains in the cache), which is incorrect because the actual cache size is 0. To fix this, a separate worker for reporting the cache size metric has been created. All operations should use a queue (a buffered channel) to request the reporter worker to report the metrics. Currently, all queue writes are non-blocking. Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
9b29e7392f
commit
b28e45f5a4
3 changed files with 44 additions and 2 deletions
|
@ -30,6 +30,9 @@ type cache struct {
|
|||
fsTree *fstree.FSTree
|
||||
// counter contains atomic counters for the number of objects stored in cache.
|
||||
counter *fstree.SimpleCounter
|
||||
|
||||
sizeMetricsReporterQueue chan struct{}
|
||||
sizeMetricsReporterStopped chan struct{}
|
||||
}
|
||||
|
||||
// wcStorageType is used for write-cache operations logging.
|
||||
|
@ -43,6 +46,8 @@ type objectInfo struct {
|
|||
const (
|
||||
defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB
|
||||
defaultMaxCacheSize = 1 << 30 // 1 GiB
|
||||
|
||||
// sizeMetricsReporterQueueSize is the capacity of the buffered channel
// through which cache operations ask the size metrics reporter goroutine
// to publish the current counters. Requests are sent without blocking, so
// a request made while the buffer is full is dropped.
sizeMetricsReporterQueueSize = 1000
|
||||
)
|
||||
|
||||
var dummyCanceler context.CancelFunc = func() {}
|
||||
|
@ -94,10 +99,17 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
|||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
c.initSizeMetricsReporter()
|
||||
c.initCounters()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) initSizeMetricsReporter() {
|
||||
c.sizeMetricsReporterQueue = make(chan struct{}, sizeMetricsReporterQueueSize)
|
||||
c.sizeMetricsReporterStopped = make(chan struct{})
|
||||
c.startCacheSizeReporter()
|
||||
}
|
||||
|
||||
// Init runs necessary services.
|
||||
func (c *cache) Init(ctx context.Context) error {
|
||||
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
||||
|
@ -123,6 +135,9 @@ func (c *cache) Close(ctx context.Context) error {
|
|||
|
||||
c.wg.Wait()
|
||||
|
||||
// The reporter should be stopped after all flush workers are stopped.
|
||||
c.stopCacheSizeReporter()
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
|
|
|
@ -58,3 +58,28 @@ func (metricsStub) Flush(bool, StorageType) {}
|
|||
func (metricsStub) Evict(StorageType) {}
|
||||
|
||||
func (metricsStub) Close() {}
|
||||
|
||||
func (c *cache) startCacheSizeReporter() {
|
||||
go func() {
|
||||
defer close(c.sizeMetricsReporterStopped)
|
||||
|
||||
for range c.sizeMetricsReporterQueue {
|
||||
count, size := c.counter.CountSize()
|
||||
c.metrics.SetActualCounters(count)
|
||||
c.metrics.SetEstimateSize(size)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *cache) stopCacheSizeReporter() {
|
||||
if c.sizeMetricsReporterQueue == nil {
|
||||
// Underlying storage was not initialized.
|
||||
return
|
||||
}
|
||||
|
||||
close(c.sizeMetricsReporterQueue)
|
||||
<-c.sizeMetricsReporterStopped
|
||||
|
||||
c.sizeMetricsReporterQueue = nil
|
||||
c.sizeMetricsReporterStopped = nil
|
||||
}
|
||||
|
|
|
@ -2,8 +2,10 @@ package writecache
|
|||
|
||||
func (c *cache) estimateCacheSize() (uint64, uint64) {
|
||||
count, size := c.counter.CountSize()
|
||||
c.metrics.SetEstimateSize(size)
|
||||
c.metrics.SetActualCounters(count)
|
||||
select {
|
||||
case c.sizeMetricsReporterQueue <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return count, size
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue