IO tag metrics #1653

Merged
dstepanov-yadro merged 4 commits from dstepanov-yadro/frostfs-node:feat/io_tag_metrics into master 2025-03-11 10:57:48 +00:00
18 changed files with 285 additions and 38 deletions

View file

@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
}
if c.metricsCollector != nil {
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
}
var sh shardOptsWithID

View file

@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
}
ioTag, err := qos.FromRawString(rawTag)
if err != nil {
s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
@ -70,6 +70,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
return ctx
}
}
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
case qos.IOTagInternal:
for _, pk := range s.allowedInternalPubs {
@ -87,9 +88,10 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
return ctx
}
}
s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
default:
s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
}
}

View file

@ -513,4 +513,5 @@ const (
FailedToParseIncomingIOTag = "failed to parse incoming IO tag"
NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`"
FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`"
)

View file

@ -23,6 +23,7 @@ const (
policerSubsystem = "policer"
commonCacheSubsystem = "common_cache"
multinetSubsystem = "multinet"
qosSubsystem = "qos"
successLabel = "success"
shardIDLabel = "shard_id"
@ -43,6 +44,7 @@ const (
hitLabel = "hit"
cacheLabel = "cache"
sourceIPLabel = "source_ip"
ioTagLabel = "io_tag"
readWriteMode = "READ_WRITE"
readOnlyMode = "READ_ONLY"

View file

@ -26,6 +26,7 @@ type NodeMetrics struct {
morphCache *morphCacheMetrics
log logger.LogMetrics
multinet *multinetMetrics
qos *QoSMetrics
// nolint: unused
appInfo *ApplicationInfo
}
@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
log: logger.NewLogMetrics(namespace),
appInfo: NewApplicationInfo(misc.Version),
multinet: newMultinetMetrics(namespace),
qos: newQoSMetrics(),
}
}
@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
return m.multinet
}
func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
return m.qos
}

View file

@ -9,13 +9,14 @@ import (
)
type ObjectServiceMetrics interface {
AddRequestDuration(method string, d time.Duration, success bool)
AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
AddPayloadSize(method string, size int)
}
type objectServiceMetrics struct {
methodDuration *prometheus.HistogramVec
payloadCounter *prometheus.CounterVec
ioTagOpsCounter *prometheus.CounterVec
}
func newObjectServiceMetrics() *objectServiceMetrics {
@ -32,14 +33,24 @@ func newObjectServiceMetrics() *objectServiceMetrics {
Name: "request_payload_bytes",
Help: "Object Service request payload",
}, []string{methodLabel}),
ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "requests_total",
Help: "Count of requests for each IO tag",
}, []string{methodLabel, ioTagLabel}),
}
}
func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
m.methodDuration.With(prometheus.Labels{
methodLabel: method,
successLabel: strconv.FormatBool(success),
}).Observe(d.Seconds())
m.ioTagOpsCounter.With(prometheus.Labels{
ioTagLabel: ioTag,
methodLabel: method,
}).Inc()
}
func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {

internal/metrics/qos.go Normal file (+52 lines)
View file

@ -0,0 +1,52 @@
package metrics
import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type QoSMetrics struct {
opsCounter *prometheus.GaugeVec
}
func newQoSMetrics() *QoSMetrics {
return &QoSMetrics{
opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: qosSubsystem,
Name: "operations_total",
Help: "Count of pending, in progree, completed and failed due of resource exhausted error operations for each shard",
}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
}
}
func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
m.opsCounter.With(prometheus.Labels{
shardIDLabel: shardID,
operationLabel: operation,
ioTagLabel: tag,
typeLabel: "pending",
}).Set(float64(pending))
m.opsCounter.With(prometheus.Labels{
shardIDLabel: shardID,
operationLabel: operation,
ioTagLabel: tag,
typeLabel: "in_progress",
}).Set(float64(inProgress))
m.opsCounter.With(prometheus.Labels{
shardIDLabel: shardID,
operationLabel: operation,
ioTagLabel: tag,
typeLabel: "completed",
}).Set(float64(completed))
m.opsCounter.With(prometheus.Labels{
shardIDLabel: shardID,
operationLabel: operation,
ioTagLabel: tag,
typeLabel: "resource_exhausted",
}).Set(float64(resourceExhausted))
}
func (m *QoSMetrics) Close(shardID string) {
m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
}
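
For illustration (not part of the diff), a single `SetOperationTagCounters` call updates four gauge series that differ only in the type label. The argument values below are made up; the label keys are shown via the constants defined in the metrics package:

	// Hypothetical call with made-up values; m is a *QoSMetrics.
	m.SetOperationTagCounters("shard-1", "read", "client", 10, 8, 7, 1)
	// Exported gauges (label keys shown as their Go constants):
	// {shardIDLabel: "shard-1", operationLabel: "read", ioTagLabel: "client", typeLabel: "pending"}            = 10
	// {shardIDLabel: "shard-1", operationLabel: "read", ioTagLabel: "client", typeLabel: "in_progress"}        = 8
	// {shardIDLabel: "shard-1", operationLabel: "read", ioTagLabel: "client", typeLabel: "completed"}          = 7
	// {shardIDLabel: "shard-1", operationLabel: "read", ioTagLabel: "client", typeLabel: "resource_exhausted"} = 1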

View file

@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
AddReplicateTaskDuration(time.Duration, bool)
AddReplicateWaitDuration(time.Duration, bool)
AddSyncDuration(time.Duration, bool)
AddOperation(string, string)
}
type treeServiceMetrics struct {
replicateTaskDuration *prometheus.HistogramVec
replicateWaitDuration *prometheus.HistogramVec
syncOpDuration *prometheus.HistogramVec
ioTagOpsCounter *prometheus.CounterVec
}
var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
Name: "sync_duration_seconds",
Help: "Duration of synchronization operations",
}, []string{successLabel}),
ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: treeServiceSubsystem,
Name: "requests_total",
Help: "Count of requests for each IO tag",
}, []string{methodLabel, ioTagLabel}),
}
}
@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
successLabel: strconv.FormatBool(success),
}).Observe(d.Seconds())
}
func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
m.ioTagOpsCounter.With(prometheus.Labels{
ioTagLabel: ioTag,
methodLabel: op,
}).Inc()
}

View file

@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@ -15,6 +17,9 @@ import (
const (
defaultIdleTimeout time.Duration = 0
defaultShare float64 = 1.0
minusOne = ^uint64(0)
defaultMetricsCollectTimeout = 5 * time.Second
)
type ReleaseFunc scheduling.ReleaseFunc
@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc
type Limiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
SetParentID(string)
SetMetrics(Metrics)
Close()
}
@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
if err := validateConfig(c); err != nil {
return nil, err
}
read, write := c.Read(), c.Write()
fyrchik marked this conversation as resolved (Outdated)

This belongs to the commit `[#1653] qos: Add metrics`. Could you explain why you have removed these lines?

After this commit the limiter must be able to report metrics, but the noop limiter is just a stub. So, rather than add extra code to the noop limiter, I simply dropped this check.
if isNoop(read, write) {
return noopLimiterInstance, nil
}
readScheduler, err := createScheduler(c.Read())
if err != nil {
return nil, fmt.Errorf("create read scheduler: %w", err)
@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
if err != nil {
return nil, fmt.Errorf("create write scheduler: %w", err)
}
return &mClockLimiter{
l := &mClockLimiter{
readScheduler: readScheduler,
writeScheduler: writeScheduler,
}, nil
closeCh: make(chan struct{}),
wg: &sync.WaitGroup{},
readStats: createStats(),
writeStats: createStats(),
}
l.shardID.Store(&shardID{})
l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
l.startMetricsCollect()
return l, nil
}
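
As a usage sketch of the extended `Limiter` surface (a fragment written as if from within the `qos` package; `cfg`, `shardID`, `qosMetrics` and `ctx` are placeholder values, with `qosMetrics` implementing the `Metrics` interface):

	l, err := NewLimiter(cfg) // cfg is a *limits.Config
	if err != nil {
		return err
	}
	l.SetParentID(shardID)   // shard ID used as a label on the exported gauges
	l.SetMetrics(qosMetrics) // replaces the initial noopMetrics holder

	release, err := l.ReadRequest(ctx) // may return apistatus.ResourceExhausted
	if err != nil {
		return err
	}
	// ... perform the read ...
	release() // marks the request as completed in the per-tag stats

	l.Close() // stops the collection goroutine and deletes the shard's metric series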
func createScheduler(config limits.OpConfig) (scheduler, error) {
@ -91,7 +102,7 @@ var (
)
func NewNoopLimiter() Limiter {
return &noopLimiter{}
return noopLimiterInstance
}
type noopLimiter struct{}
@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
}
func (n *noopLimiter) SetParentID(string) {}
func (n *noopLimiter) Close() {}
func (n *noopLimiter) SetMetrics(Metrics) {}
var _ Limiter = (*mClockLimiter)(nil)
type shardID struct {
id string
}
type mClockLimiter struct {
readScheduler scheduler
writeScheduler scheduler
fyrchik marked this conversation as resolved (Outdated)

This will contain a pointer to string anyway (implicitly, when setting an interface value to a string). Why have you decided to use `atomic.Value` instead of `atomic.Pointer`?

I was wary of a pointer dereference. But OK, replaced with `atomic.Pointer[shardID]`.
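
For context on the thread above, a minimal sketch (not from the PR; a fragment assuming `sync/atomic` is imported and the `shardID` type from this file) contrasting the two options:

	// atomic.Value boxes the stored string into an interface value (a pointer-sized
	// word under the hood), and Load needs a type assertion.
	var v atomic.Value
	v.Store("shard-1")
	id := v.Load().(string)

	// atomic.Pointer[shardID] is typed: no assertion on Load, but callers must handle
	// a nil pointer until the first Store (NewLimiter stores &shardID{} up front).
	var p atomic.Pointer[shardID]
	p.Store(&shardID{id: "shard-1"})
	if s := p.Load(); s != nil {
		id = s.id
	}
	_ = id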
readStats map[string]*stat
writeStats map[string]*stat
shardID atomic.Pointer[shardID]
metrics atomic.Pointer[metricsHolder]
closeCh chan struct{}
wg *sync.WaitGroup
}
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.readScheduler)
return requestArrival(ctx, n.readScheduler, n.readStats)
}
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.writeScheduler)
return requestArrival(ctx, n.writeScheduler, n.writeStats)
}
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
tag = IOTagClient.String()
}
stat := getStat(tag, stats)
stat.pending.Add(1)
if tag == IOTagCritical.String() {
return releaseStub, nil
stat.inProgress.Add(1)
return func() {
stat.completed.Add(1)
}, nil
}
rel, err := s.RequestArrival(ctx, tag)
stat.inProgress.Add(1)
if err != nil {
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) {
stat.resourceExhausted.Add(1)
return nil, &apistatus.ResourceExhausted{}
}
stat.completed.Add(1)
fyrchik marked this conversation as resolved (Outdated)

Is it important that `stat.completed.Add()` is called before `rel`? The way I see it, `rel` releases the resources associated with the request, and we should "complete" the request only afterwards. But OK, if it was deliberate.

Fixed
return nil, err
}
return ReleaseFunc(rel), nil
return func() {
rel()
stat.completed.Add(1)
}, nil
}
func (n *mClockLimiter) Close() {
n.readScheduler.Close()
n.writeScheduler.Close()
close(n.closeCh)
n.wg.Wait()
n.metrics.Load().metrics.Close(n.shardID.Load().id)
}
func (n *mClockLimiter) SetParentID(parentID string) {
n.shardID.Store(&shardID{id: parentID})
}
func (n *mClockLimiter) SetMetrics(m Metrics) {
n.metrics.Store(&metricsHolder{metrics: m})
}
func (n *mClockLimiter) startMetricsCollect() {
n.wg.Add(1)
go func() {
defer n.wg.Done()
ticker := time.NewTicker(defaultMetricsCollectTimeout)
defer ticker.Stop()
for {
select {
case <-n.closeCh:
return
case <-ticker.C:
shardID := n.shardID.Load().id
if shardID == "" {
continue
}
metrics := n.metrics.Load().metrics
for tag, s := range n.readStats {
metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
}
for tag, s := range n.writeStats {
metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
}
}
}
}()
}

internal/qos/metrics.go Normal file (+31 lines)
View file

@ -0,0 +1,31 @@
package qos
import "sync/atomic"
type Metrics interface {
SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
Close(shardID string)
}
var _ Metrics = (*noopMetrics)(nil)
type noopMetrics struct{}
func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
}
func (n *noopMetrics) Close(string) {}
// stat represents cumulative limiter statistics counters.
//
// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.

So these metrics do not reflect an instantaneous snapshot, but rather an accumulated value? It should be noted somewhere, because when I see `metric{type="in_progress"}=123`, my natural interpretation is "how many requests are currently being handled". Maybe we could export `in_progress - pending` instead (without changing this struct)?

Changed the struct description a little. `completedTotal` might be clearer, but it looks cumbersome. The Prometheus metrics have the `_total` suffix like other accumulated counters. `in_progress` is also of interest, for example, to check the weights of tags.
type stat struct {
completed atomic.Uint64
pending atomic.Uint64
resourceExhausted atomic.Uint64
inProgress atomic.Uint64
}
type metricsHolder struct {
metrics Metrics
}
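
Following the resolved thread above, a minimal sketch (a hypothetical helper, not part of the diff) of how instantaneous values could be derived from the cumulative counters, given the documented transition order; the loads are independent, so the result is only approximately consistent:

	// waiting: arrived but not yet scheduled; executing: scheduled but not yet released.
	func snapshot(s *stat) (waiting, executing uint64) {
		pending := s.pending.Load()
		inProgress := s.inProgress.Load()
		completed := s.completed.Load()
		exhausted := s.resourceExhausted.Load()
		waiting = pending - inProgress
		executing = inProgress - completed - exhausted
		return
	}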

internal/qos/stats.go Normal file (+28 lines)
View file

@ -0,0 +1,28 @@
package qos
const unknownStatsTag = "unknown"
var statTags = map[string]struct{}{
IOTagClient.String(): {},
IOTagBackground.String(): {},
IOTagInternal.String(): {},
IOTagPolicer.String(): {},
IOTagWritecache.String(): {},
IOTagCritical.String(): {},
unknownStatsTag: {},
}
func createStats() map[string]*stat {
result := make(map[string]*stat)
for tag := range statTags {
result[tag] = &stat{}
}
return result
}
func getStat(tag string, stats map[string]*stat) *stat {
if v, ok := stats[tag]; ok {
return v
fyrchik marked this conversation as resolved (Outdated)

Is there any reason you use the `statTags` map here instead of `stats`? The first returned value could then be returned immediately without an additional map access.

No, as they must contain the same keys. Replaced with `stats`.
}
return stats[unknownStatsTag]
}

How about changing it to this:

	if _, ok := stats[tag]; ok {
		return stats[tag]
	}
	return stats[unknownStatsTag]

fixed

View file

@ -1,6 +1,11 @@
package qos
import "fmt"
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)
type IOTag string
@ -37,3 +42,11 @@ func FromRawString(s string) (IOTag, error) {
func (t IOTag) String() string {
return string(t)
}
func IOTagFromContext(ctx context.Context) string {
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
tag = "undefined"
fyrchik marked this conversation as resolved (Outdated)

There is `undefined` here, and `unknown` in `internal/qos/stats.go`. How do these strings relate and why do they differ?

This method is used to get the tag from the context to report metrics. If the current context doesn't contain any tag, the `undefined` placeholder is used.

In `internal/qos/stats.go`, before getting the tag stats:

	tag, ok := tagging.IOTagFromContext(ctx)
	if !ok {
		tag = IOTagClient.String()
	}

This is because the shard assumes that a request without a tag is a `client` request. So when getting the tag stat, the tag must be defined. But it could be unknown in case of some minor bugs, I think. The `mclock` scheduler will return an error in this case, but for metrics it is the `unknown` tag.

Also, the object service and tree service first adjust IO tags and then report metrics, so there must not be unknown tags.
}
return tag
}
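
To make the difference discussed in the thread above concrete, a hedged sketch (written as if inside the `qos` package; `tagForMetrics` and `statForLimiter` are hypothetical helpers, not part of the diff):

	// Metrics reporting in the object/tree services: a missing tag becomes "undefined".
	func tagForMetrics(ctx context.Context) string {
		return IOTagFromContext(ctx)
	}

	// Shard limiter stats: a missing tag is treated as a client request, and a tag
	// outside the known set is counted under the shared "unknown" bucket by getStat.
	func statForLimiter(ctx context.Context, stats map[string]*stat) *stat {
		tag, ok := tagging.IOTagFromContext(ctx)
		if !ok {
			tag = IOTagClient.String()
		}
		return getStat(tag, stats)
	}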

View file

@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
}
return *f
}
func isNoop(read, write limits.OpConfig) bool {
return read.MaxRunningOps == limits.NoLimit &&
read.MaxWaitingOps == limits.NoLimit &&
write.MaxRunningOps == limits.NoLimit &&
write.MaxWaitingOps == limits.NoLimit &&
len(read.Tags) == 0 &&
len(write.Tags) == 0
}

View file

@ -163,6 +163,8 @@ type testQoSLimiter struct {
write atomic.Int64
}
func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
func (t *testQoSLimiter) Close() {
require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
t.write.Add(1)
return func() { t.write.Add(-1) }, nil
}
func (t *testQoSLimiter) SetParentID(string) {}

View file

@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
if s.pilorama != nil {
s.pilorama.SetParentID(s.info.ID.String())
}
s.opsLimiter.SetParentID(s.info.ID.String())
if len(idFromMetabase) == 0 && !modeDegraded {
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {

View file

@ -4,6 +4,7 @@ import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
)
@ -34,7 +35,7 @@ type (
}
MetricRegister interface {
AddRequestDuration(string, time.Duration, bool)
AddRequestDuration(string, time.Duration, bool, string)
AddPayloadSize(string, int)
}
)
@ -51,7 +52,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
if m.enabled {
t := time.Now()
defer func() {
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
}()
err = m.next.Get(req, &getStreamMetric{
ServerStream: stream,
@ -106,7 +107,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl
res, err := m.next.PutSingle(ctx, request)
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
if err == nil {
m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
}
@ -122,7 +123,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
res, err := m.next.Head(ctx, request)
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
return res, err
}
@ -135,7 +136,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
err := m.next.Search(req, stream)
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
return err
}
@ -148,7 +149,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
res, err := m.next.Delete(ctx, request)
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
return res, err
}
return m.next.Delete(ctx, request)
@ -160,7 +161,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
err := m.next.GetRange(req, stream)
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
return err
}
@ -173,7 +174,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
res, err := m.next.GetRangeHash(ctx, request)
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
return res, err
}
@ -209,7 +210,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
res, err := s.stream.CloseAndRecv(ctx)
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
return res, err
}
@ -223,7 +224,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
res, err := s.stream.CloseAndRecv(ctx)
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
return res, err
}

View file

@ -6,6 +6,7 @@ type MetricsRegister interface {
AddReplicateTaskDuration(time.Duration, bool)
AddReplicateWaitDuration(time.Duration, bool)
AddSyncDuration(time.Duration, bool)
AddOperation(string, string)
}
type defaultMetricsRegister struct{}
@ -13,3 +14,4 @@ type defaultMetricsRegister struct{}
func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddOperation(string, string) {}

View file

@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
}
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
}
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
}
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
}
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
}
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
// Apply locally applies operation from the remote node to the tree.
func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
err := verifyMessage(req)
if err != nil {
return nil, err
@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}