IO tag metrics #1653

Merged
dstepanov-yadro merged 4 commits from dstepanov-yadro/frostfs-node:feat/io_tag_metrics into master 2025-03-11 10:57:48 +00:00
18 changed files with 285 additions and 38 deletions

@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
     }
     if c.metricsCollector != nil {
         mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+        shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
     }
     var sh shardOptsWithID

@@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
     }
     ioTag, err := qos.FromRawString(rawTag)
     if err != nil {
-        s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
+        s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     }
@@ -70,6 +70,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
                 return ctx
             }
         }
+        s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     case qos.IOTagInternal:
         for _, pk := range s.allowedInternalPubs {
@@ -87,9 +88,10 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
                 return ctx
             }
         }
+        s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     default:
-        s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
+        s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
         return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
     }
 }

@@ -513,4 +513,5 @@ const (
     FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
     NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
     FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
+    FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
 )

@@ -23,6 +23,7 @@ const (
     policerSubsystem = "policer"
     commonCacheSubsystem = "common_cache"
     multinetSubsystem = "multinet"
+    qosSubsystem = "qos"

     successLabel = "success"
     shardIDLabel = "shard_id"
@@ -43,6 +44,7 @@ const (
     hitLabel = "hit"
     cacheLabel = "cache"
     sourceIPLabel = "source_ip"
+    ioTagLabel = "io_tag"

     readWriteMode = "READ_WRITE"
     readOnlyMode = "READ_ONLY"

@@ -26,6 +26,7 @@ type NodeMetrics struct {
     morphCache *morphCacheMetrics
     log logger.LogMetrics
     multinet *multinetMetrics
+    qos *QoSMetrics
     // nolint: unused
     appInfo *ApplicationInfo
 }
@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
         log: logger.NewLogMetrics(namespace),
         appInfo: NewApplicationInfo(misc.Version),
         multinet: newMultinetMetrics(namespace),
+        qos: newQoSMetrics(),
     }
 }
@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
     return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+    return m.qos
+}

@@ -9,13 +9,14 @@ import (
 )

 type ObjectServiceMetrics interface {
-    AddRequestDuration(method string, d time.Duration, success bool)
+    AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
     AddPayloadSize(method string, size int)
 }

 type objectServiceMetrics struct {
     methodDuration *prometheus.HistogramVec
     payloadCounter *prometheus.CounterVec
+    ioTagOpsCounter *prometheus.CounterVec
 }

 func newObjectServiceMetrics() *objectServiceMetrics {
@@ -32,14 +33,24 @@ func newObjectServiceMetrics() *objectServiceMetrics {
             Name: "request_payload_bytes",
             Help: "Object Service request payload",
         }, []string{methodLabel}),
+        ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: objectSubsystem,
+            Name: "requests_total",
+            Help: "Count of requests for each IO tag",
+        }, []string{methodLabel, ioTagLabel}),
     }
 }

-func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
+func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
     m.methodDuration.With(prometheus.Labels{
         methodLabel: method,
         successLabel: strconv.FormatBool(success),
     }).Observe(d.Seconds())
+    m.ioTagOpsCounter.With(prometheus.Labels{
+        ioTagLabel: ioTag,
+        methodLabel: method,
+    }).Inc()
 }

 func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {

internal/metrics/qos.go (new file)

@@ -0,0 +1,52 @@
package metrics

import (
    "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
    "github.com/prometheus/client_golang/prometheus"
)

type QoSMetrics struct {
    opsCounter *prometheus.GaugeVec
}

func newQoSMetrics() *QoSMetrics {
    return &QoSMetrics{
        opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
            Namespace: namespace,
            Subsystem: qosSubsystem,
            Name:      "operations_total",
            Help:      "Count of pending, in progress, completed and failed due to resource exhausted error operations for each shard",
        }, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
    }
}

func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
    m.opsCounter.With(prometheus.Labels{
        shardIDLabel:   shardID,
        operationLabel: operation,
        ioTagLabel:     tag,
        typeLabel:      "pending",
    }).Set(float64(pending))
    m.opsCounter.With(prometheus.Labels{
        shardIDLabel:   shardID,
        operationLabel: operation,
        ioTagLabel:     tag,
        typeLabel:      "in_progress",
    }).Set(float64(inProgress))
    m.opsCounter.With(prometheus.Labels{
        shardIDLabel:   shardID,
        operationLabel: operation,
        ioTagLabel:     tag,
        typeLabel:      "completed",
    }).Set(float64(completed))
    m.opsCounter.With(prometheus.Labels{
        shardIDLabel:   shardID,
        operationLabel: operation,
        ioTagLabel:     tag,
        typeLabel:      "resource_exhausted",
    }).Set(float64(resourceExhausted))
}

func (m *QoSMetrics) Close(shardID string) {
    m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
}

@@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
     AddReplicateTaskDuration(time.Duration, bool)
     AddReplicateWaitDuration(time.Duration, bool)
     AddSyncDuration(time.Duration, bool)
+    AddOperation(string, string)
 }

 type treeServiceMetrics struct {
     replicateTaskDuration *prometheus.HistogramVec
     replicateWaitDuration *prometheus.HistogramVec
     syncOpDuration *prometheus.HistogramVec
+    ioTagOpsCounter *prometheus.CounterVec
 }

 var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
@@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
             Name: "sync_duration_seconds",
             Help: "Duration of synchronization operations",
         }, []string{successLabel}),
+        ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: treeServiceSubsystem,
+            Name: "requests_total",
+            Help: "Count of requests for each IO tag",
+        }, []string{methodLabel, ioTagLabel}),
     }
 }
@@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
         successLabel: strconv.FormatBool(success),
     }).Observe(d.Seconds())
 }
+
+func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
+    m.ioTagOpsCounter.With(prometheus.Labels{
+        ioTagLabel: ioTag,
+        methodLabel: op,
+    }).Inc()
+}

@@ -4,6 +4,8 @@ import (
     "context"
     "errors"
     "fmt"
+    "sync"
+    "sync/atomic"
     "time"

     "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@@ -15,6 +17,9 @@ import (
 const (
     defaultIdleTimeout time.Duration = 0
     defaultShare float64 = 1.0
+
+    minusOne = ^uint64(0)
+    defaultMetricsCollectTimeout = 5 * time.Second
 )

 type ReleaseFunc scheduling.ReleaseFunc
@@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc

 type Limiter interface {
     ReadRequest(context.Context) (ReleaseFunc, error)
     WriteRequest(context.Context) (ReleaseFunc, error)
+    SetParentID(string)
+    SetMetrics(Metrics)
     Close()
 }
@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
     if err := validateConfig(c); err != nil {
         return nil, err
     }
-    read, write := c.Read(), c.Write()
-    if isNoop(read, write) {
-        return noopLimiterInstance, nil
-    }
     readScheduler, err := createScheduler(c.Read())
     if err != nil {
         return nil, fmt.Errorf("create read scheduler: %w", err)
@@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
     if err != nil {
         return nil, fmt.Errorf("create write scheduler: %w", err)
     }
-    return &mClockLimiter{
+    l := &mClockLimiter{
         readScheduler: readScheduler,
         writeScheduler: writeScheduler,
-    }, nil
+        closeCh: make(chan struct{}),
+        wg: &sync.WaitGroup{},
+        readStats: createStats(),
+        writeStats: createStats(),
+    }
+    l.shardID.Store(&shardID{})
+    l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+    l.startMetricsCollect()
+    return l, nil
 }

 func createScheduler(config limits.OpConfig) (scheduler, error) {

fyrchik marked this conversation as resolved (outdated)

This belongs to the commit `[#1653] qos: Add metrics`. Could you explain why you have removed these lines?

After this commit the limiter must be able to report metrics, while the noop limiter is just a stub. So, rather than add extra code for the noop limiter, I simply dropped this check.

@@ -91,7 +102,7 @@ var (
 )

 func NewNoopLimiter() Limiter {
-    return &noopLimiter{}
+    return noopLimiterInstance
 }

 type noopLimiter struct{}
@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
     return releaseStub, nil
 }

+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}

+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)

+type shardID struct {
+    id string
+}
+
 type mClockLimiter struct {
     readScheduler scheduler
     writeScheduler scheduler
+    readStats map[string]*stat
+    writeStats map[string]*stat
+    shardID atomic.Pointer[shardID]
+    metrics atomic.Pointer[metricsHolder]
+    closeCh chan struct{}
+    wg *sync.WaitGroup
 }

fyrchik marked this conversation as resolved (outdated)

This will contain a pointer to the string anyway (implicitly, when setting an interface value to a string). Why have you decided to use `atomic.Value` instead of `atomic.Pointer`?

I was worried about a pointer dereference. But OK, replaced with `atomic.Pointer[shardID]`.
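
For illustration only (not part of the diff): a minimal standalone sketch of the `atomic.Value` vs `atomic.Pointer` trade-off discussed above, using a trimmed-down `shardID` holder. `atomic.Value` is untyped, boxes the stored string into an interface value, and returns nil until the first `Store`; `atomic.Pointer[shardID]` is typed, and the limiter sidesteps the nil-dereference concern by storing an empty `&shardID{}` in `NewLimiter` before the collector goroutine ever reads it.

```
package main

import (
    "fmt"
    "sync/atomic"
)

type shardID struct{ id string }

func main() {
    // Variant 1: atomic.Value — untyped, Load returns nil until the first Store.
    var v atomic.Value
    if v.Load() == nil {
        fmt.Println("atomic.Value: not set yet, callers must guard against nil")
    }
    v.Store("shard-1")             // the string is boxed into an interface value
    fmt.Println(v.Load().(string)) // a type assertion is needed on every read

    // Variant 2: atomic.Pointer[shardID] — typed, no assertions.
    var p atomic.Pointer[shardID]
    p.Store(&shardID{}) // pre-store an empty value, as NewLimiter does
    p.Store(&shardID{id: "shard-1"})
    fmt.Println(p.Load().id) // still a pointer dereference, but never nil here
}
```
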
 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-    return requestArrival(ctx, n.readScheduler)
+    return requestArrival(ctx, n.readScheduler, n.readStats)
 }

 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-    return requestArrival(ctx, n.writeScheduler)
+    return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }

-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
     tag, ok := tagging.IOTagFromContext(ctx)
     if !ok {
         tag = IOTagClient.String()
     }
+    stat := getStat(tag, stats)
+    stat.pending.Add(1)
     if tag == IOTagCritical.String() {
-        return releaseStub, nil
+        stat.inProgress.Add(1)
+        return func() {
+            stat.completed.Add(1)
+        }, nil
     }
     rel, err := s.RequestArrival(ctx, tag)
+    stat.inProgress.Add(1)
     if err != nil {
         if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
             errors.Is(err, errSemaphoreLimitExceeded) {
+            stat.resourceExhausted.Add(1)
             return nil, &apistatus.ResourceExhausted{}
         }
+        stat.completed.Add(1)
         return nil, err
     }
-    return ReleaseFunc(rel), nil
+    return func() {
+        rel()
+        stat.completed.Add(1)
+    }, nil
 }

fyrchik marked this conversation as resolved (outdated)

Is it important that `stat.completed.Add()` is called before `rel`? The way I see it, `rel` will release the resources associated with the request, and we should "complete" the request only afterwards. But OK, if it was deliberate.

Fixed.

 func (n *mClockLimiter) Close() {
     n.readScheduler.Close()
     n.writeScheduler.Close()
+    close(n.closeCh)
+    n.wg.Wait()
+    n.metrics.Load().metrics.Close(n.shardID.Load().id)
 }
+
+func (n *mClockLimiter) SetParentID(parentID string) {
+    n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+    n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+    n.wg.Add(1)
+    go func() {
+        defer n.wg.Done()
+        ticker := time.NewTicker(defaultMetricsCollectTimeout)
+        defer ticker.Stop()
+        for {
+            select {
+            case <-n.closeCh:
+                return
+            case <-ticker.C:
+                shardID := n.shardID.Load().id
+                if shardID == "" {
+                    continue
+                }
+                metrics := n.metrics.Load().metrics
+                for tag, s := range n.readStats {
+                    metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+                }
+                for tag, s := range n.writeStats {
+                    metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+                }
+            }
+        }
+    }()
+}

internal/qos/metrics.go (new file)

@@ -0,0 +1,31 @@
package qos

import "sync/atomic"

type Metrics interface {
    SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
    Close(shardID string)
}

var _ Metrics = (*noopMetrics)(nil)

type noopMetrics struct{}

func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
}

func (n *noopMetrics) Close(string) {}

// stat presents limiter statistics cumulative counters.
//
// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
type stat struct {
    completed atomic.Uint64
    pending atomic.Uint64
    resourceExhausted atomic.Uint64
    inProgress atomic.Uint64
}

type metricsHolder struct {
    metrics Metrics
}

So these metrics do not reflect an instantaneous snapshot, but rather an accumulated value? It should be noted somewhere, because when I see `metric{type="in_progress"}=123`, my natural interpretation is "how many requests are currently being handled". Maybe we could export `in_progress - pending` instead (without changing this struct)?

Changed the struct description a little. `completedTotal` might be clearer, but it looks cumbersome. Prometheus metrics have a `_total` suffix like other accumulated counters. `in_progress` is also of interest, for example, to check the weights of tags.
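
For illustration only (not part of the PR): since all four counters above are cumulative, instantaneous figures can still be derived from them. A rough sketch of such a derivation, assuming it sits next to the `stat` type in `internal/qos` and that every admitted request eventually ends up in `completed` or `resource_exhausted`:

```
// snapshotStat is a hypothetical helper, not part of this PR.
// It derives approximate instantaneous figures from the cumulative counters.
func snapshotStat(s *stat) (waiting, executing uint64) {
    pending := s.pending.Load()
    inProgress := s.inProgress.Load()
    completed := s.completed.Load()
    exhausted := s.resourceExhausted.Load()

    // The four loads are not one atomic snapshot, so the result is approximate under load.
    waiting = pending - inProgress                 // arrived, but not yet admitted by the scheduler
    executing = inProgress - completed - exhausted // admitted, release function not yet called
    return waiting, executing
}
```

The same arithmetic works on the dashboard side against the exported `operations_total` gauge: subtracting the `completed` and `resource_exhausted` series from `in_progress` gives the number of requests currently holding resources.
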

internal/qos/stats.go (new file)

@@ -0,0 +1,28 @@
package qos

const unknownStatsTag = "unknown"

var statTags = map[string]struct{}{
    IOTagClient.String(): {},
    IOTagBackground.String(): {},
    IOTagInternal.String(): {},
    IOTagPolicer.String(): {},
    IOTagWritecache.String(): {},
    IOTagCritical.String(): {},
    unknownStatsTag: {},
}

func createStats() map[string]*stat {
    result := make(map[string]*stat)
    for tag := range statTags {
        result[tag] = &stat{}
    }
    return result
}

func getStat(tag string, stats map[string]*stat) *stat {
    if v, ok := stats[tag]; ok {
        return v
    }
    return stats[unknownStatsTag]
}

fyrchik marked this conversation as resolved (outdated)

Is there any reason you use the `statTags` map here instead of `stats`? The first returned value could then be returned immediately without an additional map access.

No, as they must contain the same keys. Replaced with `stats`.

How about changing it to this:

```
if _, ok := stats[tag]; ok {
    return stats[tag]
}
return stats[unknownStatsTag]
```

Fixed.

@@ -1,6 +1,11 @@
 package qos

-import "fmt"
+import (
+    "context"
+    "fmt"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+)

 type IOTag string
@@ -37,3 +42,11 @@ func FromRawString(s string) (IOTag, error) {
 func (t IOTag) String() string {
     return string(t)
 }
+
+func IOTagFromContext(ctx context.Context) string {
+    tag, ok := tagging.IOTagFromContext(ctx)
+    if !ok {
+        tag = "undefined"
+    }
+    return tag
+}

fyrchik marked this conversation as resolved (outdated)

There is `undefined` here, and `unknown` in `internal/qos/stats.go`. How do these strings relate and why do they differ?

This method is used to get the tag from the context in order to report metrics. If the current context does not contain any tag, the `undefined` placeholder is used.

In `internal/qos/stats.go`, before getting the tag stats:

```
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
    tag = IOTagClient.String()
}
```

This is because the shard assumes that a request without a tag is a `client` request. So when getting the tag stat, the tag must be defined. But it could be unknown in case of some minor bugs, I think. The `mclock` scheduler will return an error in that case, but for metrics it is the `unknown` tag.

Also, the object service and tree service first adjust IO tags and then report metrics, so there must not be unknown tags there.
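
For illustration only (not part of the diff): a small test-style sketch of how the two fallbacks discussed above differ. It assumes it lives in the `internal/qos` package, where the unexported `createStats`/`getStat` helpers and `unknownStatsTag` are visible.

```
package qos

import (
    "context"
    "testing"
)

// Sketch only: "undefined" labels untagged requests in the service-level metrics,
// while "unknown" is the limiter's catch-all stats bucket for unexpected tag values.
func TestTagFallbacks(t *testing.T) {
    // A context without an IO tag is reported as "undefined" by IOTagFromContext.
    if got := IOTagFromContext(context.Background()); got != "undefined" {
        t.Fatalf("expected undefined, got %q", got)
    }

    // The limiter's per-tag stats fall back to the shared "unknown" bucket.
    stats := createStats()
    if getStat("bogus-tag", stats) != stats[unknownStatsTag] {
        t.Fatal("expected fallback to the unknown stats bucket")
    }
}
```
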

@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
     }
     return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-    return read.MaxRunningOps == limits.NoLimit &&
-        read.MaxWaitingOps == limits.NoLimit &&
-        write.MaxRunningOps == limits.NoLimit &&
-        write.MaxWaitingOps == limits.NoLimit &&
-        len(read.Tags) == 0 &&
-        len(write.Tags) == 0
-}

@@ -163,6 +163,8 @@ type testQoSLimiter struct {
     write atomic.Int64
 }

+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
     require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
     require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
     t.write.Add(1)
     return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}

@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
     if s.pilorama != nil {
         s.pilorama.SetParentID(s.info.ID.String())
     }
+    s.opsLimiter.SetParentID(s.info.ID.String())

     if len(idFromMetabase) == 0 && !modeDegraded {
         if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {

@@ -4,6 +4,7 @@ import (
     "context"
     "time"

+    "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 )
@@ -34,7 +35,7 @@ type (
     }

     MetricRegister interface {
-        AddRequestDuration(string, time.Duration, bool)
+        AddRequestDuration(string, time.Duration, bool, string)
         AddPayloadSize(string, int)
     }
 )
@@ -51,7 +52,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
     if m.enabled {
         t := time.Now()
         defer func() {
-            m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
+            m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
         }()
         err = m.next.Get(req, &getStreamMetric{
             ServerStream: stream,
@@ -106,7 +107,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl
         res, err := m.next.PutSingle(ctx, request)

-        m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
         if err == nil {
             m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
         }
@@ -122,7 +123,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
         res, err := m.next.Head(ctx, request)

-        m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
         return res, err
     }
@@ -135,7 +136,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
         err := m.next.Search(req, stream)

-        m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
         return err
     }
@@ -148,7 +149,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
         res, err := m.next.Delete(ctx, request)

-        m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
         return res, err
     }
     return m.next.Delete(ctx, request)
@@ -160,7 +161,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
         err := m.next.GetRange(req, stream)

-        m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
         return err
     }
@@ -173,7 +174,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
         res, err := m.next.GetRangeHash(ctx, request)

-        m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
+        m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
         return res, err
     }
@@ -209,7 +210,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
 func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
     res, err := s.stream.CloseAndRecv(ctx)

-    s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
+    s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
     return res, err
 }
@@ -223,7 +224,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
 func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
     res, err := s.stream.CloseAndRecv(ctx)

-    s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
+    s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
     return res, err
 }

@@ -6,6 +6,7 @@ type MetricsRegister interface {
     AddReplicateTaskDuration(time.Duration, bool)
     AddReplicateWaitDuration(time.Duration, bool)
     AddSyncDuration(time.Duration, bool)
+    AddOperation(string, string)
 }

 type defaultMetricsRegister struct{}
@@ -13,3 +14,4 @@ type defaultMetricsRegister struct{}
 func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {}
+func (defaultMetricsRegister) AddOperation(string, string) {}

@@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
 }

 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+    defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }
@@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }

 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+    defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }
@@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }

 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+    defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }
@@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+    defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }
@@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }

 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+    defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }
@@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }

 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+    defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
     if !s.initialSyncDone.Load() {
         return ErrAlreadySyncing
     }
@@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
 // Apply locally applies operation from the remote node to the tree.
 func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
+    defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
     err := verifyMessage(req)
     if err != nil {
         return nil, err
@@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
 }

 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+    defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
     if !s.initialSyncDone.Load() {
         return ErrAlreadySyncing
     }
@@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }

 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+    defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
     if !s.initialSyncDone.Load() {
         return nil, ErrAlreadySyncing
     }