From f3c141de413e38ecadaa89baf2c7187d1dadb87e Mon Sep 17 00:00:00 2001 From: Aleksey Savchuk Date: Tue, 18 Feb 2025 10:51:43 +0300 Subject: [PATCH] [#xx] writecache: Fix race condition when reporting cache size metrics There is a race condition when multiple cache operations try to report the cache size metrics simultaneously. Consider the following example: - the initial total size of objects stored in the cache is 2 - worker X deletes an object and reads the cache size, which is 1 - worker Y deletes an object and reads the cache size, which is 0 - worker Y reports the cache size it learnt, which is 0 - worker X reports the cache size it learnt, which is 1 As a result, the observed cache size is 1 (i.e., one object remains in the cache), which is incorrect because the actual cache size is 0. To fix this, a separate worker for reporting the cache size metric has been created. All operations should use a queue (a buffered channel) to request the reporter worker to report the metrics. Currently, all queue writes are non-blocking: if the queue is full, the request is dropped. Signed-off-by: Aleksey Savchuk --- pkg/local_object_storage/writecache/cache.go | 15 +++++++++++ .../writecache/metrics.go | 25 +++++++++++++++++++ pkg/local_object_storage/writecache/state.go | 6 +++-- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index b99d73d3a..fb57a9407 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -30,6 +30,9 @@ type cache struct { fsTree *fstree.FSTree // counter contains atomic counters for the number of objects stored in cache. counter *fstree.SimpleCounter + + sizeMetricsReporterQueue chan struct{} + sizeMetricsReporterStopped chan struct{} } // wcStorageType is used for write-cache operations logging. 
@@ -43,6 +46,8 @@ type objectInfo struct { const ( defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB defaultMaxCacheSize = 1 << 30 // 1 GiB + + sizeMetricsReporterQueueSize = 1000 ) var dummyCanceler context.CancelFunc = func() {} @@ -94,10 +99,17 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { if err != nil { return metaerr.Wrap(err) } + c.initSizeMetricsReporter() c.initCounters() return nil } +func (c *cache) initSizeMetricsReporter() { + c.sizeMetricsReporterQueue = make(chan struct{}, sizeMetricsReporterQueueSize) + c.sizeMetricsReporterStopped = make(chan struct{}) + c.startCacheSizeReporter() +} + // Init runs necessary services. func (c *cache) Init(ctx context.Context) error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) @@ -123,6 +135,9 @@ func (c *cache) Close(ctx context.Context) error { c.wg.Wait() + // The reporter should be stopped after all flush workers are stopped. + c.stopCacheSizeReporter() + c.modeMtx.Lock() defer c.modeMtx.Unlock() diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index e3641f85e..7503d3cce 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -58,3 +58,28 @@ func (metricsStub) Flush(bool, StorageType) {} func (metricsStub) Evict(StorageType) {} func (metricsStub) Close() {} + +func (c *cache) startCacheSizeReporter() { + go func() { + defer close(c.sizeMetricsReporterStopped) + + for range c.sizeMetricsReporterQueue { + count, size := c.counter.CountSize() + c.metrics.SetActualCounters(count) + c.metrics.SetEstimateSize(size) + } + }() +} + +func (c *cache) stopCacheSizeReporter() { + if c.sizeMetricsReporterQueue == nil { + // Underlying storage was not initialized. 
+ return + } + + close(c.sizeMetricsReporterQueue) + <-c.sizeMetricsReporterStopped + + c.sizeMetricsReporterQueue = nil + c.sizeMetricsReporterStopped = nil +} diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 44caa2603..5d4f1baa0 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -2,8 +2,10 @@ package writecache func (c *cache) estimateCacheSize() (uint64, uint64) { count, size := c.counter.CountSize() - c.metrics.SetEstimateSize(size) - c.metrics.SetActualCounters(count) + select { + case c.sizeMetricsReporterQueue <- struct{}{}: + default: + } return count, size }