diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index b99d73d3a..fb57a9407 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -30,6 +30,9 @@ type cache struct { fsTree *fstree.FSTree // counter contains atomic counters for the number of objects stored in cache. counter *fstree.SimpleCounter + + sizeMetricsReporterQueue chan struct{} + sizeMetricsReporterStopped chan struct{} } // wcStorageType is used for write-cache operations logging. @@ -43,6 +46,8 @@ type objectInfo struct { const ( defaultMaxObjectSize = 64 * 1024 * 1024 // 64 MiB defaultMaxCacheSize = 1 << 30 // 1 GiB + + sizeMetricsReporterQueueSize = 1000 ) var dummyCanceler context.CancelFunc = func() {} @@ -94,10 +99,17 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { if err != nil { return metaerr.Wrap(err) } + c.initSizeMetricsReporter() c.initCounters() return nil } +func (c *cache) initSizeMetricsReporter() { + c.sizeMetricsReporterQueue = make(chan struct{}, sizeMetricsReporterQueueSize) + c.sizeMetricsReporterStopped = make(chan struct{}) + c.startCacheSizeReporter() +} + // Init runs necessary services. func (c *cache) Init(ctx context.Context) error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) @@ -123,6 +135,9 @@ func (c *cache) Close(ctx context.Context) error { c.wg.Wait() + // The reporter should be stopped after all flush workers are stopped. + c.stopCacheSizeReporter() + c.modeMtx.Lock() defer c.modeMtx.Unlock() diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index e3641f85e..7503d3cce 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -58,3 +58,28 @@ func (metricsStub) Flush(bool, StorageType) {} func (metricsStub) Evict(StorageType) {} func (metricsStub) Close() {} + +func (c *cache) startCacheSizeReporter() { + go func() { + defer close(c.sizeMetricsReporterStopped) + + for range c.sizeMetricsReporterQueue { + count, size := c.counter.CountSize() + c.metrics.SetActualCounters(count) + c.metrics.SetEstimateSize(size) + } + }() +} + +func (c *cache) stopCacheSizeReporter() { + if c.sizeMetricsReporterQueue == nil { + // Underlying storage was not initialized. + return + } + + close(c.sizeMetricsReporterQueue) + <-c.sizeMetricsReporterStopped + + c.sizeMetricsReporterQueue = nil + c.sizeMetricsReporterStopped = nil +} diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 44caa2603..5d4f1baa0 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -2,8 +2,10 @@ package writecache func (c *cache) estimateCacheSize() (uint64, uint64) { count, size := c.counter.CountSize() - c.metrics.SetEstimateSize(size) - c.metrics.SetActualCounters(count) + select { + case c.sizeMetricsReporterQueue <- struct{}{}: + default: + } return count, size }