From 3727d60331d3d410087a146da1df2dfbef7fa7cd Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Wed, 5 Mar 2025 13:53:32 +0300
Subject: [PATCH] [#1653] qos: Add metrics

Signed-off-by: Dmitrii Stepanov
---
 cmd/frostfs-node/config.go           |   1 +
 internal/metrics/consts.go           |   2 +
 internal/metrics/node.go             |   6 ++
 internal/metrics/qos.go              |  52 +++++++++
 internal/qos/limiter.go              | 101 +++++++++++++++---
 internal/qos/metrics.go              |  31 ++++++
 internal/qos/stats.go                |  28 +++++
 internal/qos/validate.go             |   9 --
 .../engine/engine_test.go            |   4 +
 pkg/local_object_storage/shard/id.go |   1 +
 10 files changed, 214 insertions(+), 21 deletions(-)
 create mode 100644 internal/metrics/qos.go
 create mode 100644 internal/qos/metrics.go
 create mode 100644 internal/qos/stats.go

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index afde0bbc0c..92aa827f24 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	}
 	if c.metricsCollector != nil {
 		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+		shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
 	}
 
 	var sh shardOptsWithID
diff --git a/internal/metrics/consts.go b/internal/metrics/consts.go
index cb165de69c..9123541ff5 100644
--- a/internal/metrics/consts.go
+++ b/internal/metrics/consts.go
@@ -23,6 +23,7 @@ const (
 	policerSubsystem     = "policer"
 	commonCacheSubsystem = "common_cache"
 	multinetSubsystem    = "multinet"
+	qosSubsystem         = "qos"
 
 	successLabel  = "success"
 	shardIDLabel  = "shard_id"
@@ -43,6 +44,7 @@ const (
 	hitLabel      = "hit"
 	cacheLabel    = "cache"
 	sourceIPLabel = "source_ip"
+	ioTagLabel    = "io_tag"
 
 	readWriteMode = "READ_WRITE"
 	readOnlyMode  = "READ_ONLY"
diff --git a/internal/metrics/node.go b/internal/metrics/node.go
index 4ea3c7c24d..8ade19eb27 100644
--- a/internal/metrics/node.go
+++ b/internal/metrics/node.go
@@ -26,6 +26,7 @@ type NodeMetrics struct {
 	morphCache *morphCacheMetrics
 	log        logger.LogMetrics
 	multinet   *multinetMetrics
+	qos        *QoSMetrics
 	// nolint: unused
 	appInfo *ApplicationInfo
 }
@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
 		log:      logger.NewLogMetrics(namespace),
 		appInfo:  NewApplicationInfo(misc.Version),
 		multinet: newMultinetMetrics(namespace),
+		qos:      newQoSMetrics(),
 	}
 }
@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
 	return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+	return m.qos
+}
diff --git a/internal/metrics/qos.go b/internal/metrics/qos.go
new file mode 100644
index 0000000000..17fb67a27a
--- /dev/null
+++ b/internal/metrics/qos.go
@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type QoSMetrics struct {
+	opsCounter *prometheus.GaugeVec
+}
+
+func newQoSMetrics() *QoSMetrics {
+	return &QoSMetrics{
+		opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: qosSubsystem,
+			Name:      "operations_total",
+			Help:      "Count of pending, in-progress, completed and failed (due to resource exhaustion) operations for each shard",
+		}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
+	}
+}
+
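+// SetOperationTagCounters publishes the cumulative counters for one
+// (shard, operation, io_tag) combination, one gauge value per state:
+// pending, in_progress, completed and resource_exhausted.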
+func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "pending",
+	}).Set(float64(pending))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "in_progress",
+	}).Set(float64(inProgress))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "completed",
+	}).Set(float64(completed))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "resource_exhausted",
+	}).Set(float64(resourceExhausted))
+}
+
+func (m *QoSMetrics) Close(shardID string) {
+	m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
+}
diff --git a/internal/qos/limiter.go b/internal/qos/limiter.go
index b1406a7f37..8f00791c50 100644
--- a/internal/qos/limiter.go
+++ b/internal/qos/limiter.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@@ -15,6 +17,9 @@ import (
 const (
 	defaultIdleTimeout time.Duration = 0
 	defaultShare       float64       = 1.0
+	minusOne                         = ^uint64(0)
+
+	defaultMetricsCollectTimeout = 5 * time.Second
 )
 
 type ReleaseFunc scheduling.ReleaseFunc
 
 type Limiter interface {
 	ReadRequest(context.Context) (ReleaseFunc, error)
 	WriteRequest(context.Context) (ReleaseFunc, error)
+	SetParentID(string)
+	SetMetrics(Metrics)
 	Close()
 }
@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
 	}
-	read, write := c.Read(), c.Write()
-	if isNoop(read, write) {
-		return noopLimiterInstance, nil
-	}
 	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)
@@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
-	return &mClockLimiter{
+	l := &mClockLimiter{
 		readScheduler:  readScheduler,
 		writeScheduler: writeScheduler,
-	}, nil
+		closeCh:        make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		readStats:      createStats(),
+		writeStats:     createStats(),
+	}
+	l.shardID.Store(&shardID{})
+	l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+	l.startMetricsCollect()
+	return l, nil
 }
 
 func createScheduler(config limits.OpConfig) (scheduler, error) {
@@ -91,7 +102,7 @@ var (
 )
 
 func NewNoopLimiter() Limiter {
-	return &noopLimiter{}
+	return noopLimiterInstance
 }
 
 type noopLimiter struct{}
@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
 	return releaseStub, nil
 }
 
+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}
 
+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)
 
+type shardID struct {
+	id string
+}
+
 type mClockLimiter struct {
 	readScheduler  scheduler
 	writeScheduler scheduler
+
+	readStats  map[string]*stat
+	writeStats map[string]*stat
+
+	shardID atomic.Pointer[shardID]
+	metrics atomic.Pointer[metricsHolder]
+	closeCh chan struct{}
+	wg      *sync.WaitGroup
 }
 
 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.readScheduler)
+	return requestArrival(ctx, n.readScheduler, n.readStats)
 }
 
 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.writeScheduler)
+	return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }
 
-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
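+// requestArrival accounts the request in the per-tag stats before and after
+// scheduling; critical requests bypass the scheduler but are still counted.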
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
 	}
+	stat := getStat(tag, stats)
+	stat.pending.Add(1)
 	if tag == IOTagCritical.String() {
-		return releaseStub, nil
+		stat.inProgress.Add(1)
+		return func() {
+			stat.completed.Add(1)
+		}, nil
 	}
 	rel, err := s.RequestArrival(ctx, tag)
+	stat.inProgress.Add(1)
 	if err != nil {
 		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
 			errors.Is(err, errSemaphoreLimitExceeded) {
+			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
+		stat.completed.Add(1)
 		return nil, err
 	}
-	return ReleaseFunc(rel), nil
+	return func() {
+		rel()
+		stat.completed.Add(1)
+	}, nil
 }
 
 func (n *mClockLimiter) Close() {
 	n.readScheduler.Close()
 	n.writeScheduler.Close()
+	close(n.closeCh)
+	n.wg.Wait()
+	n.metrics.Load().metrics.Close(n.shardID.Load().id)
+}
+
+func (n *mClockLimiter) SetParentID(parentID string) {
+	n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+	n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+
+		ticker := time.NewTicker(defaultMetricsCollectTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-n.closeCh:
+				return
+			case <-ticker.C:
+				shardID := n.shardID.Load().id
+				if shardID == "" {
+					continue
+				}
+				metrics := n.metrics.Load().metrics
+				for tag, s := range n.readStats {
+					metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+				for tag, s := range n.writeStats {
+					metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+			}
+		}
+	}()
 }
diff --git a/internal/qos/metrics.go b/internal/qos/metrics.go
new file mode 100644
index 0000000000..c00da51b71
--- /dev/null
+++ b/internal/qos/metrics.go
@@ -0,0 +1,31 @@
+package qos
+
+import "sync/atomic"
+
+type Metrics interface {
+	SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
+	Close(shardID string)
+}
+
+var _ Metrics = (*noopMetrics)(nil)
+
+type noopMetrics struct{}
+
+func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
+}
+
+func (n *noopMetrics) Close(string) {}
+
+// stat holds cumulative limiter statistics counters.
+//
+// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
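+//
+// Since the counters are cumulative, the number of requests currently waiting
+// for the scheduler can be derived as pending - in_progress, and the number
+// currently executing as in_progress - completed - resource_exhausted.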
+type stat struct {
+	completed         atomic.Uint64
+	pending           atomic.Uint64
+	resourceExhausted atomic.Uint64
+	inProgress        atomic.Uint64
+}
+
+type metricsHolder struct {
+	metrics Metrics
+}
diff --git a/internal/qos/stats.go b/internal/qos/stats.go
new file mode 100644
index 0000000000..f077f552ba
--- /dev/null
+++ b/internal/qos/stats.go
@@ -0,0 +1,28 @@
+package qos
+
+const unknownStatsTag = "unknown"
+
+var statTags = map[string]struct{}{
+	IOTagClient.String():     {},
+	IOTagBackground.String(): {},
+	IOTagInternal.String():   {},
+	IOTagPolicer.String():    {},
+	IOTagWritecache.String(): {},
+	IOTagCritical.String():   {},
+	unknownStatsTag:          {},
+}
+
+func createStats() map[string]*stat {
+	result := make(map[string]*stat)
+	for tag := range statTags {
+		result[tag] = &stat{}
+	}
+	return result
+}
+
+func getStat(tag string, stats map[string]*stat) *stat {
+	if v, ok := stats[tag]; ok {
+		return v
+	}
+	return stats[unknownStatsTag]
+}
diff --git a/internal/qos/validate.go b/internal/qos/validate.go
index 43aa749423..3fa4ebbd17 100644
--- a/internal/qos/validate.go
+++ b/internal/qos/validate.go
@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-	return read.MaxRunningOps == limits.NoLimit &&
-		read.MaxWaitingOps == limits.NoLimit &&
-		write.MaxRunningOps == limits.NoLimit &&
-		write.MaxWaitingOps == limits.NoLimit &&
-		len(read.Tags) == 0 &&
-		len(write.Tags) == 0
-}
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index 7ddde1f024..3f9196128c 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -163,6 +163,8 @@ type testQoSLimiter struct {
 	write atomic.Int64
 }
 
+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
 	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
 	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
 	t.write.Add(1)
 	return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}
diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go
index 26492cf01e..b233b705c9 100644
--- a/pkg/local_object_storage/shard/id.go
+++ b/pkg/local_object_storage/shard/id.go
@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	if s.pilorama != nil {
 		s.pilorama.SetParentID(s.info.ID.String())
 	}
+	s.opsLimiter.SetParentID(s.info.ID.String())
 
 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {