[#324] Add replicator metrics

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-05-12 11:44:21 +03:00
parent a181c9e434
commit 9f24c8666f
6 changed files with 86 additions and 0 deletions

View file

@ -267,6 +267,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
replicator.WithRemoteSender( replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache), putsvc.NewRemoteSender(keyStorage, cache),
), ),
replicator.WithMetrics(c.metricsCollector),
) )
} }

View file

@ -8,6 +8,7 @@ type NodeMetrics struct {
objectServiceMetrics objectServiceMetrics
engineMetrics engineMetrics
stateMetrics stateMetrics
replicatorMetrics
epoch metric[prometheus.Gauge] epoch metric[prometheus.Gauge]
} }
@ -21,6 +22,9 @@ func NewNodeMetrics() *NodeMetrics {
state := newStateMetrics() state := newStateMetrics()
state.register() state.register()
replicator := newReplicatorMetrics()
replicator.register()
epoch := newGauge(prometheus.GaugeOpts{ epoch := newGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: innerRingSubsystem, Subsystem: innerRingSubsystem,
@ -33,6 +37,7 @@ func NewNodeMetrics() *NodeMetrics {
objectServiceMetrics: objectService, objectServiceMetrics: objectService,
engineMetrics: engine, engineMetrics: engine,
stateMetrics: state, stateMetrics: state,
replicatorMetrics: replicator,
epoch: epoch, epoch: epoch,
} }
} }

59
pkg/metrics/replicator.go Normal file
View file

@ -0,0 +1,59 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
const replicatorSubsystem = "replicator"
type replicatorMetrics struct {
inFlightRequests metric[prometheus.Gauge]
processedObjects metric[prometheus.Counter]
totalReplicatedPayloadSize metric[prometheus.Counter]
}
func (m replicatorMetrics) IncInFlightRequest() {
m.inFlightRequests.value.Inc()
}
func (m replicatorMetrics) DecInFlightRequest() {
m.inFlightRequests.value.Dec()
}
func (m replicatorMetrics) IncProcessedObjects() {
m.processedObjects.value.Inc()
}
func (m replicatorMetrics) AddPayloadSize(size int64) {
m.totalReplicatedPayloadSize.value.Add(float64(size))
}
func newReplicatorMetrics() replicatorMetrics {
return replicatorMetrics{
inFlightRequests: newReplicatorGauge("in_flight_requests", "Number of in-flight requests"),
processedObjects: newReplicatorCounter("processed_objects", "Number of objects processed since the node startup"),
totalReplicatedPayloadSize: newReplicatorCounter("total_replicated_payload_size", "Total size of payloads replicated"),
}
}
func (m replicatorMetrics) register() {
mustRegister(m.inFlightRequests)
mustRegister(m.processedObjects)
mustRegister(m.totalReplicatedPayloadSize)
}
func newReplicatorCounter(name, help string) metric[prometheus.Counter] {
return newCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: replicatorSubsystem,
Name: name,
Help: help,
})
}
func newReplicatorGauge(name, help string) metric[prometheus.Gauge] {
return newGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: replicatorSubsystem,
Name: name,
Help: help,
})
}

View file

@ -0,0 +1,8 @@
package replicator
type MetricsRegister interface {
IncInFlightRequest()
DecInFlightRequest()
IncProcessedObjects()
AddPayloadSize(size int64)
}

View file

@ -20,6 +20,8 @@ type TaskResult interface {
// HandleTask executes replication task inside invoking goroutine. // HandleTask executes replication task inside invoking goroutine.
// Passes all the nodes that accepted the replication to the TaskResult. // Passes all the nodes that accepted the replication to the TaskResult.
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) { func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
p.metrics.IncInFlightRequest()
defer p.metrics.DecInFlightRequest()
defer func() { defer func() {
p.log.Debug(logs.ReplicatorFinishWork, p.log.Debug(logs.ReplicatorFinishWork,
zap.Uint32("amount of unfinished replicas", task.quantity), zap.Uint32("amount of unfinished replicas", task.quantity),
@ -69,6 +71,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
task.quantity-- task.quantity--
res.SubmitSuccessfulReplication(task.nodes[i]) res.SubmitSuccessfulReplication(task.nodes[i])
p.metrics.IncProcessedObjects()
p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
} }
} }
} }

View file

@ -26,6 +26,8 @@ type cfg struct {
remoteSender *putsvc.RemoteSender remoteSender *putsvc.RemoteSender
localStorage *engine.StorageEngine localStorage *engine.StorageEngine
metrics MetricsRegister
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -74,3 +76,9 @@ func WithLocalStorage(v *engine.StorageEngine) Option {
c.localStorage = v c.localStorage = v
} }
} }
func WithMetrics(v MetricsRegister) Option {
return func(c *cfg) {
c.metrics = v
}
}