diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 4ff9b8522..1ce330cb5 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -267,6 +267,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa replicator.WithRemoteSender( putsvc.NewRemoteSender(keyStorage, cache), ), + replicator.WithMetrics(c.metricsCollector), ) } diff --git a/pkg/metrics/node.go b/pkg/metrics/node.go index 0f9c6183d..bf12e610f 100644 --- a/pkg/metrics/node.go +++ b/pkg/metrics/node.go @@ -8,6 +8,7 @@ type NodeMetrics struct { objectServiceMetrics engineMetrics stateMetrics + replicatorMetrics epoch metric[prometheus.Gauge] } @@ -21,6 +22,9 @@ func NewNodeMetrics() *NodeMetrics { state := newStateMetrics() state.register() + replicator := newReplicatorMetrics() + replicator.register() + epoch := newGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: innerRingSubsystem, @@ -33,6 +37,7 @@ func NewNodeMetrics() *NodeMetrics { objectServiceMetrics: objectService, engineMetrics: engine, stateMetrics: state, + replicatorMetrics: replicator, epoch: epoch, } } diff --git a/pkg/metrics/replicator.go b/pkg/metrics/replicator.go new file mode 100644 index 000000000..55f736c66 --- /dev/null +++ b/pkg/metrics/replicator.go @@ -0,0 +1,59 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const replicatorSubsystem = "replicator" + +type replicatorMetrics struct { + inFlightRequests metric[prometheus.Gauge] + processedObjects metric[prometheus.Counter] + totalReplicatedPayloadSize metric[prometheus.Counter] +} + +func (m replicatorMetrics) IncInFlightRequest() { + m.inFlightRequests.value.Inc() +} + +func (m replicatorMetrics) DecInFlightRequest() { + m.inFlightRequests.value.Dec() +} + +func (m replicatorMetrics) IncProcessedObjects() { + m.processedObjects.value.Inc() +} + +func (m replicatorMetrics) AddPayloadSize(size int64) { + m.totalReplicatedPayloadSize.value.Add(float64(size)) +} + +func newReplicatorMetrics() replicatorMetrics { + return replicatorMetrics{ + inFlightRequests: newReplicatorGauge("in_flight_requests", "Number of in-flight requests"), + processedObjects: newReplicatorCounter("processed_objects", "Number of objects processed since the node startup"), + totalReplicatedPayloadSize: newReplicatorCounter("total_replicated_payload_size", "Total size of payloads replicated"), + } +} + +func (m replicatorMetrics) register() { + mustRegister(m.inFlightRequests) + mustRegister(m.processedObjects) + mustRegister(m.totalReplicatedPayloadSize) +} + +func newReplicatorCounter(name, help string) metric[prometheus.Counter] { + return newCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: replicatorSubsystem, + Name: name, + Help: help, + }) +} + +func newReplicatorGauge(name, help string) metric[prometheus.Gauge] { + return newGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: replicatorSubsystem, + Name: name, + Help: help, + }) +} diff --git a/pkg/services/replicator/metrics.go b/pkg/services/replicator/metrics.go new file mode 100644 index 000000000..3fc062926 --- /dev/null +++ b/pkg/services/replicator/metrics.go @@ -0,0 +1,8 @@ +package replicator + +type MetricsRegister interface { + IncInFlightRequest() + DecInFlightRequest() + IncProcessedObjects() + AddPayloadSize(size int64) +} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 46e0c9468..0f82ff232 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -20,6 +20,8 @@ type TaskResult interface { // HandleTask executes replication task inside invoking goroutine. // Passes all the nodes that accepted the replication to the TaskResult. func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) { + p.metrics.IncInFlightRequest() + defer p.metrics.DecInFlightRequest() defer func() { p.log.Debug(logs.ReplicatorFinishWork, zap.Uint32("amount of unfinished replicas", task.quantity), @@ -69,6 +71,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) task.quantity-- res.SubmitSuccessfulReplication(task.nodes[i]) + + p.metrics.IncProcessedObjects() + p.metrics.AddPayloadSize(int64(task.obj.PayloadSize())) } } } diff --git a/pkg/services/replicator/replicator.go b/pkg/services/replicator/replicator.go index 493982100..bb817cb32 100644 --- a/pkg/services/replicator/replicator.go +++ b/pkg/services/replicator/replicator.go @@ -26,6 +26,8 @@ type cfg struct { remoteSender *putsvc.RemoteSender localStorage *engine.StorageEngine + + metrics MetricsRegister } func defaultCfg() *cfg { @@ -74,3 +76,9 @@ func WithLocalStorage(v *engine.StorageEngine) Option { c.localStorage = v } } + +func WithMetrics(v MetricsRegister) Option { + return func(c *cfg) { + c.metrics = v + } +}