IO tag metrics #1653

Merged
dstepanov-yadro merged 4 commits from dstepanov-yadro/frostfs-node:feat/io_tag_metrics into master 2025-03-11 10:57:48 +00:00
18 changed files with 285 additions and 38 deletions
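The PR adds an IO-tag label to Object and Tree service request counters and exposes per-shard, per-tag scheduling counters from the QoS limiter. A minimal wiring sketch, not code from this PR: the limits.Config and shard ID come from the node configuration in the real code, and ContextWithIOTag is assumed to be the frostfs-qos tagging helper that appears as qosTagging in the diff below.

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)

func wireShardQoS(limitsCfg *limits.Config, shardID string) error {
	nodeMetrics := metrics.NewNodeMetrics()

	limiter, err := qos.NewLimiter(limitsCfg)
	if err != nil {
		return err
	}
	defer limiter.Close()

	limiter.SetParentID(shardID)                 // done in Shard.UpdateID in this PR
	limiter.SetMetrics(nodeMetrics.QoSMetrics()) // done in getShardOpts in this PR

	// Incoming requests carry an IO tag in the context; the limiter and the
	// object/tree metric collectors read it from there.
	ctx := tagging.ContextWithIOTag(context.Background(), qos.IOTagBackground.String())
	release, err := limiter.ReadRequest(ctx)
	if err != nil {
		return err // e.g. apistatus.ResourceExhausted when the tag's limit is hit
	}
	release()
	return nil
}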


@@ -1048,6 +1048,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 	}
 	if c.metricsCollector != nil {
 		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
+		shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
 	}
 	var sh shardOptsWithID


@@ -47,7 +47,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 	}
 	ioTag, err := qos.FromRawString(rawTag)
 	if err != nil {
-		s.logger.Warn(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
+		s.logger.Debug(ctx, logs.FailedToParseIncomingIOTag, zap.Error(err))
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
@@ -70,6 +70,7 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	case qos.IOTagInternal:
 		for _, pk := range s.allowedInternalPubs {
@@ -87,9 +88,10 @@ func (s *cfgQoSService) AdjustIncomingTag(ctx context.Context, requestSignPublic
 				return ctx
 			}
 		}
+		s.logger.Debug(ctx, logs.FailedToValidateIncomingIOTag)
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	default:
-		s.logger.Warn(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
+		s.logger.Debug(ctx, logs.NotSupportedIncomingIOTagReplacedWithClient, zap.Stringer("io_tag", ioTag))
 		return qosTagging.ContextWithIOTag(ctx, qos.IOTagClient.String())
 	}
 }


@@ -513,4 +513,5 @@ const (
 	FailedToParseIncomingIOTag                  = "failed to parse incoming IO tag"
 	NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`"
 	FailedToGetNetmapToAdjustIOTag              = "failed to get netmap to adjust IO tag, replaced with `client`"
+	FailedToValidateIncomingIOTag               = "failed to validate incoming IO tag, replaced with `client`"
 )


@@ -23,6 +23,7 @@ const (
 	policerSubsystem     = "policer"
 	commonCacheSubsystem = "common_cache"
 	multinetSubsystem    = "multinet"
+	qosSubsystem         = "qos"
 
 	successLabel = "success"
 	shardIDLabel = "shard_id"
@@ -43,6 +44,7 @@ const (
 	hitLabel      = "hit"
 	cacheLabel    = "cache"
 	sourceIPLabel = "source_ip"
+	ioTagLabel    = "io_tag"
 
 	readWriteMode = "READ_WRITE"
 	readOnlyMode  = "READ_ONLY"


@@ -26,6 +26,7 @@ type NodeMetrics struct {
 	morphCache *morphCacheMetrics
 	log        logger.LogMetrics
 	multinet   *multinetMetrics
+	qos        *QoSMetrics
 	// nolint: unused
 	appInfo *ApplicationInfo
 }
@@ -55,6 +56,7 @@ func NewNodeMetrics() *NodeMetrics {
 		log:      logger.NewLogMetrics(namespace),
 		appInfo:  NewApplicationInfo(misc.Version),
 		multinet: newMultinetMetrics(namespace),
+		qos:      newQoSMetrics(),
 	}
 }
@@ -126,3 +128,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
 func (m *NodeMetrics) MultinetMetrics() MultinetMetrics {
 	return m.multinet
 }
+
+func (m *NodeMetrics) QoSMetrics() *QoSMetrics {
+	return m.qos
+}


@@ -9,13 +9,14 @@ import (
 )
 
 type ObjectServiceMetrics interface {
-	AddRequestDuration(method string, d time.Duration, success bool)
+	AddRequestDuration(method string, d time.Duration, success bool, ioTag string)
 	AddPayloadSize(method string, size int)
 }
 
 type objectServiceMetrics struct {
 	methodDuration  *prometheus.HistogramVec
 	payloadCounter  *prometheus.CounterVec
+	ioTagOpsCounter *prometheus.CounterVec
 }
 
 func newObjectServiceMetrics() *objectServiceMetrics {
@@ -32,14 +33,24 @@ func newObjectServiceMetrics() *objectServiceMetrics {
 			Name:      "request_payload_bytes",
 			Help:      "Object Service request payload",
 		}, []string{methodLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: objectSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }
 
-func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
+func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool, ioTag string) {
 	m.methodDuration.With(prometheus.Labels{
 		methodLabel:  method,
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: method,
+	}).Inc()
 }
 
 func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {

internal/metrics/qos.go (new file)

@@ -0,0 +1,52 @@
package metrics

import (
	"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

type QoSMetrics struct {
	opsCounter *prometheus.GaugeVec
}

func newQoSMetrics() *QoSMetrics {
	return &QoSMetrics{
		opsCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: qosSubsystem,
			Name:      "operations_total",
			Help:      "Count of pending, in progress, completed and failed due to resource exhausted error operations for each shard",
		}, []string{shardIDLabel, operationLabel, ioTagLabel, typeLabel}),
	}
}

func (m *QoSMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
	m.opsCounter.With(prometheus.Labels{
		shardIDLabel:   shardID,
		operationLabel: operation,
		ioTagLabel:     tag,
		typeLabel:      "pending",
	}).Set(float64(pending))
	m.opsCounter.With(prometheus.Labels{
		shardIDLabel:   shardID,
		operationLabel: operation,
		ioTagLabel:     tag,
		typeLabel:      "in_progress",
	}).Set(float64(inProgress))
	m.opsCounter.With(prometheus.Labels{
		shardIDLabel:   shardID,
		operationLabel: operation,
		ioTagLabel:     tag,
		typeLabel:      "completed",
	}).Set(float64(completed))
	m.opsCounter.With(prometheus.Labels{
		shardIDLabel:   shardID,
		operationLabel: operation,
		ioTagLabel:     tag,
		typeLabel:      "resource_exhausted",
	}).Set(float64(resourceExhausted))
}

func (m *QoSMetrics) Close(shardID string) {
	m.opsCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
}
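Series appear as soon as a (shard_id, operation, io_tag, type) label combination is first set and are dropped as a group when the shard's limiter closes; DeletePartialMatch removes every series whose shard_id matches. A small hypothetical sketch (inside the metrics package, since newQoSMetrics is unexported):

q := newQoSMetrics()
q.SetOperationTagCounters("shard-1", "read", "client", 4, 1, 120, 0)
q.SetOperationTagCounters("shard-1", "write", "critical", 0, 2, 37, 0)

// Removes all series with shard_id="shard-1", regardless of their
// operation, io_tag and type labels.
q.Close("shard-1")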


@@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
 	AddReplicateTaskDuration(time.Duration, bool)
 	AddReplicateWaitDuration(time.Duration, bool)
 	AddSyncDuration(time.Duration, bool)
+	AddOperation(string, string)
 }
 
 type treeServiceMetrics struct {
 	replicateTaskDuration *prometheus.HistogramVec
 	replicateWaitDuration *prometheus.HistogramVec
 	syncOpDuration        *prometheus.HistogramVec
+	ioTagOpsCounter       *prometheus.CounterVec
 }
 
 var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
@@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
 			Name:      "sync_duration_seconds",
 			Help:      "Duration of synchronization operations",
 		}, []string{successLabel}),
+		ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: treeServiceSubsystem,
+			Name:      "requests_total",
+			Help:      "Count of requests for each IO tag",
+		}, []string{methodLabel, ioTagLabel}),
 	}
 }
@@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
 		successLabel: strconv.FormatBool(success),
 	}).Observe(d.Seconds())
 }
+
+func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
+	m.ioTagOpsCounter.With(prometheus.Labels{
+		ioTagLabel:  ioTag,
+		methodLabel: op,
+	}).Inc()
+}


@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
@@ -15,6 +17,9 @@ import (
 const (
 	defaultIdleTimeout time.Duration = 0
 	defaultShare       float64       = 1.0
+	minusOne                         = ^uint64(0)
+
+	defaultMetricsCollectTimeout = 5 * time.Second
 )
 
 type ReleaseFunc scheduling.ReleaseFunc
@@ -22,6 +27,8 @@ type ReleaseFunc scheduling.ReleaseFunc
 type Limiter interface {
 	ReadRequest(context.Context) (ReleaseFunc, error)
 	WriteRequest(context.Context) (ReleaseFunc, error)
+	SetParentID(string)
+	SetMetrics(Metrics)
 	Close()
 }
@@ -34,10 +41,6 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
 	}
-	read, write := c.Read(), c.Write()
-	if isNoop(read, write) {
-		return noopLimiterInstance, nil
-	}
 	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)
@@ -46,10 +49,18 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
-	return &mClockLimiter{
+	l := &mClockLimiter{
 		readScheduler:  readScheduler,
 		writeScheduler: writeScheduler,
-	}, nil
+		closeCh:        make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		readStats:      createStats(),
+		writeStats:     createStats(),
+	}
+	l.shardID.Store(&shardID{})
+	l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
+	l.startMetricsCollect()
+	return l, nil
 }
 
 func createScheduler(config limits.OpConfig) (scheduler, error) {
@@ -91,7 +102,7 @@ var (
 )
 
 func NewNoopLimiter() Limiter {
-	return &noopLimiter{}
+	return noopLimiterInstance
 }
 
 type noopLimiter struct{}
@@ -104,43 +115,109 @@ func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
 	return releaseStub, nil
 }
 
+func (n *noopLimiter) SetParentID(string) {}
+
 func (n *noopLimiter) Close() {}
 
+func (n *noopLimiter) SetMetrics(Metrics) {}
+
 var _ Limiter = (*mClockLimiter)(nil)
 
+type shardID struct {
+	id string
+}
+
 type mClockLimiter struct {
 	readScheduler  scheduler
 	writeScheduler scheduler
+
+	readStats  map[string]*stat
+	writeStats map[string]*stat
+
+	shardID atomic.Pointer[shardID]
+	metrics atomic.Pointer[metricsHolder]
+	closeCh chan struct{}
+	wg      *sync.WaitGroup
 }
 
 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.readScheduler)
+	return requestArrival(ctx, n.readScheduler, n.readStats)
 }
 
 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
-	return requestArrival(ctx, n.writeScheduler)
+	return requestArrival(ctx, n.writeScheduler, n.writeStats)
 }
 
-func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
 	}
+	stat := getStat(tag, stats)
+	stat.pending.Add(1)
 	if tag == IOTagCritical.String() {
-		return releaseStub, nil
+		stat.inProgress.Add(1)
+		return func() {
+			stat.completed.Add(1)
+		}, nil
 	}
 	rel, err := s.RequestArrival(ctx, tag)
+	stat.inProgress.Add(1)
 	if err != nil {
 		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
 			errors.Is(err, errSemaphoreLimitExceeded) {
+			stat.resourceExhausted.Add(1)
 			return nil, &apistatus.ResourceExhausted{}
 		}
+		stat.completed.Add(1)
 		return nil, err
 	}
-	return ReleaseFunc(rel), nil
+	return func() {
+		rel()
+		stat.completed.Add(1)
+	}, nil
 }
 
 func (n *mClockLimiter) Close() {
 	n.readScheduler.Close()
 	n.writeScheduler.Close()
+	close(n.closeCh)
+	n.wg.Wait()
+	n.metrics.Load().metrics.Close(n.shardID.Load().id)
+}
+
+func (n *mClockLimiter) SetParentID(parentID string) {
+	n.shardID.Store(&shardID{id: parentID})
+}
+
+func (n *mClockLimiter) SetMetrics(m Metrics) {
+	n.metrics.Store(&metricsHolder{metrics: m})
+}
+
+func (n *mClockLimiter) startMetricsCollect() {
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		ticker := time.NewTicker(defaultMetricsCollectTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-n.closeCh:
+				return
+			case <-ticker.C:
+				shardID := n.shardID.Load().id
+				if shardID == "" {
+					continue
+				}
+				metrics := n.metrics.Load().metrics
+				for tag, s := range n.readStats {
+					metrics.SetOperationTagCounters(shardID, "read", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+				for tag, s := range n.writeStats {
+					metrics.SetOperationTagCounters(shardID, "write", tag, s.pending.Load(), s.inProgress.Load(), s.completed.Load(), s.resourceExhausted.Load())
+				}
+			}
+		}
+	}()
+}
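The limiter pushes counters to its qos.Metrics sink only every defaultMetricsCollectTimeout (5 s) and only after SetParentID has supplied a non-empty shard ID. A hypothetical alternative Metrics implementation, not part of the PR, that simply records the last pushed values (useful in tests):

type capturingMetrics struct {
	mu   sync.Mutex
	last map[string][4]uint64 // key: shardID|operation|tag -> pending, in_progress, completed, resource_exhausted
}

func (c *capturingMetrics) SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.last == nil {
		c.last = make(map[string][4]uint64)
	}
	c.last[shardID+"|"+operation+"|"+tag] = [4]uint64{pending, inProgress, completed, resourceExhausted}
}

func (c *capturingMetrics) Close(string) {}

Wired via limiter.SetMetrics(&capturingMetrics{}), it receives one call per known tag for the read stats and one for the write stats on every tick.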

internal/qos/metrics.go (new file)

@@ -0,0 +1,31 @@
package qos

import "sync/atomic"

type Metrics interface {
	SetOperationTagCounters(shardID, operation, tag string, pending, inProgress, completed, resourceExhausted uint64)
	Close(shardID string)
}

var _ Metrics = (*noopMetrics)(nil)

type noopMetrics struct{}

func (n *noopMetrics) SetOperationTagCounters(string, string, string, uint64, uint64, uint64, uint64) {
}

func (n *noopMetrics) Close(string) {}

// stat presents limiter statistics cumulative counters.
//
// Each operation changes its status as follows: `pending` -> `in_progress` -> `completed` or `resource_exhausted`.
type stat struct {
	completed         atomic.Uint64
	pending           atomic.Uint64
	resourceExhausted atomic.Uint64
	inProgress        atomic.Uint64
}

type metricsHolder struct {
	metrics Metrics
}

internal/qos/stats.go (new file)

@@ -0,0 +1,28 @@
package qos

const unknownStatsTag = "unknown"

var statTags = map[string]struct{}{
	IOTagClient.String():     {},
	IOTagBackground.String(): {},
	IOTagInternal.String():   {},
	IOTagPolicer.String():    {},
	IOTagWritecache.String(): {},
	IOTagCritical.String():   {},
	unknownStatsTag:          {},
}

func createStats() map[string]*stat {
	result := make(map[string]*stat)
	for tag := range statTags {
		result[tag] = &stat{}
	}
	return result
}

func getStat(tag string, stats map[string]*stat) *stat {
	if v, ok := stats[tag]; ok {
		return v
	}
	return stats[unknownStatsTag]
}
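Stat buckets are pre-created for every known tag plus an "unknown" fallback, so the limiter's request path only reads this map and updates atomics, with no locking or allocation; any unexpected tag is accounted under "unknown". A hypothetical call, not from the PR:

stats := createStats()
s := getStat("some-unexpected-tag", stats) // falls back to stats[unknownStatsTag]
s.pending.Add(1)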


@@ -1,6 +1,11 @@
 package qos
 
-import "fmt"
+import (
+	"context"
+	"fmt"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+)
 
 type IOTag string
@@ -37,3 +42,11 @@ func FromRawString(s string) (IOTag, error) {
 func (t IOTag) String() string {
 	return string(t)
 }
+
+func IOTagFromContext(ctx context.Context) string {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	if !ok {
+		tag = "undefined"
+	}
+	return tag
+}
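Unlike the limiter, which falls back to the `client` tag, IOTagFromContext reports `undefined` when the context carries no tag, so untagged requests remain visible as a separate label value in the request counters. A minimal sketch, assuming ContextWithIOTag is the same frostfs-qos tagging helper aliased as qosTagging in the QoS service above:

ctx := tagging.ContextWithIOTag(context.Background(), IOTagWritecache.String())
_ = IOTagFromContext(ctx)                  // the writecache tag
_ = IOTagFromContext(context.Background()) // "undefined"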


@@ -90,12 +90,3 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
-
-func isNoop(read, write limits.OpConfig) bool {
-	return read.MaxRunningOps == limits.NoLimit &&
-		read.MaxWaitingOps == limits.NoLimit &&
-		write.MaxRunningOps == limits.NoLimit &&
-		write.MaxWaitingOps == limits.NoLimit &&
-		len(read.Tags) == 0 &&
-		len(write.Tags) == 0
-}


@@ -163,6 +163,8 @@ type testQoSLimiter struct {
 	write atomic.Int64
 }
 
+func (t *testQoSLimiter) SetMetrics(qos.Metrics) {}
+
 func (t *testQoSLimiter) Close() {
 	require.Equal(t.t, int64(0), t.read.Load(), "read requests count after limiter close must be 0")
 	require.Equal(t.t, int64(0), t.write.Load(), "write requests count after limiter close must be 0")
@@ -177,3 +179,5 @@ func (t *testQoSLimiter) WriteRequest(context.Context) (qos.ReleaseFunc, error)
 	t.write.Add(1)
 	return func() { t.write.Add(-1) }, nil
 }
+
+func (t *testQoSLimiter) SetParentID(string) {}


@@ -61,6 +61,7 @@ func (s *Shard) UpdateID(ctx context.Context) (err error) {
 	if s.pilorama != nil {
 		s.pilorama.SetParentID(s.info.ID.String())
 	}
+	s.opsLimiter.SetParentID(s.info.ID.String())
 	if len(idFromMetabase) == 0 && !modeDegraded {
 		if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {


@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
 )
@@ -34,7 +35,7 @@ type (
 	}
 
 	MetricRegister interface {
-		AddRequestDuration(string, time.Duration, bool)
+		AddRequestDuration(string, time.Duration, bool, string)
 		AddPayloadSize(string, int)
 	}
 )
@@ -51,7 +52,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
 	if m.enabled {
 		t := time.Now()
 		defer func() {
-			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
+			m.metrics.AddRequestDuration("Get", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
 		}()
 		err = m.next.Get(req, &getStreamMetric{
 			ServerStream: stream,
@@ -106,7 +107,7 @@ func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingl
 		res, err := m.next.PutSingle(ctx, request)
-		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		if err == nil {
 			m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
 		}
@@ -122,7 +123,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
 		res, err := m.next.Head(ctx, request)
-		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Head", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		return res, err
 	}
@@ -135,7 +136,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
 		err := m.next.Search(req, stream)
-		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Search", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
 		return err
 	}
@@ -148,7 +149,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
 		res, err := m.next.Delete(ctx, request)
-		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		return res, err
 	}
 	return m.next.Delete(ctx, request)
@@ -160,7 +161,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
 		err := m.next.GetRange(req, stream)
-		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil, qos.IOTagFromContext(stream.Context()))
 		return err
 	}
@@ -173,7 +174,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
 		res, err := m.next.GetRangeHash(ctx, request)
-		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
+		m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil, qos.IOTagFromContext(ctx))
 		return res, err
 	}
@@ -209,7 +210,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
 func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)
-	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
 	return res, err
 }
@@ -223,7 +224,7 @@ func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) e
 func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
 	res, err := s.stream.CloseAndRecv(ctx)
-	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
+	s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil, qos.IOTagFromContext(ctx))
 	return res, err
 }


@@ -6,6 +6,7 @@ type MetricsRegister interface {
 	AddReplicateTaskDuration(time.Duration, bool)
 	AddReplicateWaitDuration(time.Duration, bool)
 	AddSyncDuration(time.Duration, bool)
+	AddOperation(string, string)
 }
 
 type defaultMetricsRegister struct{}
@@ -13,3 +14,4 @@ type defaultMetricsRegister struct{}
 func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
 func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool)          {}
+func (defaultMetricsRegister) AddOperation(string, string)                  {}


@@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
 }
 
 func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
+	defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
 }
 
 func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
+	defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
 }
 
 func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
+	defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
 // Move applies client operation to the specified tree and pushes in queue
 // for replication on other nodes.
 func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
+	defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
 }
 
 func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
+	defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}
@@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
 }
 
 func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
+	defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}
@@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
 // Apply locally applies operation from the remote node to the tree.
 func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
+	defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
 	err := verifyMessage(req)
 	if err != nil {
 		return nil, err
@@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
 }
 
 func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
+	defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
 	if !s.initialSyncDone.Load() {
 		return ErrAlreadySyncing
 	}
@@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
 }
 
 func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
+	defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
 	if !s.initialSyncDone.Load() {
 		return nil, ErrAlreadySyncing
 	}