forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
10 changed files with 173 additions and 285 deletions
|
@ -679,7 +679,7 @@ func (c *cfg) engineOpts() []engine.Option {
|
|||
)
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
opts = append(opts, engine.WithMetrics(c.metricsCollector))
|
||||
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
|
||||
}
|
||||
|
||||
return opts
|
||||
|
|
|
@ -80,7 +80,7 @@ func (c *cfg) setHealthStatus(st control.HealthStatus) {
|
|||
c.healthStatus.Store(int32(st))
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
c.metricsCollector.SetHealth(int32(st))
|
||||
c.metricsCollector.State().SetHealth(int32(st))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -195,7 +195,7 @@ func initObjectService(c *cfg) {
|
|||
)
|
||||
|
||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||
signSvc, c.metricsCollector, metricsconfig.Enabled(c.appCfg))
|
||||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||
server := objectTransportGRPC.New(c.shared.metricsSvc)
|
||||
|
||||
for _, srv := range c.cfgGRPC.servers {
|
||||
|
@ -272,7 +272,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage, cache),
|
||||
),
|
||||
replicator.WithMetrics(c.metricsCollector),
|
||||
replicator.WithMetrics(c.metricsCollector.Replicator()),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,23 +7,40 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type (
|
||||
engineMetrics struct {
|
||||
methodDuration *prometheus.HistogramVec
|
||||
type EngineMetrics interface {
|
||||
AddMethodDuration(method string, d time.Duration)
|
||||
AddToContainerSize(cnrID string, size int64)
|
||||
IncErrorCounter(shardID string)
|
||||
ClearErrorCounter(shardID string)
|
||||
DeleteShardMetrics(shardID string)
|
||||
AddToObjectCounter(shardID, objectType string, delta int)
|
||||
SetObjectCounter(shardID, objectType string, v uint64)
|
||||
AddToPayloadCounter(shardID string, size int64)
|
||||
SetReadonly(shardID string, readonly bool)
|
||||
|
||||
WriteCache() WriteCacheMetrics
|
||||
GC() GCMetrics
|
||||
}
|
||||
|
||||
type engineMetrics struct {
|
||||
methodDuration *prometheus.HistogramVec
|
||||
objectCounter *prometheus.GaugeVec
|
||||
containerSize *prometheus.GaugeVec
|
||||
payloadSize *prometheus.GaugeVec
|
||||
errorCounter *prometheus.GaugeVec
|
||||
shardsReadonly *prometheus.GaugeVec
|
||||
|
||||
gc *gcMetrics
|
||||
writeCache *writeCacheMetrics
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
engineSubsystem = "engine"
|
||||
engineMethod = "method"
|
||||
)
|
||||
|
||||
func newEngineMetrics() engineMetrics {
|
||||
return engineMetrics{
|
||||
func newEngineMetrics() *engineMetrics {
|
||||
return &engineMetrics{
|
||||
containerSize: newEngineGaugeVector("container_size_bytes", "Accumulated size of all objects in a container", []string{containerIDLabelKey}),
|
||||
payloadSize: newEngineGaugeVector("payload_size_bytes", "Accumulated size of all objects in a shard", []string{shardIDLabelKey}),
|
||||
errorCounter: newEngineGaugeVector("error_counter", "Shard's error counter", []string{shardIDLabelKey}),
|
||||
|
@ -33,6 +50,10 @@ func newEngineMetrics() engineMetrics {
|
|||
Name: "request_duration_seconds",
|
||||
Help: "Duration of Engine requests",
|
||||
}, []string{engineMethod}),
|
||||
objectCounter: newEngineGaugeVector("object_counter", "Objects counters per shards", []string{shardIDLabelKey, counterTypeLabelKey}),
|
||||
shardsReadonly: newEngineGaugeVector("mode", "Shard mode", []string{shardIDLabelKey}),
|
||||
gc: newGCMetrics(),
|
||||
writeCache: newWriteCacheMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,23 +72,63 @@ func (m *engineMetrics) AddMethodDuration(method string, d time.Duration) {
|
|||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func (m engineMetrics) AddToContainerSize(cnrID string, size int64) {
|
||||
func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) {
|
||||
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (m engineMetrics) AddToPayloadCounter(shardID string, size int64) {
|
||||
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
|
||||
m.payloadSize.With(prometheus.Labels{shardIDLabelKey: shardID}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (m engineMetrics) IncErrorCounter(shardID string) {
|
||||
func (m *engineMetrics) IncErrorCounter(shardID string) {
|
||||
m.errorCounter.With(prometheus.Labels{shardIDLabelKey: shardID}).Inc()
|
||||
}
|
||||
|
||||
func (m engineMetrics) ClearErrorCounter(shardID string) {
|
||||
func (m *engineMetrics) ClearErrorCounter(shardID string) {
|
||||
m.errorCounter.With(prometheus.Labels{shardIDLabelKey: shardID}).Set(0)
|
||||
}
|
||||
|
||||
func (m engineMetrics) DeleteShardMetrics(shardID string) {
|
||||
func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
||||
m.errorCounter.Delete(prometheus.Labels{shardIDLabelKey: shardID})
|
||||
m.payloadSize.Delete(prometheus.Labels{shardIDLabelKey: shardID})
|
||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabelKey: shardID})
|
||||
m.shardsReadonly.Delete(prometheus.Labels{shardIDLabelKey: shardID})
|
||||
}
|
||||
|
||||
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||
m.objectCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
counterTypeLabelKey: objectType,
|
||||
},
|
||||
).Add(float64(delta))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
|
||||
m.objectCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
counterTypeLabelKey: objectType,
|
||||
},
|
||||
).Set(float64(v))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetReadonly(shardID string, readonly bool) {
|
||||
var flag float64
|
||||
if readonly {
|
||||
flag = 1
|
||||
}
|
||||
m.shardsReadonly.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
},
|
||||
).Set(flag)
|
||||
}
|
||||
|
||||
func (m *engineMetrics) WriteCache() WriteCacheMetrics {
|
||||
return m.writeCache
|
||||
}
|
||||
|
||||
func (m *engineMetrics) GC() GCMetrics {
|
||||
return m.gc
|
||||
}
|
||||
|
|
|
@ -54,22 +54,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
|||
}
|
||||
|
||||
// SetEpoch updates epoch metrics.
|
||||
func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
|
||||
func (m *InnerRingServiceMetrics) SetEpoch(epoch uint64) {
|
||||
m.epoch.Set(float64(epoch))
|
||||
}
|
||||
|
||||
// SetHealth updates health metrics.
|
||||
func (m InnerRingServiceMetrics) SetHealth(s int32) {
|
||||
func (m *InnerRingServiceMetrics) SetHealth(s int32) {
|
||||
m.health.Set(float64(s))
|
||||
}
|
||||
|
||||
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
|
||||
func (m *InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
|
||||
m.eventDuration.With(prometheus.Labels{
|
||||
innerRingLabelType: typ,
|
||||
innerRingLabelSuccess: strconv.FormatBool(success),
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func (m InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
||||
func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
||||
return m.morphCacheMetrics
|
||||
}
|
||||
|
|
|
@ -8,15 +8,12 @@ import (
|
|||
const namespace = "frostfs_node"
|
||||
|
||||
type NodeMetrics struct {
|
||||
objectServiceMetrics
|
||||
engineMetrics
|
||||
stateMetrics
|
||||
replicatorMetrics
|
||||
|
||||
writeCacheMetrics *writeCacheMetrics
|
||||
engine *engineMetrics
|
||||
state *stateMetrics
|
||||
replicator *replicatorMetrics
|
||||
objectService *objectServiceMetrics
|
||||
treeService *treeServiceMetrics
|
||||
epoch prometheus.Gauge
|
||||
gc *gcMetrics
|
||||
}
|
||||
|
||||
func NewNodeMetrics() *NodeMetrics {
|
||||
|
@ -37,19 +34,13 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
Help: "Current epoch as seen by inner-ring node.",
|
||||
})
|
||||
|
||||
writeCacheMetrics := newWriteCacheMetrics()
|
||||
|
||||
gc := newGCMetrics()
|
||||
|
||||
return &NodeMetrics{
|
||||
objectServiceMetrics: objectService,
|
||||
engineMetrics: engine,
|
||||
stateMetrics: state,
|
||||
replicatorMetrics: replicator,
|
||||
objectService: objectService,
|
||||
engine: engine,
|
||||
state: state,
|
||||
replicator: replicator,
|
||||
treeService: treeService,
|
||||
epoch: epoch,
|
||||
writeCacheMetrics: writeCacheMetrics,
|
||||
gc: gc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,18 +49,22 @@ func (m *NodeMetrics) SetEpoch(epoch uint64) {
|
|||
m.epoch.Set(float64(epoch))
|
||||
}
|
||||
|
||||
// WriteCache returns WriteCache metrics.
|
||||
func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
return m.writeCacheMetrics
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) TreeService() TreeMetricsRegister {
|
||||
return m.treeService
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) GC() GCMetrics {
|
||||
return m.gc
|
||||
func (m *NodeMetrics) Replicator() ReplicatorMetrics {
|
||||
return m.replicator
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) ObjectService() ObjectServiceMetrics {
|
||||
return m.objectService
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) Engine() EngineMetrics {
|
||||
return m.engine
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) State() StateMetrics {
|
||||
return m.state
|
||||
}
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
|
@ -11,207 +10,50 @@ import (
|
|||
|
||||
const objectSubsystem = "object"
|
||||
|
||||
type (
|
||||
methodCount struct {
|
||||
success prometheus.Counter
|
||||
total prometheus.Counter
|
||||
type ObjectServiceMetrics interface {
|
||||
AddRequestDuration(method string, d time.Duration, success bool)
|
||||
AddPayloadSize(method string, size int)
|
||||
}
|
||||
|
||||
objectServiceMetrics struct {
|
||||
getCounter methodCount
|
||||
putCounter methodCount
|
||||
headCounter methodCount
|
||||
searchCounter methodCount
|
||||
deleteCounter methodCount
|
||||
rangeCounter methodCount
|
||||
rangeHashCounter methodCount
|
||||
|
||||
getDuration prometheus.Counter
|
||||
putDuration prometheus.Counter
|
||||
headDuration prometheus.Counter
|
||||
searchDuration prometheus.Counter
|
||||
deleteDuration prometheus.Counter
|
||||
rangeDuration prometheus.Counter
|
||||
rangeHashDuration prometheus.Counter
|
||||
|
||||
putPayload prometheus.Counter
|
||||
getPayload prometheus.Counter
|
||||
|
||||
shardMetrics *prometheus.GaugeVec
|
||||
shardsReadonly *prometheus.GaugeVec
|
||||
type objectServiceMetrics struct {
|
||||
methodDuration *prometheus.HistogramVec
|
||||
payloadCounter *prometheus.CounterVec
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
shardIDLabelKey = "shard"
|
||||
counterTypeLabelKey = "type"
|
||||
containerIDLabelKey = "cid"
|
||||
methodLabelKey = "method"
|
||||
successLabelKey = "success"
|
||||
)
|
||||
|
||||
func newObjectMethodCallCounter(name string) methodCount {
|
||||
return methodCount{
|
||||
success: metrics.NewCounter(prometheus.CounterOpts{
|
||||
func newObjectServiceMetrics() *objectServiceMetrics {
|
||||
return &objectServiceMetrics{
|
||||
methodDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: fmt.Sprintf("%s_req_count_success", name),
|
||||
Help: fmt.Sprintf("The number of successful %s requests processed", name),
|
||||
}),
|
||||
total: metrics.NewCounter(prometheus.CounterOpts{
|
||||
Name: "request_duration_seconds",
|
||||
Help: "Object Service request process duration",
|
||||
}, []string{methodLabelKey, successLabelKey}),
|
||||
payloadCounter: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: fmt.Sprintf("%s_req_count", name),
|
||||
Help: fmt.Sprintf("Total number of %s requests processed", name),
|
||||
}),
|
||||
Name: "request_payload_bytes",
|
||||
Help: "Object Service request payload",
|
||||
}, []string{methodLabelKey}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m methodCount) Inc(success bool) {
|
||||
m.total.Inc()
|
||||
if success {
|
||||
m.success.Inc()
|
||||
}
|
||||
func (m *objectServiceMetrics) AddRequestDuration(method string, d time.Duration, success bool) {
|
||||
m.methodDuration.With(prometheus.Labels{
|
||||
methodLabelKey: method,
|
||||
successLabelKey: strconv.FormatBool(success),
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func newObjectServiceMetrics() objectServiceMetrics {
|
||||
return objectServiceMetrics{
|
||||
getCounter: newObjectMethodCallCounter("get"),
|
||||
putCounter: newObjectMethodCallCounter("put"),
|
||||
headCounter: newObjectMethodCallCounter("head"),
|
||||
searchCounter: newObjectMethodCallCounter("search"),
|
||||
deleteCounter: newObjectMethodCallCounter("delete"),
|
||||
rangeCounter: newObjectMethodCallCounter("range"),
|
||||
rangeHashCounter: newObjectMethodCallCounter("range_hash"),
|
||||
getDuration: newObjectMethodDurationCounter("get"),
|
||||
putDuration: newObjectMethodDurationCounter("put"),
|
||||
headDuration: newObjectMethodDurationCounter("head"),
|
||||
searchDuration: newObjectMethodDurationCounter("search"),
|
||||
deleteDuration: newObjectMethodDurationCounter("delete"),
|
||||
rangeDuration: newObjectMethodDurationCounter("range"),
|
||||
rangeHashDuration: newObjectMethodDurationCounter("range_hash"),
|
||||
putPayload: newObjectMethodPayloadCounter("put"),
|
||||
getPayload: newObjectMethodPayloadCounter("get"),
|
||||
shardMetrics: newObjectGaugeVector("counter", "Objects counters per shards", []string{shardIDLabelKey, counterTypeLabelKey}),
|
||||
shardsReadonly: newObjectGaugeVector("readonly", "Shard state", []string{shardIDLabelKey}),
|
||||
}
|
||||
}
|
||||
|
||||
func newObjectMethodPayloadCounter(method string) prometheus.Counter {
|
||||
return metrics.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: fmt.Sprintf("%s_payload", method),
|
||||
Help: fmt.Sprintf("Accumulated payload size at object %s method", strings.ReplaceAll(method, "_", " ")),
|
||||
})
|
||||
}
|
||||
|
||||
func newObjectMethodDurationCounter(method string) prometheus.Counter {
|
||||
return metrics.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: fmt.Sprintf("%s_req_duration", method),
|
||||
Help: fmt.Sprintf("Accumulated %s request process duration", strings.ReplaceAll(method, "_", " ")),
|
||||
})
|
||||
}
|
||||
|
||||
func newObjectGaugeVector(name, help string, labels []string) *prometheus.GaugeVec {
|
||||
return metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: objectSubsystem,
|
||||
Name: name,
|
||||
Help: help,
|
||||
}, labels)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncGetReqCounter(success bool) {
|
||||
m.getCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncPutReqCounter(success bool) {
|
||||
m.putCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncHeadReqCounter(success bool) {
|
||||
m.headCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncSearchReqCounter(success bool) {
|
||||
m.searchCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncDeleteReqCounter(success bool) {
|
||||
m.deleteCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncRangeReqCounter(success bool) {
|
||||
m.rangeCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) IncRangeHashReqCounter(success bool) {
|
||||
m.rangeHashCounter.Inc(success)
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddGetReqDuration(d time.Duration) {
|
||||
m.getDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddPutReqDuration(d time.Duration) {
|
||||
m.putDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddHeadReqDuration(d time.Duration) {
|
||||
m.headDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddSearchReqDuration(d time.Duration) {
|
||||
m.searchDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddDeleteReqDuration(d time.Duration) {
|
||||
m.deleteDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddRangeReqDuration(d time.Duration) {
|
||||
m.rangeDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddRangeHashReqDuration(d time.Duration) {
|
||||
m.rangeHashDuration.Add(float64(d))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddPutPayload(ln int) {
|
||||
m.putPayload.Add(float64(ln))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddGetPayload(ln int) {
|
||||
m.getPayload.Add(float64(ln))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||
m.shardMetrics.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
counterTypeLabelKey: objectType,
|
||||
},
|
||||
).Add(float64(delta))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
|
||||
m.shardMetrics.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
counterTypeLabelKey: objectType,
|
||||
},
|
||||
).Set(float64(v))
|
||||
}
|
||||
|
||||
func (m objectServiceMetrics) SetReadonly(shardID string, readonly bool) {
|
||||
var flag float64
|
||||
if readonly {
|
||||
flag = 1
|
||||
}
|
||||
m.shardsReadonly.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabelKey: shardID,
|
||||
},
|
||||
).Set(flag)
|
||||
func (m *objectServiceMetrics) AddPayloadSize(method string, size int) {
|
||||
m.payloadCounter.With(prometheus.Labels{
|
||||
methodLabelKey: method,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
|
|
@ -7,30 +7,39 @@ import (
|
|||
|
||||
const replicatorSubsystem = "replicator"
|
||||
|
||||
//TODO
|
||||
|
||||
type ReplicatorMetrics interface {
|
||||
IncInFlightRequest()
|
||||
DecInFlightRequest()
|
||||
IncProcessedObjects()
|
||||
AddPayloadSize(size int64)
|
||||
}
|
||||
|
||||
type replicatorMetrics struct {
|
||||
inFlightRequests prometheus.Gauge
|
||||
processedObjects prometheus.Counter
|
||||
totalReplicatedPayloadSize prometheus.Counter
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) IncInFlightRequest() {
|
||||
func (m *replicatorMetrics) IncInFlightRequest() {
|
||||
m.inFlightRequests.Inc()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) DecInFlightRequest() {
|
||||
func (m *replicatorMetrics) DecInFlightRequest() {
|
||||
m.inFlightRequests.Dec()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) IncProcessedObjects() {
|
||||
func (m *replicatorMetrics) IncProcessedObjects() {
|
||||
m.processedObjects.Inc()
|
||||
}
|
||||
|
||||
func (m replicatorMetrics) AddPayloadSize(size int64) {
|
||||
func (m *replicatorMetrics) AddPayloadSize(size int64) {
|
||||
m.totalReplicatedPayloadSize.Add(float64(size))
|
||||
}
|
||||
|
||||
func newReplicatorMetrics() replicatorMetrics {
|
||||
return replicatorMetrics{
|
||||
func newReplicatorMetrics() *replicatorMetrics {
|
||||
return &replicatorMetrics{
|
||||
inFlightRequests: newReplicatorGauge("in_flight_requests", "Number of in-flight requests"),
|
||||
processedObjects: newReplicatorCounter("processed_objects", "Number of objects processed since the node startup"),
|
||||
totalReplicatedPayloadSize: newReplicatorCounter("total_replicated_payload_size", "Total size of payloads replicated"),
|
||||
|
|
|
@ -7,12 +7,16 @@ import (
|
|||
|
||||
const stateSubsystem = "state"
|
||||
|
||||
type StateMetrics interface {
|
||||
SetHealth(s int32)
|
||||
}
|
||||
|
||||
type stateMetrics struct {
|
||||
healthCheck prometheus.Gauge
|
||||
}
|
||||
|
||||
func newStateMetrics() stateMetrics {
|
||||
return stateMetrics{
|
||||
func newStateMetrics() *stateMetrics {
|
||||
return &stateMetrics{
|
||||
healthCheck: metrics.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: stateSubsystem,
|
||||
|
@ -22,6 +26,6 @@ func newStateMetrics() stateMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
func (m stateMetrics) SetHealth(s int32) {
|
||||
func (m *stateMetrics) SetHealth(s int32) {
|
||||
m.healthCheck.Set(float64(s))
|
||||
}
|
||||
|
|
|
@ -28,24 +28,8 @@ type (
|
|||
}
|
||||
|
||||
MetricRegister interface {
|
||||
IncGetReqCounter(success bool)
|
||||
IncPutReqCounter(success bool)
|
||||
IncHeadReqCounter(success bool)
|
||||
IncSearchReqCounter(success bool)
|
||||
IncDeleteReqCounter(success bool)
|
||||
IncRangeReqCounter(success bool)
|
||||
IncRangeHashReqCounter(success bool)
|
||||
|
||||
AddGetReqDuration(time.Duration)
|
||||
AddPutReqDuration(time.Duration)
|
||||
AddHeadReqDuration(time.Duration)
|
||||
AddSearchReqDuration(time.Duration)
|
||||
AddDeleteReqDuration(time.Duration)
|
||||
AddRangeReqDuration(time.Duration)
|
||||
AddRangeHashReqDuration(time.Duration)
|
||||
|
||||
AddPutPayload(int)
|
||||
AddGetPayload(int)
|
||||
AddRequestDuration(string, time.Duration, bool)
|
||||
AddPayloadSize(string, int)
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -61,8 +45,7 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
|
|||
if m.enabled {
|
||||
t := time.Now()
|
||||
defer func() {
|
||||
m.metrics.IncGetReqCounter(err == nil)
|
||||
m.metrics.AddGetReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
|
||||
}()
|
||||
err = m.next.Get(req, &getStreamMetric{
|
||||
ServerStream: stream,
|
||||
|
@ -99,8 +82,7 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
|
|||
|
||||
res, err := m.next.Head(ctx, request)
|
||||
|
||||
m.metrics.IncHeadReqCounter(err == nil)
|
||||
m.metrics.AddHeadReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
@ -113,8 +95,7 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
|
|||
|
||||
err := m.next.Search(req, stream)
|
||||
|
||||
m.metrics.IncSearchReqCounter(err == nil)
|
||||
m.metrics.AddSearchReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -127,8 +108,7 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
|
|||
|
||||
res, err := m.next.Delete(ctx, request)
|
||||
|
||||
m.metrics.IncDeleteReqCounter(err == nil)
|
||||
m.metrics.AddDeleteReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
|
||||
return res, err
|
||||
}
|
||||
return m.next.Delete(ctx, request)
|
||||
|
@ -140,8 +120,7 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
|
|||
|
||||
err := m.next.GetRange(req, stream)
|
||||
|
||||
m.metrics.IncRangeReqCounter(err == nil)
|
||||
m.metrics.AddRangeReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -154,8 +133,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
|
|||
|
||||
res, err := m.next.GetRangeHash(ctx, request)
|
||||
|
||||
m.metrics.IncRangeHashReqCounter(err == nil)
|
||||
m.metrics.AddRangeHashReqDuration(time.Since(t))
|
||||
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
@ -173,7 +151,7 @@ func (m *MetricCollector) Disable() {
|
|||
func (s getStreamMetric) Send(resp *object.GetResponse) error {
|
||||
chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk)
|
||||
if ok {
|
||||
s.metrics.AddGetPayload(len(chunk.GetChunk()))
|
||||
s.metrics.AddPayloadSize("Get", len(chunk.GetChunk()))
|
||||
}
|
||||
|
||||
return s.stream.Send(resp)
|
||||
|
@ -182,7 +160,7 @@ func (s getStreamMetric) Send(resp *object.GetResponse) error {
|
|||
func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error {
|
||||
chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk)
|
||||
if ok {
|
||||
s.metrics.AddPutPayload(len(chunk.GetChunk()))
|
||||
s.metrics.AddPayloadSize("Put", len(chunk.GetChunk()))
|
||||
}
|
||||
|
||||
return s.stream.Send(ctx, req)
|
||||
|
@ -191,8 +169,7 @@ func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error
|
|||
func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
||||
res, err := s.stream.CloseAndRecv(ctx)
|
||||
|
||||
s.metrics.IncPutReqCounter(err == nil)
|
||||
s.metrics.AddPutReqDuration(time.Since(s.start))
|
||||
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue