diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index afde0bbc0..92aa827f2 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	}
 	if c.metricsCollector != nil {
 		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+		shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
 	}

 	var sh shardOptsWithID
diff --git a/cmd/frostfs-node/qos.go b/cmd/frostfs-node/qos.go
index bfc278333..9663fc6ae 100644
--- a/cmd/frostfs-node/qos.go
+++ b/cmd/frostfs-node/qos.go
@@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 	}
 	ioTag, err := qos.FromRawString(rawTag)
 	if err != nil {
-		s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
+		s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}

@@ -70,6 +70,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	case qos.IOTagInternal:
 		for _, pk := range s.allowedInternalPubs {
@@ -87,9 +88,10 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	default:
-		s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
+		s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 }
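The messages demoted to Debug above fire once per request that carries a malformed or unauthorized tag, so they would be noisy at Warn under load. For orientation, a sketch of the fallback these paths implement (hypothetical snippet; it assumes only qos.FromRawString and the tagging package that this diff already uses):

	ctx := context.Background()
	if _, err := qos.FromRawString("not-a-real-tag"); err != nil {
		// mirrors AdjustIncomingTag: an unparsable tag degrades to `client`
		ctx = tagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
	}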
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index d07f47fbf..6115cdf90 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -513,4 +513,5 @@ const (
 	FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
 	NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
 	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+	FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
 )
diff --git a/internal/metrics/consts.go b/internal/metrics/consts.go
index cb165de69..9123541ff 100644
--- a/internal/metrics/consts.go
+++ b/internal/metrics/consts.go
@@ -23,6 +23,7 @@ const (
 	policerSubsystem     = "policer"
 	commonCacheSubsystem = "common_cache"
 	multinetSubsystem    = "multinet"
+	qosSubsystem         = "qos"

 	successLabel = "success"
 	shardIDLabel = "shard_id"
@@ -43,6 +44,7 @@
 	hitLabel      = "hit"
 	cacheLabel    = "cache"
 	sourceIPLabel = "source_ip"
+	ioTagLabel    = "io_tag"

 	readWriteMode = "READ_WRITE"
 	readOnlyMode  = "READ_ONLY"
diff --git a/internal/metrics/node.go b/internal/metrics/node.go
index 4ea3c7c24..8ade19eb2 100644
--- a/internal/metrics/node.go
+++ b/internal/metrics/node.go
@@ -26,6 +26,7 @@ type NodeMetrics struct {
 	morphCache *morphCacheMetrics
 	log        logger.LogMetrics
 	multinet   *multinetMetrics
+	qos        *QoSMetrics
 	// nolint: unused
 	appInfo *ApplicationInfo
 }
@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
 		log:      logger.NewLogMetrics(namespace),
 		appInfo:  NewApplicationInfo(misc.Version),
 		multinet: newMultinetMetrics(namespace),
+		qos:      newQoSMetrics(),
 	}
 }

@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
 	return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+	return m.qos
+}
diff --git a/internal/metrics/object.go b/internal/metrics/object.go
index 0ba994ed3..e4f6dfde1 100644
--- a/internal/metrics/object.go
+++ b/internal/metrics/object.go
@@ -9,13 +9,14 @@ import (
 )

 type ObjectServiceMetrics interface {
-	AddRequestDuration(method string, d time.Duration, success bool)
+	AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
 	AddPayloadSize(method string, size int)
 }

 type objectServiceMetrics struct {
-	methodDuration *prometheus.HistogramVec
-	payloadCounter *prometheus.CounterVec
+	methodDuration  *prometheus.HistogramVec
+	payloadCounter  *prometheus.CounterVec
+	ioTagOpsCounter *prometheus.CounterVec
 }

 func newObjectServiceMetrics() *objectServiceMetrics {
@@ -32,14 +33,24 @@ func newObjectServiceMetrics() *objectServiceMetrics {
 			Name:      "request_payload_bytes",
 			Help:      "Object Service request payload",
 		}, []string{methodLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: objectSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }

-func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
+func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
 	m.methodDuration.With(prometheus.Labels{
 		methodLabel:  method,
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: method,
+	}).Inc()
 }

 func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {
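With the counter added above, each object method now reports one series per IO tag next to its duration histogram. Assuming the package's namespace constant is frostfs and objectSubsystem is object (neither value is shown in this diff), the scraped output would look roughly like:

	frostfs_object_requests_total{io_tag="client",method="Get"} 120
	frostfs_object_requests_total{io_tag="critical",method="Put"} 7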
diff --git a/internal/metrics/qos.go b/internal/metrics/qos.go
new file mode 100644
index 000000000..17fb67a27
--- /dev/null
+++ b/internal/metrics/qos.go
@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type QoSMetrics struct {
+	opsCounter *prometheus.GaugeVec
+}
+
+func newQoSMetrics() *QoSMetrics {
+	return &QoSMetrics{
+		opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: qosSubsystem,
+			Name:      "operations_total",
+			Help:      "Count of pending, in-progress, completed and resource-exhausted operations for each shard",
+		}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
+	}
+}
+
+func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "pending",
+	}).Set(float64(pending))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "in_progress",
+	}).Set(float64(inProgress))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "completed",
+	}).Set(float64(completed))
+	m.opsCounter.With(prometheus.Labels{
+		shardIDLabel:   shardID,
+		operationLabel: operation,
+		ioTagLabel:     tag,
+		typeLabel:      "resource_exhausted",
+	}).Set(float64(resourceExhausted))
+}
+
+func (m *QoSMetrics) Close(shardID string) {
+	m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
+}
diff --git a/internal/metrics/treeservice.go b/internal/metrics/treeservice.go
index 6702aa83c..e192c4398 100644
--- a/internal/metrics/treeservice.go
+++ b/internal/metrics/treeservice.go
@@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
 	AddReplicateTaskDuration(time.Duration, bool)
 	AddReplicateWaitDuration(time.Duration, bool)
 	AddSyncDuration(time.Duration, bool)
+	AddOperation(string, string)
 }

 type treeServiceMetrics struct {
 	replicateTaskDuration *prometheus.HistogramVec
 	replicateWaitDuration *prometheus.HistogramVec
 	syncOpDuration        *prometheus.HistogramVec
+	ioTagOpsCounter       *prometheus.CounterVec
 }

 var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
@@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
 			Name:      "sync_duration_seconds",
 			Help:      "Duration of synchronization operations",
 		}, []string{successLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: treeServiceSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }
@@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
 }
+
+func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: op,
+	}).Inc()
+}
diff --git a/internal/qos/limiter.go b/internal/qos/limiter.go
index b1406a7f3..8f00791c5 100644
--- a/internal/qos/limiter.go
+++ b/internal/qos/limiter.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
+	"sync/atomic"
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@@ -15,6 +17,9 @@ import (
 const (
 	defaultIdleTimeout time.Duration = 0
 	defaultShare       float64       = 1.0
+	minusOne                         = ^uint64(0)
+
+	defaultMetricsCollectTimeout = 5 * time.Second
 )

 type ReleaseFunc scheduling.ReleaseFunc
@@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc
 type Limiter interface {
 	ReadRequest(context.Context) (ReleaseFunc, error)
 	WriteRequest(context.Context) (ReleaseFunc, error)
+	SetParentID(string)
+	SetMetrics(Metrics)
 	Close()
 }
@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
 	}
-	read, write := c.Read(), c.Write()
-	if isNoop(read, write) {
-		return noopLimiterInstance, nil
-	}
 	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)
@@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
-	return &mClockLimiter{
+	l := &mClockLimiter{
 		readScheduler:  readScheduler,
 		writeScheduler: writeScheduler,
-	}, nil
+		closeCh:        make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		readStats:      createStats(),
+		writeStats:     createStats(),
+	}
+	l.shardID.Store(&shardID{})
+	l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+	l.startMetricsCollect()
+	return l, nil
 }

 func createScheduler(config limits.OpConfig) (scheduler, error) {
@@ -91,7 +102,7 @@ var (
 )

 func NewNoopLimiter() Limiter {
-	return &noopLimiter{}
+	return noopLimiterInstance
 }

 type noopLimiter struct{}
@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
 	return releaseStub, nil
 }

+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}

+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)

+type shardID struct {
+	id string
+}
+
 type mClockLimiter struct {
 	readScheduler  scheduler
 	writeScheduler scheduler
+
+	readStats  map[string]*stat
+	writeStats map[string]*stat
+
+	shardID atomic.Pointer[shardID]
+	metrics atomic.Pointer[metricsHolder]
+	closeCh chan struct{}
+	wg      *sync.WaitGroup
 }

 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.readScheduler)
+	return requestArrival(ctx, n.readScheduler, n.readStats)
 }

 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.writeScheduler)
+	return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }

-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
 	}
+	stat := getStat(tag, stats)
+	stat.pending.Add(1)
 	if tag == IOTagCritical.String() {
-		return releaseStub, nil
+		stat.inProgress.Add(1)
+		return func() {
+			stat.completed.Add(1)
+		}, nil
 	}
 	rel, err := s.RequestArrival(ctx, tag)
+	stat.inProgress.Add(1)
 	if err != nil {
 		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
 			errors.Is(err, errSemaphoreLimitExceeded) {
+			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
+		stat.completed.Add(1)
 		return nil, err
 	}
-	return ReleaseFunc(rel), nil
+	return func() {
+		rel()
+		stat.completed.Add(1)
+	}, nil
 }

 func (n *mClockLimiter) Close() {
 	n.readScheduler.Close()
 	n.writeScheduler.Close()
+	close(n.closeCh)
+	n.wg.Wait()
+	n.metrics.Load().metrics.Close(n.shardID.Load().id)
+}
+
+func (n *mClockLimiter) SetParentID(parentID string) {
+	n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+	n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+
+		ticker := time.NewTicker(defaultMetricsCollectTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-n.closeCh:
+				return
+			case <-ticker.C:
+				shardID := n.shardID.Load().id
+				if shardID == "" {
+					continue
+				}
+				metrics := n.metrics.Load().metrics
+				for tag, s := range n.readStats {
+					metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+				for tag, s := range n.writeStats {
+					metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+			}
+		}
+	}()
 }
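Putting the limiter pieces together: NewLimiter now always returns a metrics-collecting mClockLimiter, with noop metrics and an empty shard ID until the owner injects real ones. A sketch of the intended wiring (hypothetical acquireRead helper; the nodeMetrics argument and the shard ID value are placeholders, and only methods added in this diff are used):

	func acquireRead(limiter qos.Limiter, nodeMetrics *metrics.NodeMetrics) error {
		limiter.SetParentID("shard-1")               // the collection loop skips shards with an empty ID
		limiter.SetMetrics(nodeMetrics.QoSMetrics()) // replaces the noop metrics installed by NewLimiter

		ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagClient.String())
		release, err := limiter.ReadRequest(ctx)
		if err != nil {
			return err // may be *apistatus.ResourceExhausted when the tag's limit is hit
		}
		defer release() // moves the request from in_progress to completed in the tag's stats
		return nil
	}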
diff --git a/internal/qos/metrics.go b/internal/qos/metrics.go
new file mode 100644
index 000000000..c00da51b7
--- /dev/null
+++ b/internal/qos/metrics.go
@@ -0,0 +1,31 @@
+package qos
+
+import "sync/atomic"
+
+type Metrics interface {
+	SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
+	Close(shardID string)
+}
+
+var _ Metrics = (*noopMetrics)(nil)
+
+type noopMetrics struct{}
+
+func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
+}
+
+func (n *noopMetrics) Close(string) {}
+
+// stat represents cumulative counters of limiter statistics.
+//
+// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
+type stat struct {
+	completed         atomic.Uint64
+	pending           atomic.Uint64
+	resourceExhausted atomic.Uint64
+	inProgress        atomic.Uint64
+}
+
+type metricsHolder struct {
+	metrics Metrics
+}
diff --git a/internal/qos/stats.go b/internal/qos/stats.go
new file mode 100644
index 000000000..f077f552b
--- /dev/null
+++ b/internal/qos/stats.go
@@ -0,0 +1,28 @@
+package qos
+
+const unknownStatsTag = "unknown"
+
+var statTags = map[string]struct{}{
+	IOTagClient.String():     {},
+	IOTagBackground.String(): {},
+	IOTagInternal.String():   {},
+	IOTagPolicer.String():    {},
+	IOTagWritecache.String(): {},
+	IOTagCritical.String():   {},
+	unknownStatsTag:          {},
+}
+
+func createStats() map[string]*stat {
+	result := make(map[string]*stat)
+	for tag := range statTags {
+		result[tag] = &stat{}
+	}
+	return result
+}
+
+func getStat(tag string, stats map[string]*stat) *stat {
+	if v, ok := stats[tag]; ok {
+		return v
+	}
+	return stats[unknownStatsTag]
+}
diff --git a/internal/qos/tags.go b/internal/qos/tags.go
index 6a9a7f7a4..9db45f190 100644
--- a/internal/qos/tags.go
+++ b/internal/qos/tags.go
@@ -1,6 +1,11 @@
 package qos

-import "fmt"
+import (
+	"context"
+	"fmt"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+)

 type IOTag string
@@ -37,3 +42,11 @@ func FromRawString(s string) (IOTag, error) {
 func (t IOTag) String() string {
 	return string(t)
 }
+
+func IOTagFromContext(ctx context.Context) string {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	if !ok {
+		tag = "undefined"
+	}
+	return tag
+}
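Note the two distinct fallback labels introduced here: IOTagFromContext reports undefined for untagged contexts (used by the object and tree service counters), while the limiter's stats map buckets unrecognized tags under unknown. An in-package sketch of the difference (createStats and getStat are unexported, so this only compiles inside package qos):

	stats := createStats()         // one *stat per known tag, plus "unknown"
	s := getStat("garbage", stats) // unrecognized tags fall back to stats[unknownStatsTag]
	s.pending.Add(1)

	fmt.Println(IOTagFromContext(context.Background())) // prints "undefined": no tag attached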
diff --git a/internal/qos/validate.go b/internal/qos/validate.go
index 43aa74942..3fa4ebbd1 100644
--- a/internal/qos/validate.go
+++ b/internal/qos/validate.go
@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-	return read.MaxRunningOps == limits.NoLimit &&
-		read.MaxWaitingOps == limits.NoLimit &&
-		write.MaxRunningOps == limits.NoLimit &&
-		write.MaxWaitingOps == limits.NoLimit &&
-		len(read.Tags) == 0 &&
-		len(write.Tags) == 0
-}
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index 7ddde1f02..3f9196128 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -163,6 +163,8 @@ type testQoSLimiter struct {
 	write atomic.Int64
 }

+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
 	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
 	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
 	t.write.Add(1)
 	return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}
diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go
index 26492cf01..b233b705c 100644
--- a/pkg/local_object_storage/shard/id.go
+++ b/pkg/local_object_storage/shard/id.go
@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	if s.pilorama != nil {
 		s.pilorama.SetParentID(s.info.ID.String())
 	}
+	s.opsLimiter.SetParentID(s.info.ID.String())

 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go
index 19748e938..6a6ee0f0f 100644
--- a/pkg/services/object/metrics.go
+++ b/pkg/services/object/metrics.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"

+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 )
@@ -34,7 +35,7 @@ type (
 	}

 	MetricRegister interface {
-		AddRequestDuration(string, time.Duration, bool)
+		AddRequestDuration(string, time.Duration, bool, string)
 		AddPayloadSize(string, int)
 	}
 )
@@ -51,7 +52,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
 	if m.enabled {
 		t := time.Now()
 		defer func() {
-			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
+			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
 		}()
 		err = m.next.Get(req, &getStreamMetric{
 			ServerStream: stream,
@@ -106,7 +107,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl

 		res, err := m.next.PutSingle(ctx, request)

-		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		if err == nil {
 			m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
 		}
@@ -122,7 +123,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)

 		res, err := m.next.Head(ctx, request)

-		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))

 		return res, err
 	}
@@ -135,7 +136,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)

 		err := m.next.Search(req, stream)

-		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))

 		return err
 	}
@@ -148,7 +149,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque

 		res, err := m.next.Delete(ctx, request)

-		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		return res, err
 	}
 	return m.next.Delete(ctx, request)
@@ -160,7 +161,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR

 		err := m.next.GetRange(req, stream)

-		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))

 		return err
 	}
@@ -173,7 +174,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa

 		res, err := m.next.GetRangeHash(ctx, request)

-		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))

 		return res, err
 	}
@@ -209,7 +210,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
 func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)

-	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))

 	return res, err
 }
@@ -223,7 +224,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
 func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)

-	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))

 	return res, err
 }
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil) + s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx)) return res, err } diff --git a/pkg/services/tree/metrics.go b/pkg/services/tree/metrics.go index 0f0e4ee57..07503f8c3 100644 --- a/pkg/services/tree/metrics.go +++ b/pkg/services/tree/metrics.go @@ -6,6 +6,7 @@ type MetricsRegister interface { AddReplicateTaskDuration(time.Duration, bool) AddReplicateWaitDuration(time.Duration, bool) AddSyncDuration(time.Duration, bool) + AddOperation(string, string) } type defaultMetricsRegister struct{} @@ -13,3 +14,4 @@ type defaultMetricsRegister struct{} func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {} func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {} func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {} +func (defaultMetricsRegister) AddOperation(string, string) {} diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 2e9722e79..f9b7395e7 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -105,6 +105,7 @@ func (s *Service) Shutdown() { } func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { + defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx)) if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } @@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error } func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { + defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx)) if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } @@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP } func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { + defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx)) if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } @@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon // Move applies client operation to the specified tree and pushes in queue // for replication on other nodes. func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { + defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx)) if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } @@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er } func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { + defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx)) if !s.initialSyncDone.Load() { return nil, ErrAlreadySyncing } @@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) } func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { + defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context())) if !s.initialSyncDone.Load() { return ErrAlreadySyncing } @@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di // Apply locally applies operation from the remote node to the tree. 
diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index 2e9722e79..f9b7395e7 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
 }

 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }

 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }

 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }

 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }

 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}
@@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
 // Apply locally applies operation from the remote node to the tree.
 func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
+	defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
 	err := verifyMessage(req)
 	if err != nil {
 		return nil, err
 	}
@@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
 }

 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}
@@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }

 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
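One subtlety in the tree service instrumentation above: the arguments of a deferred call are evaluated at the defer statement itself, so qos.IOTagFromContext(ctx) captures the tag on entry, and the counter increments even on the early ErrAlreadySyncing returns. A minimal illustration (hypothetical demo function):

	func demo(ctx context.Context) error {
		// the tag argument is evaluated here; the call itself runs when demo returns
		defer fmt.Println("recorded tag:", qos.IOTagFromContext(ctx))
		return errors.New("an early return still counts the operation")
	}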