[#1653] treeSvc: Add operations by IO tag metric

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-03-05 15:44:15 +03:00 committed by Dmitrii Stepanov
parent 4ed2bbdb0f
commit 597bce7a87
3 changed files with 26 additions and 0 deletions

View file

@ -12,12 +12,14 @@ type TreeMetricsRegister interface {
AddReplicateTaskDuration(time.Duration, bool) AddReplicateTaskDuration(time.Duration, bool)
AddReplicateWaitDuration(time.Duration, bool) AddReplicateWaitDuration(time.Duration, bool)
AddSyncDuration(time.Duration, bool) AddSyncDuration(time.Duration, bool)
AddOperation(string, string)
} }
type treeServiceMetrics struct { type treeServiceMetrics struct {
replicateTaskDuration *prometheus.HistogramVec replicateTaskDuration *prometheus.HistogramVec
replicateWaitDuration *prometheus.HistogramVec replicateWaitDuration *prometheus.HistogramVec
syncOpDuration *prometheus.HistogramVec syncOpDuration *prometheus.HistogramVec
ioTagOpsCounter *prometheus.CounterVec
} }
var _ TreeMetricsRegister = (*treeServiceMetrics)(nil) var _ TreeMetricsRegister = (*treeServiceMetrics)(nil)
@ -42,6 +44,12 @@ func newTreeServiceMetrics() *treeServiceMetrics {
Name: "sync_duration_seconds", Name: "sync_duration_seconds",
Help: "Duration of synchronization operations", Help: "Duration of synchronization operations",
}, []string{successLabel}), }, []string{successLabel}),
ioTagOpsCounter: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: treeServiceSubsystem,
Name: "requests_total",
Help: "Count of requests for each IO tag",
}, []string{methodLabel, ioTagLabel}),
} }
} }
@ -62,3 +70,10 @@ func (m *treeServiceMetrics) AddSyncDuration(d time.Duration, success bool) {
successLabel: strconv.FormatBool(success), successLabel: strconv.FormatBool(success),
}).Observe(d.Seconds()) }).Observe(d.Seconds())
} }
func (m *treeServiceMetrics) AddOperation(op string, ioTag string) {
m.ioTagOpsCounter.With(prometheus.Labels{
ioTagLabel: ioTag,
methodLabel: op,
}).Inc()
}

View file

@ -6,6 +6,7 @@ type MetricsRegister interface {
AddReplicateTaskDuration(time.Duration, bool) AddReplicateTaskDuration(time.Duration, bool)
AddReplicateWaitDuration(time.Duration, bool) AddReplicateWaitDuration(time.Duration, bool)
AddSyncDuration(time.Duration, bool) AddSyncDuration(time.Duration, bool)
AddOperation(string, string)
} }
type defaultMetricsRegister struct{} type defaultMetricsRegister struct{}
@ -13,3 +14,4 @@ type defaultMetricsRegister struct{}
func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {} func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {} func (defaultMetricsRegister) AddReplicateWaitDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {} func (defaultMetricsRegister) AddSyncDuration(time.Duration, bool) {}
func (defaultMetricsRegister) AddOperation(string, string) {}

View file

@ -105,6 +105,7 @@ func (s *Service) Shutdown() {
} }
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
defer s.metrics.AddOperation("Add", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }
@ -148,6 +149,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
} }
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
defer s.metrics.AddOperation("AddByPath", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }
@ -203,6 +205,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
} }
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
defer s.metrics.AddOperation("Remove", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }
@ -247,6 +250,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue // Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes. // for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
defer s.metrics.AddOperation("Move", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }
@ -290,6 +294,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
} }
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
defer s.metrics.AddOperation("GetNodeByPath", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }
@ -363,6 +368,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
} }
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
defer s.metrics.AddOperation("GetSubTree", qos.IOTagFromContext(srv.Context()))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return ErrAlreadySyncing return ErrAlreadySyncing
} }
@ -590,6 +596,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
// Apply locally applies operation from the remote node to the tree. // Apply locally applies operation from the remote node to the tree.
func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
defer s.metrics.AddOperation("Apply", qos.IOTagFromContext(ctx))
err := verifyMessage(req) err := verifyMessage(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -633,6 +640,7 @@ func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse,
} }
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
defer s.metrics.AddOperation("GetOpLog", qos.IOTagFromContext(srv.Context()))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return ErrAlreadySyncing return ErrAlreadySyncing
} }
@ -697,6 +705,7 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
} }
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
defer s.metrics.AddOperation("TreeList", qos.IOTagFromContext(ctx))
if !s.initialSyncDone.Load() { if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing return nil, ErrAlreadySyncing
} }