forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
6 changed files with 86 additions and 0 deletions
|
@ -267,6 +267,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage, cache),
|
||||
),
|
||||
replicator.WithMetrics(c.metricsCollector),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ type NodeMetrics struct {
|
|||
objectServiceMetrics
|
||||
engineMetrics
|
||||
stateMetrics
|
||||
replicatorMetrics
|
||||
epoch metric[prometheus.Gauge]
|
||||
}
|
||||
|
||||
|
@ -21,6 +22,9 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
state := newStateMetrics()
|
||||
state.register()
|
||||
|
||||
replicator := newReplicatorMetrics()
|
||||
replicator.register()
|
||||
|
||||
epoch := newGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: innerRingSubsystem,
|
||||
|
@ -33,6 +37,7 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
objectServiceMetrics: objectService,
|
||||
engineMetrics: engine,
|
||||
stateMetrics: state,
|
||||
replicatorMetrics: replicator,
|
||||
epoch: epoch,
|
||||
}
|
||||
}
|
||||
|
|
59
pkg/metrics/replicator.go
Normal file
59
pkg/metrics/replicator.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package metrics
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
const replicatorSubsystem = "replicator"
|
||||
|
||||
type replicatorMetrics struct {
|
||||
inFlightRequests metric[prometheus.Gauge]
|
||||
processedObjects metric[prometheus.Counter]
|
||||
totalReplicatedPayloadSize metric[prometheus.Counter]
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) IncInFlightRequest() {
|
||||
m.inFlightRequests.value.Inc()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) DecInFlightRequest() {
|
||||
m.inFlightRequests.value.Dec()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) IncProcessedObjects() {
|
||||
m.processedObjects.value.Inc()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) AddPayloadSize(size int64) {
|
||||
m.totalReplicatedPayloadSize.value.Add(float64(size))
|
||||
}
|
||||
|
||||
func newReplicatorMetrics() replicatorMetrics {
|
||||
return replicatorMetrics{
|
||||
inFlightRequests: newReplicatorGauge("in_flight_requests", "Number of in-flight requests"),
|
||||
processedObjects: newReplicatorCounter("processed_objects", "Number of objects processed since the node startup"),
|
||||
totalReplicatedPayloadSize: newReplicatorCounter("total_replicated_payload_size", "Total size of payloads replicated"),
|
||||
}
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) register() {
|
||||
mustRegister(m.inFlightRequests)
|
||||
mustRegister(m.processedObjects)
|
||||
mustRegister(m.totalReplicatedPayloadSize)
|
||||
}
|
||||
|
||||
func newReplicatorCounter(name, help string) metric[prometheus.Counter] {
|
||||
return newCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: replicatorSubsystem,
|
||||
Name: name,
|
||||
Help: help,
|
||||
})
|
||||
}
|
||||
|
||||
func newReplicatorGauge(name, help string) metric[prometheus.Gauge] {
|
||||
return newGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: replicatorSubsystem,
|
||||
Name: name,
|
||||
Help: help,
|
||||
})
|
||||
}
|
8
pkg/services/replicator/metrics.go
Normal file
8
pkg/services/replicator/metrics.go
Normal file
|
@ -0,0 +1,8 @@
|
|||
package replicator
|
||||
|
||||
type MetricsRegister interface {
|
||||
IncInFlightRequest()
|
||||
DecInFlightRequest()
|
||||
IncProcessedObjects()
|
||||
AddPayloadSize(size int64)
|
||||
}
|
|
@ -20,6 +20,8 @@ type TaskResult interface {
|
|||
// HandleTask executes replication task inside invoking goroutine.
|
||||
// Passes all the nodes that accepted the replication to the TaskResult.
|
||||
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
|
||||
p.metrics.IncInFlightRequest()
|
||||
defer p.metrics.DecInFlightRequest()
|
||||
defer func() {
|
||||
p.log.Debug(logs.ReplicatorFinishWork,
|
||||
zap.Uint32("amount of unfinished replicas", task.quantity),
|
||||
|
@ -69,6 +71,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
|
|||
task.quantity--
|
||||
|
||||
res.SubmitSuccessfulReplication(task.nodes[i])
|
||||
|
||||
p.metrics.IncProcessedObjects()
|
||||
p.metrics.AddPayloadSize(int64(task.obj.PayloadSize()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ type cfg struct {
|
|||
remoteSender *putsvc.RemoteSender
|
||||
|
||||
localStorage *engine.StorageEngine
|
||||
|
||||
metrics MetricsRegister
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -74,3 +76,9 @@ func WithLocalStorage(v *engine.StorageEngine) Option {
|
|||
c.localStorage = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetrics(v MetricsRegister) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = v
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue