[#1653] qos: Add metrics

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov, 2025-03-05 13:53:32 +03:00 (committed by Dmitrii Stepanov)
parent d36afa31c7
commit 3727d60331
10 changed files with 214 additions and 21 deletions


@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	}
 	if c.metricsCollector != nil {
 		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+		shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
 	}
 	var sh shardOptsWithID
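(The limiter is created with no-op metrics inside NewLimiter below; it starts reporting real values only once the node's collector is injected here.)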


@@ -23,6 +23,7 @@ const (
 	policerSubsystem     = "policer"
 	commonCacheSubsystem = "common_cache"
 	multinetSubsystem    = "multinet"
+	qosSubsystem         = "qos"

 	successLabel = "success"
 	shardIDLabel = "shard_id"
@@ -43,6 +44,7 @@ const (
 	hitLabel      = "hit"
 	cacheLabel    = "cache"
 	sourceIPLabel = "source_ip"
+	ioTagLabel    = "io_tag"

 	readWriteMode = "READ_WRITE"
 	readOnlyMode  = "READ_ONLY"
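Together with the qos subsystem constant, the gauge defined in internal/metrics/qos.go below is exported as series of roughly the following shape (illustrative sample; the frostfs_node prefix assumes the package-level namespace constant, which is outside this diff):

    frostfs_node_qos_operations_total{io_tag="client",operation="read",shard_id="...",type="pending"} 110
    frostfs_node_qos_operations_total{io_tag="client",operation="read",shard_id="...",type="completed"} 100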


@@ -26,6 +26,7 @@ type NodeMetrics struct {
 	morphCache *morphCacheMetrics
 	log        logger.LogMetrics
 	multinet   *multinetMetrics
+	qos        *QoSMetrics
 	// nolint: unused
 	appInfo *ApplicationInfo
 }
@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
 		log:      logger.NewLogMetrics(namespace),
 		appInfo:  NewApplicationInfo(misc.Version),
 		multinet: newMultinetMetrics(namespace),
+		qos:      newQoSMetrics(),
 	}
 }
@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
 	return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+	return m.qos
+}

internal/metrics/qos.go (new file, 52 lines)

@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type QoSMetrics struct {
+	opsCounter *prometheus.GaugeVec
+}
+
+func newQoSMetrics() *QoSMetrics {
+	return &QoSMetrics{
+		opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: qosSubsystem,
+			Name:      "operations_total",
+			Help:      "Count of pending, in-progress, completed and failed due to resource exhaustion operations for each shard",
+		}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
+	}
+}
+
+func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "pending",
+	}).Set(float64(pending))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "in_progress",
+	}).Set(float64(inProgress))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "completed",
+	}).Set(float64(completed))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "resource_exhausted",
+	}).Set(float64(resourceExhausted))
+}
+
+func (m *QoSMetrics) Close(shardID string) {
+	m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
+}
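A minimal usage sketch for the type above (values are invented; in the real flow they come from the limiter's collection goroutine shown further below):

    qm := newQoSMetrics()
    // One gauge sample per (shard_id, operation, io_tag, type) combination.
    qm.SetOperationTagCounters("shard-1", "write", "background", 7, 3, 120, 0)
    // Close drops every series labelled with this shard_id in one call.
    qm.Close("shard-1")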


@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
+	"sync/atomic"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@@ -15,6 +17,9 @@ import (
 const (
 	defaultIdleTimeout time.Duration = 0
 	defaultShare       float64       = 1.0
+	minusOne                         = ^uint64(0)
+
+	defaultMetricsCollectTimeout = 5 * time.Second
 )

 type ReleaseFunc scheduling.ReleaseFunc
@@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc
 type Limiter interface {
 	ReadRequest(context.Context) (ReleaseFunc, error)
 	WriteRequest(context.Context) (ReleaseFunc, error)
+	SetParentID(string)
+	SetMetrics(Metrics)
 	Close()
 }
@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
 	}
-	read, write := c.Read(), c.Write()
-	if isNoop(read, write) {
-		return noopLimiterInstance, nil
-	}
 	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)
@@ -46,10 +49,18 @@
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
-	return &mClockLimiter{
+	l := &mClockLimiter{
 		readScheduler:  readScheduler,
 		writeScheduler: writeScheduler,
-	}, nil
+		closeCh:        make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		readStats:      createStats(),
+		writeStats:     createStats(),
+	}
+	l.shardID.Store(&shardID{})
+	l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+	l.startMetricsCollect()
+	return l, nil
 }
 func createScheduler(config limits.OpConfig) (scheduler, error) {
@@ -91,7 +102,7 @@ var (
 )

 func NewNoopLimiter() Limiter {
-	return &noopLimiter{}
+	return noopLimiterInstance
 }

 type noopLimiter struct{}
@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
 	return releaseStub, nil
 }

+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}

+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)

+type shardID struct {
+	id string
+}
+
 type mClockLimiter struct {
 	readScheduler  scheduler
 	writeScheduler scheduler
+
+	readStats  map[string]*stat
+	writeStats map[string]*stat
+
+	shardID atomic.Pointer[shardID]
+	metrics atomic.Pointer[metricsHolder]
+	closeCh chan struct{}
+	wg      *sync.WaitGroup
 }
 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.readScheduler)
+	return requestArrival(ctx, n.readScheduler, n.readStats)
 }

 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.writeScheduler)
+	return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }

-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
 	}
+	stat := getStat(tag, stats)
+	stat.pending.Add(1)
 	if tag == IOTagCritical.String() {
-		return releaseStub, nil
+		stat.inProgress.Add(1)
+		return func() {
+			stat.completed.Add(1)
+		}, nil
 	}
 	rel, err := s.RequestArrival(ctx, tag)
+	stat.inProgress.Add(1)
 	if err != nil {
 		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
 			errors.Is(err, errSemaphoreLimitExceeded) {
+			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
+		stat.completed.Add(1)
 		return nil, err
 	}
-	return ReleaseFunc(rel), nil
+	return func() {
+		rel()
+		stat.completed.Add(1)
+	}, nil
 }
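For reference, a caller-side sketch of the lifecycle these counters capture (limiter and ctx are assumed to exist; error handling elided):

    release, err := limiter.ReadRequest(ctx) // arrival: pending++, then in_progress++ once scheduled
    if err != nil {
    	return err // rejection: resource_exhausted++ (or completed++ for other errors)
    }
    defer release() // completion: completed++
    // ... perform the read ...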
 func (n *mClockLimiter) Close() {
 	n.readScheduler.Close()
 	n.writeScheduler.Close()
+	close(n.closeCh)
+	n.wg.Wait()
+	n.metrics.Load().metrics.Close(n.shardID.Load().id)
+}
+
+func (n *mClockLimiter) SetParentID(parentID string) {
+	n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+	n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		ticker := time.NewTicker(defaultMetricsCollectTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-n.closeCh:
+				return
+			case <-ticker.C:
+				shardID := n.shardID.Load().id
+				if shardID == "" {
+					continue
+				}
+				metrics := n.metrics.Load().metrics
+				for tag, s := range n.readStats {
+					metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+				for tag, s := range n.writeStats {
+					metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+			}
+		}
+	}()
 }
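Since all four per-tag counters are cumulative, momentary queue depths can be derived from the exported gauges:

    waiting   = pending - in_progress
    in flight = in_progress - completed - resource_exhausted

(every request that enters in_progress ends up in exactly one of completed or resource_exhausted, so the second difference counts requests currently holding a slot). Note also that the gauges lag reality by up to defaultMetricsCollectTimeout (5s), and nothing is reported until SetParentID supplies a non-empty shard ID.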

internal/qos/metrics.go (new file, 31 lines)

@@ -0,0 +1,31 @@
+package qos
+
+import "sync/atomic"
+
+type Metrics interface {
+	SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
+	Close(shardID string)
+}
+
+var _ Metrics = (*noopMetrics)(nil)
+
+type noopMetrics struct{}
+
+func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
+}
+
+func (n *noopMetrics) Close(string) {}
+
+// stat holds the limiter's cumulative statistics counters.
+//
+// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
+type stat struct {
+	completed         atomic.Uint64
+	pending           atomic.Uint64
+	resourceExhausted atomic.Uint64
+	inProgress        atomic.Uint64
+}
+
+type metricsHolder struct {
+	metrics Metrics
+}
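A note on the metricsHolder wrapper: wrapping the interface in a one-field struct gives atomic.Pointer a plain concrete element type (avoiding a pointer-to-interface at every call site), so SetMetrics can publish a new implementation with a single atomic store and readers never observe a torn value. A sketch of the pattern:

    var holder atomic.Pointer[metricsHolder]
    holder.Store(&metricsHolder{metrics: &noopMetrics{}}) // default until SetMetrics is called
    m := holder.Load().metrics                            // safe from any goroutine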

internal/qos/stats.go (new file, 28 lines)

@@ -0,0 +1,28 @@
+package qos
+
+const unknownStatsTag = "unknown"
+
+var statTags = map[string]struct{}{
+	IOTagClient.String():     {},
+	IOTagBackground.String(): {},
+	IOTagInternal.String():   {},
+	IOTagPolicer.String():    {},
+	IOTagWritecache.String(): {},
+	IOTagCritical.String():   {},
+	unknownStatsTag:          {},
+}
+
+func createStats() map[string]*stat {
+	result := make(map[string]*stat)
+	for tag := range statTags {
+		result[tag] = &stat{}
+	}
+	return result
+}
+
+func getStat(tag string, stats map[string]*stat) *stat {
+	if v, ok := stats[tag]; ok {
+		return v
+	}
+	return stats[unknownStatsTag]
+}
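Because createStats pre-populates every known tag and the map is never mutated afterwards, requestArrival and the collection goroutine can read it concurrently without locking; unrecognized tags share one fallback bucket:

    stats := createStats()
    s := getStat("some-future-tag", stats) // hypothetical tag: falls back to "unknown"
    s.pending.Add(1)                       // accounted under io_tag="unknown"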


@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-	return read.MaxRunningOps == limits.NoLimit &&
-		read.MaxWaitingOps == limits.NoLimit &&
-		write.MaxRunningOps == limits.NoLimit &&
-		write.MaxWaitingOps == limits.NoLimit &&
-		len(read.Tags) == 0 &&
-		len(write.Tags) == 0
-}
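With isNoop removed, a configuration without explicit limits now also gets a real mClockLimiter rather than the shared no-op instance; presumably this is intentional, since the no-op shortcut would collect no per-tag statistics (NewNoopLimiter remains available for callers that want the old behaviour).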


@@ -163,6 +163,8 @@ type testQoSLimiter struct {
 	write atomic.Int64
 }

+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
 	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
 	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
 	t.write.Add(1)
 	return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}


@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	if s.pilorama != nil {
 		s.pilorama.SetParentID(s.info.ID.String())
 	}
+	s.opsLimiter.SetParentID(s.info.ID.String())

 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {