IO tag metrics #1653
@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	}
 	if c.metricsCollector != nil {
 		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+		shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
 	}

 	var sh shardOptsWithID

@@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 	}
 	ioTag, err := qos.FromRawString(rawTag)
 	if err != nil {
-		s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
+		s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}

@@ -70,6 +70,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	case qos.IOTagInternal:
 		for _, pk := range s.allowedInternalPubs {

@@ -87,9 +88,10 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	default:
-		s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
+		s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 }

@@ -513,4 +513,5 @@ const (
 	FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
 	NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
 	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+	FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
 )

@@ -23,6 +23,7 @@ const (
 	policerSubsystem     = "policer"
 	commonCacheSubsystem = "common_cache"
 	multinetSubsystem    = "multinet"
+	qosSubsystem         = "qos"

 	successLabel = "success"
 	shardIDLabel = "shard_id"

@@ -43,6 +44,7 @@ const (
 	hitLabel      = "hit"
 	cacheLabel    = "cache"
 	sourceIPLabel = "source_ip"
+	ioTagLabel    = "io_tag"

 	readWriteMode = "READ_WRITE"
 	readOnlyMode  = "READ_ONLY"

@@ -26,6 +26,7 @@ type NodeMetrics struct {
 	morphCache *morphCacheMetrics
 	log        logger.LogMetrics
 	multinet   *multinetMetrics
+	qos        *QoSMetrics
 	// nolint: unused
 	appInfo *ApplicationInfo
 }

@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
 		log:      logger.NewLogMetrics(namespace),
 		appInfo:  NewApplicationInfo(misc.Version),
 		multinet: newMultinetMetrics(namespace),
+		qos:      newQoSMetrics(),
 	}
 }

@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
 	return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+	return m.qos
+}

@@ -9,13 +9,14 @@ import (
 )

 type ObjectServiceMetrics interface {
-	AddRequestDuration(method string, d time.Duration, success bool)
+	AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
 	AddPayloadSize(method string, size int)
 }

 type objectServiceMetrics struct {
 	methodDuration  *prometheus.HistogramVec
 	payloadCounter  *prometheus.CounterVec
+	ioTagOpsCounter *prometheus.CounterVec
 }

 func newObjectServiceMetrics() *objectServiceMetrics {

@@ -32,14 +33,24 @@ func newObjectServiceMetrics() *objectServiceMetrics {
 			Name:      "request_payload_bytes",
 			Help:      "Object Service request payload",
 		}, []string{methodLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: objectSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }

-func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
+func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
 	m.methodDuration.With(prometheus.Labels{
 		methodLabel:  method,
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: method,
+	}).Inc()
 }

 func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {

internal/metrics/qos.go (new file, +52)

@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type QoSMetrics struct {
+	opsCounter *prometheus.GaugeVec
+}
+
+func newQoSMetrics() *QoSMetrics {
+	return &QoSMetrics{
+		opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: qosSubsystem,
+			Name:      "operations_total",
+			Help:      "Count of pending, in progress, completed and failed (due to resource exhaustion) operations for each shard",
+		}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
+	}
+}
+
+func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "pending",
+	}).Set(float64(pending))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "in_progress",
+	}).Set(float64(inProgress))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "completed",
+	}).Set(float64(completed))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "resource_exhausted",
+	}).Set(float64(resourceExhausted))
+}
+
+func (m *QoSMetrics) Close(shardID string) {
+	m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
+}

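For orientation, a sketch of the resulting exposition line. The package-level `namespace` constant is not shown in this diff, so `frostfs_node` below is an assumption:

```go
// Assuming namespace = "frostfs_node" (not shown in this diff), the gauge
// above would be exported as, for example:
//
//	frostfs_node_qos_operations_total{io_tag="client",operation="read",shard_id="...",type="pending"} 42
```
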
@@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
 	AddReplicateTaskDuration(time.Duration, bool)
 	AddReplicateWaitDuration(time.Duration, bool)
 	AddSyncDuration(time.Duration, bool)
+	AddOperation(string, string)
 }

 type treeServiceMetrics struct {
 	replicateTaskDuration *prometheus.HistogramVec
 	replicateWaitDuration *prometheus.HistogramVec
 	syncOpDuration        *prometheus.HistogramVec
+	ioTagOpsCounter       *prometheus.CounterVec
 }

 var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)

@@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
 			Name:      "sync_duration_seconds",
 			Help:      "Duration of synchronization operations",
 		}, []string{successLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: treeServiceSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }

@@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
 }
+
+func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: op,
+	}).Inc()
+}

@@ -4,6 +4,8 @@ import (
 	"context"
+	"errors"
 	"fmt"
+	"sync"
 	"sync/atomic"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"

@@ -15,6 +17,9 @@ import (
 const (
 	defaultIdleTimeout time.Duration = 0
 	defaultShare       float64       = 1.0
+	minusOne                         = ^uint64(0)
+
+	defaultMetricsCollectTimeout = 5 * time.Second
 )

 type ReleaseFunc scheduling.ReleaseFunc

@@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc
 type Limiter interface {
 	ReadRequest(context.Context) (ReleaseFunc, error)
 	WriteRequest(context.Context) (ReleaseFunc, error)
+	SetParentID(string)
+	SetMetrics(Metrics)
 	Close()
 }

@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
 	}
-	read, write := c.Read(), c.Write()
-	if isNoop(read, write) {
-		return noopLimiterInstance, nil
-	}
 	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)

@@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
-	return &mClockLimiter{
+	l := &mClockLimiter{
 		readScheduler:  readScheduler,
 		writeScheduler: writeScheduler,
-	}, nil
+		closeCh:        make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		readStats:      createStats(),
+		writeStats:     createStats(),
+	}
+	l.shardID.Store(&shardID{})
+	l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+	l.startMetricsCollect()
+	return l, nil
 }

 func createScheduler(config limits.OpConfig) (scheduler, error) {

@@ -91,7 +102,7 @@
 )

 func NewNoopLimiter() Limiter {
-	return &noopLimiter{}
+	return noopLimiterInstance
 }

 type noopLimiter struct{}

@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
 	return releaseStub, nil
 }

+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}

+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)

+type shardID struct {
+	id string
+}
+
 type mClockLimiter struct {
 	readScheduler  scheduler
 	writeScheduler scheduler

fyrchik commented:
This will contain a pointer to string anyway (implicitly, when setting an interface value to a string). Why have you decided to use `atomic.Value` instead of `atomic.Pointer`?

dstepanov-yadro commented:
I was worried about the pointer dereference. But OK, replaced with `atomic.Pointer[shardID]`.
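For readers following the thread, a minimal standalone sketch of the difference (not from this PR; only `shardID` mirrors the type defined in this diff):

```go
package main

import "sync/atomic"

type shardID struct{ id string }

func main() {
	// atomic.Value stores an interface{}; reading it back needs a type assertion.
	var v atomic.Value
	v.Store(&shardID{id: "shard-1"})
	_ = v.Load().(*shardID).id

	// atomic.Pointer[T] (Go 1.19+) is typed: no assertion needed, and the
	// stored value is still a pointer, so the "implicit pointer" cost is the same.
	var p atomic.Pointer[shardID]
	p.Store(&shardID{id: "shard-1"})
	_ = p.Load().id
}
```
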
+	readStats  map[string]*stat
+	writeStats map[string]*stat
+
+	shardID atomic.Pointer[shardID]
+	metrics atomic.Pointer[metricsHolder]
+	closeCh chan struct{}
+	wg      *sync.WaitGroup
 }

 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.readScheduler)
+	return requestArrival(ctx, n.readScheduler, n.readStats)
 }

 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.writeScheduler)
+	return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }

-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
 	}
+	stat := getStat(tag, stats)
+	stat.pending.Add(1)
 	if tag == IOTagCritical.String() {
-		return releaseStub, nil
+		stat.inProgress.Add(1)
+		return func() {
+			stat.completed.Add(1)
+		}, nil
 	}
 	rel, err := s.RequestArrival(ctx, tag)
+	stat.inProgress.Add(1)
 	if err != nil {
 		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
 			errors.Is(err, errSemaphoreLimitExceeded) {
+			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
+		stat.completed.Add(1)
fyrchik commented:
Is it important that `stat.completed.Add()` is called before `rel`? The way I see it, `rel` releases the resources associated with the request, and we should "complete" the request only afterwards. But OK, if it was deliberate.

dstepanov-yadro commented:
Fixed.
 		return nil, err
 	}
-	return ReleaseFunc(rel), nil
+	return func() {
+		rel()
+		stat.completed.Add(1)
+	}, nil
 }

 func (n *mClockLimiter) Close() {
 	n.readScheduler.Close()
 	n.writeScheduler.Close()
+	close(n.closeCh)
+	n.wg.Wait()
+	n.metrics.Load().metrics.Close(n.shardID.Load().id)
 }

+func (n *mClockLimiter) SetParentID(parentID string) {
+	n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+	n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+
+		ticker := time.NewTicker(defaultMetricsCollectTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-n.closeCh:
+				return
+			case <-ticker.C:
+				shardID := n.shardID.Load().id
+				if shardID == "" {
+					continue
+				}
+				metrics := n.metrics.Load().metrics
+				for tag, s := range n.readStats {
+					metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+				for tag, s := range n.writeStats {
+					metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+			}
+		}
+	}()
+}

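As a usage sketch (the caller is hypothetical; `Limiter` and `ReleaseFunc` come from this diff), every request acquires a slot and must call the returned release function, which is what drives the `completed` counter:

```go
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
)

// doRead is a hypothetical caller; Limiter and ReleaseFunc are from this diff.
func doRead(ctx context.Context, lim qos.Limiter) error {
	release, err := lim.ReadRequest(ctx) // counted as pending, then in_progress
	if err != nil {
		// counted as resource_exhausted (limit exceeded) or completed (other errors)
		return err
	}
	defer release() // releases scheduler resources first, then increments completed
	// ... perform the shard read ...
	return nil
}
```
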
internal/qos/metrics.go (new file, +31)

@@ -0,0 +1,31 @@
+package qos
+
+import "sync/atomic"
+
+type Metrics interface {
+	SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
+	Close(shardID string)
+}
+
+var _ Metrics = (*noopMetrics)(nil)
+
+type noopMetrics struct{}
+
+func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
+}
+
+func (n *noopMetrics) Close(string) {}
+
+// stat presents limiter statistics cumulative counters.
+//
+// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
fyrchik commented:
So these metrics do not reflect an instantaneous snapshot, but rather an accumulated value? It should be noted somewhere, because when I see `metric{type="in_progress"}=123`, my natural interpretation is "how many requests are currently being handled". Maybe we could export `in_progress - pending` instead (without changing this struct)?

dstepanov-yadro commented:
Changed the struct description a little. `completedTotal` might be clearer, but it looks cumbersome. Prometheus metrics carry the `_total` suffix like other accumulated counters, and `in_progress` is also of interest on its own, for example to check the weights of tags.
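A minimal sketch of the reader-side arithmetic this implies (not part of this PR; `stat` is the struct defined just below):

```go
// Point-in-time gauges derived from the cumulative counters, following the
// lifecycle pending -> in_progress -> completed | resource_exhausted.
func snapshot(s *stat) (waiting, running uint64) {
	waiting = s.pending.Load() - s.inProgress.Load()
	running = s.inProgress.Load() - s.completed.Load() - s.resourceExhausted.Load()
	return waiting, running
}
```
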
+type stat struct {
+	completed         atomic.Uint64
+	pending           atomic.Uint64
+	resourceExhausted atomic.Uint64
+	inProgress        atomic.Uint64
+}
+
+type metricsHolder struct {
+	metrics Metrics
+}

internal/qos/stats.go (new file, +28)

@@ -0,0 +1,28 @@
+package qos
+
+const unknownStatsTag = "unknown"
+
+var statTags = map[string]struct{}{
+	IOTagClient.String():     {},
+	IOTagBackground.String(): {},
+	IOTagInternal.String():   {},
+	IOTagPolicer.String():    {},
+	IOTagWritecache.String(): {},
+	IOTagCritical.String():   {},
+	unknownStatsTag:          {},
+}
+
+func createStats() map[string]*stat {
+	result := make(map[string]*stat)
+	for tag := range statTags {
+		result[tag] = &stat{}
+	}
+	return result
+}
+
+func getStat(tag string, stats map[string]*stat) *stat {
+	if v, ok := stats[tag]; ok {
+		return v
fyrchik commented:
Is there any reason you use the `statTags` map here instead of `stats`? The first returned value could then be returned immediately without an additional map access.

dstepanov-yadro commented:
No, they must contain the same keys. Replaced with `stats`.
+	}
+	return stats[unknownStatsTag]
+}

acid-ant commented:
How about changing it to this:
```
if _, ok := stats[tag]; ok {
	return stats[tag]
}
return stats[unknownStatsTag]
```

dstepanov-yadro commented:
Fixed.
fixed fixed
|
|
@ -1,6 +1,11 @@
|
|||
package qos
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
)
|
||||
|
||||
type IOTag string
|
||||
|
||||
|
@@ -37,3 +42,11 @@ func FromRawString(s string) (IOTag, error) {
 func (t IOTag) String() string {
 	return string(t)
 }
+
+func IOTagFromContext(ctx context.Context) string {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	if !ok {
+		tag = "undefined"
fyrchik commented:
There is `undefined` here, and `unknown` in `internal/qos/stats.go`. How do these strings relate and why do they differ?

dstepanov-yadro commented:
This method is used to get the tag from the context to report metrics. If the current context doesn't contain any tag, the `undefined` placeholder is used. In `internal/qos/stats.go`, before getting the tag stats:
```
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
	tag = IOTagClient.String()
}
```
This is because the shard assumes that a request without a tag is a `client` request. So when getting a tag's stat, the tag must be defined, but it could still be unknown in case of some minor bug. The `mclock` scheduler will return an error in that case, but for metrics it is counted under the `unknown` tag.

dstepanov-yadro commented:
Also, the object service and tree service first adjust IO tags and then report metrics, so there must not be any `unknown` tags there.
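A small runnable example of the metrics-side fallback described above (a sketch; the import path is taken from this diff):

```go
package qos_test

import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
)

func ExampleIOTagFromContext() {
	ctx := context.Background() // no IO tag attached

	// Metrics reporting falls back to the "undefined" placeholder, while the
	// limiter treats the same untagged request as client traffic and getStat
	// buckets unrecognized tags under "unknown".
	fmt.Println(qos.IOTagFromContext(ctx))
	// Output: undefined
}
```
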
+	}
+	return tag
+}

@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-	return read.MaxRunningOps == limits.NoLimit &&
-		read.MaxWaitingOps == limits.NoLimit &&
-		write.MaxRunningOps == limits.NoLimit &&
-		write.MaxWaitingOps == limits.NoLimit &&
-		len(read.Tags) == 0 &&
-		len(write.Tags) == 0
-}

@@ -163,6 +163,8 @@ type testQoSLimiter struct {
 	write atomic.Int64
 }

+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
 	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
 	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")

@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
 	t.write.Add(1)
 	return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}

@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	if s.pilorama != nil {
 		s.pilorama.SetParentID(s.info.ID.String())
 	}
+	s.opsLimiter.SetParentID(s.info.ID.String())

 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {

@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"

+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 )

@@ -34,7 +35,7 @@ type (
 	}

 	MetricRegister interface {
-		AddRequestDuration(string, time.Duration, bool)
+		AddRequestDuration(string, time.Duration, bool, string)
 		AddPayloadSize(string, int)
 	}
 )

@@ -51,7 +52,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
 	if m.enabled {
 		t := time.Now()
 		defer func() {
-			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
+			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
 		}()
 		err = m.next.Get(req, &getStreamMetric{
 			ServerStream: stream,

@@ -106,7 +107,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl
 		res, err := m.next.PutSingle(ctx, request)

-		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		if err == nil {
 			m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
 		}

@@ -122,7 +123,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
 		res, err := m.next.Head(ctx, request)

-		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))

 		return res, err
 	}

@@ -135,7 +136,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
 		err := m.next.Search(req, stream)

-		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))

 		return err
 	}

@@ -148,7 +149,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
 		res, err := m.next.Delete(ctx, request)

-		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		return res, err
 	}
 	return m.next.Delete(ctx, request)

@@ -160,7 +161,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
 		err := m.next.GetRange(req, stream)

-		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))

 		return err
 	}

@@ -173,7 +174,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
 		res, err := m.next.GetRangeHash(ctx, request)

-		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))

 		return res, err
 	}

@@ -209,7 +210,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
 func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)

-	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))

 	return res, err
 }

@@ -223,7 +224,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
 func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)

-	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))

 	return res, err
 }

@@ -6,6 +6,7 @@ type MetricsRegister interface {
 	AddReplicateTaskDuration(time.Duration, bool)
 	AddReplicateWaitDuration(time.Duration, bool)
 	AddSyncDuration(time.Duration, bool)
+	AddOperation(string, string)
 }

 type defaultMetricsRegister struct{}

@@ -13,3 +14,4 @@ type defaultMetricsRegister struct{}
 func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool)          {}
+func (defaultMetricsRegister) AddOperation(string, string)                  {}

@@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
 }

 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

@@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }

 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

@@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }

 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

@@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

@@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }

 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

@@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }

 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}

@@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di

 // Apply locally applies operation from the remote node to the tree.
 func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
+	defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
 	err := verifyMessage(req)
 	if err != nil {
 		return nil, err

@@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
 }

 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}

@@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }

 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}

This belongs to the commit `[#1653] qos: Add metrics`. Could you explain why you have removed these lines?

After this commit the limiter must be able to report metrics, but the noop limiter is just a stub. To avoid adding extra code to the noop limiter, I simply dropped this short-circuit.