From b8e10571c64aabc8cf67e1155a5960ec917c438d Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 16 Mar 2021 11:14:56 +0300 Subject: [PATCH] [#426] Put prometheus behind pkg/metrics Signed-off-by: Alex Vanin --- cmd/neofs-node/config.go | 11 +- cmd/neofs-node/metrics.go | 8 +- cmd/neofs-node/object.go | 4 +- pkg/local_object_storage/engine/container.go | 8 +- pkg/local_object_storage/engine/control.go | 4 - pkg/local_object_storage/engine/delete.go | 4 +- pkg/local_object_storage/engine/engine.go | 6 +- pkg/local_object_storage/engine/exists.go | 4 +- pkg/local_object_storage/engine/get.go | 4 +- pkg/local_object_storage/engine/head.go | 4 +- pkg/local_object_storage/engine/inhume.go | 4 +- pkg/local_object_storage/engine/metrics.go | 114 +-------- pkg/local_object_storage/engine/put.go | 4 +- pkg/local_object_storage/engine/range.go | 4 +- pkg/local_object_storage/engine/select.go | 8 +- pkg/metrics/engine.go | 178 +++++++++++++ pkg/metrics/metrics.go | 21 ++ pkg/metrics/object.go | 256 +++++++++++++++++++ pkg/services/object/metrics.go | 226 ++++------------ 19 files changed, 561 insertions(+), 311 deletions(-) create mode 100644 pkg/metrics/engine.go create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/object.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 8dfd2a3e6..6bd971a4b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -24,6 +24,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" @@ -184,7 +185,9 @@ type cfg struct { profiler profiler.Profiler - metrics profiler.Metrics + metricsServer profiler.Metrics + + metricsCollector *metrics.StorageMetrics workers []worker @@ -362,6 +365,10 @@ func initCfg(path string) *cfg { healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), } + if viperCfg.GetBool(cfgMetricsEnable) { + c.metricsCollector = metrics.NewStorageMetrics() + } + initLocalStorage(c) return c @@ -457,7 +464,7 @@ func initLocalStorage(c *cfg) { ls := engine.New( engine.WithLogger(c.log), - engine.WithMetrics(c.viper.GetBool(cfgMetricsEnable)), + engine.WithMetrics(c.metricsCollector), ) for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { diff --git a/cmd/neofs-node/metrics.go b/cmd/neofs-node/metrics.go index b1756779b..b0056b4fc 100644 --- a/cmd/neofs-node/metrics.go +++ b/cmd/neofs-node/metrics.go @@ -5,11 +5,13 @@ import ( ) func initMetrics(c *cfg) { - c.metrics = profiler.NewMetrics(c.log, c.viper) + if c.metricsCollector != nil { + c.metricsServer = profiler.NewMetrics(c.log, c.viper) + } } func serveMetrics(c *cfg) { - if c.metrics != nil { - c.metrics.Start(c.ctx) + if c.metricsServer != nil { + c.metricsServer.Start(c.ctx) } } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 76b25388b..c8527de97 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -345,8 +345,8 @@ func initObjectService(c *cfg) { ) var firstSvc objectService.ServiceServer = aclSvc - if c.viper.GetBool(cfgMetricsEnable) { - firstSvc = objectService.NewMetricCollector(aclSvc) + if c.metricsCollector != nil { + firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector) } objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 30e13422a..e4a0c27c9 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -38,8 +38,8 @@ func (r *ListContainersRes) Containers() []*container.ID { // ContainerSize returns sum of estimation container sizes among all shards. func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) *ContainerSizeRes { - if e.enableMetrics { - defer elapsed(estimateContainerSizeDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() } return &ContainerSizeRes{ @@ -74,8 +74,8 @@ func (e *StorageEngine) containerSize(id *container.ID) (total uint64) { // ListContainers returns unique container IDs presented in the engine objects. func (e *StorageEngine) ListContainers(_ *ListContainersPrm) *ListContainersRes { - if e.enableMetrics { - defer elapsed(listContainersDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddListContainersDuration)() } return &ListContainersRes{ diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index fcb376096..72ef84bd9 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -30,10 +30,6 @@ func (e *StorageEngine) Init() error { } } - if e.enableMetrics { - registerMetrics() - } - return nil } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 5ab841d4a..01444a306 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -27,8 +27,8 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm { // Delete marks the objects to be removed. func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) { - if e.enableMetrics { - defer elapsed(deleteDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddDeleteDuration)() } shPrm := new(shard.InhumePrm) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index d0d9b79a6..9843e3248 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -23,7 +23,7 @@ type Option func(*cfg) type cfg struct { log *logger.Logger - enableMetrics bool + metrics MetricRegister } func defaultCfg() *cfg { @@ -54,8 +54,8 @@ func WithLogger(l *logger.Logger) Option { } } -func WithMetrics(v bool) Option { +func WithMetrics(v MetricRegister) Option { return func(c *cfg) { - c.enableMetrics = v + c.metrics = v } } diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 4d14be99b..9d75d3c09 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -38,8 +38,8 @@ func (p *ExistsRes) Exists() bool { // Returns any error encountered that does not allow to // unambiguously determine the presence of an object. func (e *StorageEngine) Exists(prm *ExistsPrm) (*ExistsRes, error) { - if e.enableMetrics { - defer elapsed(existsDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddExistsDuration)() } exists, err := e.exists(prm.addr) diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index d6ab92cec..05e4a2623 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -42,8 +42,8 @@ func (r *GetRes) Object() *object.Object { // // Returns ErrNotFound if requested object is missing in local storage. func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) { - if e.enableMetrics { - defer elapsed(getDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddGetDuration)() } var ( diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 81e980314..4052cee5a 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -56,8 +56,8 @@ func (r *HeadRes) Header() *object.Object { // Returns object.ErrNotFound if requested object is missing in local storage. // Returns object.ErrAlreadyRemoved if requested object was inhumed. func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { - if e.enableMetrics { - defer elapsed(headDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddHeadDuration)() } var ( diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index a29253a80..dd50e87e3 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -49,8 +49,8 @@ var errInhumeFailure = errors.New("inhume operation failed") // Inhume calls metabase. Inhume method to mark object as removed. It won't be // removed physically from shard until `Delete` operation. func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { - if e.enableMetrics { - defer elapsed(inhumeDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddInhumeDuration)() } shPrm := new(shard.InhumePrm) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index b51f1a4bd..69fecc27d 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -2,112 +2,26 @@ package engine import ( "time" - - "github.com/prometheus/client_golang/prometheus" ) -const ( - namespace = "neofs_node" - subsystem = "engine" -) - -var ( - listContainersDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "list_containers_duration", - Help: "Accumulated duration of engine list containers operations", - }) - - estimateContainerSizeDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "estimate_container_size_duration", - Help: "Accumulated duration of engine container size estimate operations", - }) - - deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "delete_duration", - Help: "Accumulated duration of engine delete operations", - }) - - existsDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "exists_duration", - Help: "Accumulated duration of engine exists operations", - }) - - getDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "get_duration", - Help: "Accumulated duration of engine get operations", - }) - - headDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "head_duration", - Help: "Accumulated duration of engine head operations", - }) - - inhumeDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "inhume_duration", - Help: "Accumulated duration of engine inhume operations", - }) - - putDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "put_duration", - Help: "Accumulated duration of engine put operations", - }) - - rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "range_duration", - Help: "Accumulated duration of engine range operations", - }) - - searchDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "search_duration", - Help: "Accumulated duration of engine search operations", - }) - - listObjectsDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "list_objects_duration", - Help: "Accumulated duration of engine list objects operations", - }) -) - -func registerMetrics() { - prometheus.MustRegister(listContainersDuration) - prometheus.MustRegister(estimateContainerSizeDuration) - prometheus.MustRegister(deleteDuration) - prometheus.MustRegister(existsDuration) - prometheus.MustRegister(getDuration) - prometheus.MustRegister(headDuration) - prometheus.MustRegister(inhumeDuration) - prometheus.MustRegister(putDuration) - prometheus.MustRegister(rangeDuration) - prometheus.MustRegister(searchDuration) - prometheus.MustRegister(listObjectsDuration) +type MetricRegister interface { + AddListContainersDuration(d time.Duration) + AddEstimateContainerSizeDuration(d time.Duration) + AddDeleteDuration(d time.Duration) + AddExistsDuration(d time.Duration) + AddGetDuration(d time.Duration) + AddHeadDuration(d time.Duration) + AddInhumeDuration(d time.Duration) + AddPutDuration(d time.Duration) + AddRangeDuration(d time.Duration) + AddSearchDuration(d time.Duration) + AddListObjectsDuration(d time.Duration) } -func elapsed(c prometheus.Counter) func() { +func elapsed(addFunc func(d time.Duration)) func() { t := time.Now() return func() { - c.Add(float64(time.Since(t))) + addFunc(time.Since(t)) } } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 0abeb16fd..56f62612d 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -34,8 +34,8 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { // Returns any error encountered that // did not allow to completely save the object. func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { - if e.enableMetrics { - defer elapsed(putDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddPutDuration)() } _, err := e.exists(prm.obj.Address()) // todo: make this check parallel diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index fa01bc57c..1d7c9cf43 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -60,8 +60,8 @@ func (r *RngRes) Object() *object.Object { // Returns ErrAlreadyRemoved if requested object is inhumed. // Returns ErrRangeOutOfBounds if requested object range is out of bounds. func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) { - if e.enableMetrics { - defer elapsed(rangeDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddRangeDuration)() } var ( diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 14ce8ffab..9b968dc1d 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -48,8 +48,8 @@ func (r *SelectRes) AddressList() []*object.Address { // // Returns any error encountered that did not allow to completely select the objects. func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { - if e.enableMetrics { - defer elapsed(searchDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddSearchDuration)() } addrList := make([]*object.Address, 0) @@ -98,8 +98,8 @@ func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { // List returns `limit` available physically storage object addresses in engine. // If limit is zero, then returns all available object addresses. func (e *StorageEngine) List(limit uint64) (*SelectRes, error) { - if e.enableMetrics { - defer elapsed(listObjectsDuration)() + if e.metrics != nil { + defer elapsed(e.metrics.AddListObjectsDuration)() } addrList := make([]*object.Address, limit) diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go new file mode 100644 index 000000000..4f280e0f2 --- /dev/null +++ b/pkg/metrics/engine.go @@ -0,0 +1,178 @@ +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type ( + engineMetrics struct { + listContainersDuration prometheus.Counter + estimateContainerSizeDuration prometheus.Counter + deleteDuration prometheus.Counter + existsDuration prometheus.Counter + getDuration prometheus.Counter + headDuration prometheus.Counter + inhumeDuration prometheus.Counter + putDuration prometheus.Counter + rangeDuration prometheus.Counter + searchDuration prometheus.Counter + listObjectsDuration prometheus.Counter + } +) + +const engineSubsystem = "engine" + +func newEngineMetrics() engineMetrics { + var ( + listContainersDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "list_containers_duration", + Help: "Accumulated duration of engine list containers operations", + }) + + estimateContainerSizeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "estimate_container_size_duration", + Help: "Accumulated duration of engine container size estimate operations", + }) + + deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "delete_duration", + Help: "Accumulated duration of engine delete operations", + }) + + existsDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "exists_duration", + Help: "Accumulated duration of engine exists operations", + }) + + getDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "get_duration", + Help: "Accumulated duration of engine get operations", + }) + + headDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "head_duration", + Help: "Accumulated duration of engine head operations", + }) + + inhumeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "inhume_duration", + Help: "Accumulated duration of engine inhume operations", + }) + + putDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "put_duration", + Help: "Accumulated duration of engine put operations", + }) + + rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "range_duration", + Help: "Accumulated duration of engine range operations", + }) + + searchDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "search_duration", + Help: "Accumulated duration of engine search operations", + }) + + listObjectsDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "list_objects_duration", + Help: "Accumulated duration of engine list objects operations", + }) + ) + + return engineMetrics{ + listContainersDuration: listContainersDuration, + estimateContainerSizeDuration: estimateContainerSizeDuration, + deleteDuration: deleteDuration, + existsDuration: existsDuration, + getDuration: getDuration, + headDuration: headDuration, + inhumeDuration: inhumeDuration, + putDuration: putDuration, + rangeDuration: rangeDuration, + searchDuration: searchDuration, + listObjectsDuration: listObjectsDuration, + } +} + +func (m engineMetrics) register() { + prometheus.MustRegister(m.listContainersDuration) + prometheus.MustRegister(m.estimateContainerSizeDuration) + prometheus.MustRegister(m.deleteDuration) + prometheus.MustRegister(m.existsDuration) + prometheus.MustRegister(m.getDuration) + prometheus.MustRegister(m.headDuration) + prometheus.MustRegister(m.inhumeDuration) + prometheus.MustRegister(m.putDuration) + prometheus.MustRegister(m.rangeDuration) + prometheus.MustRegister(m.searchDuration) + prometheus.MustRegister(m.listObjectsDuration) +} + +func (m engineMetrics) AddListContainersDuration(d time.Duration) { + m.listObjectsDuration.Add(float64(d)) +} + +func (m engineMetrics) AddEstimateContainerSizeDuration(d time.Duration) { + m.estimateContainerSizeDuration.Add(float64(d)) +} + +func (m engineMetrics) AddDeleteDuration(d time.Duration) { + m.deleteDuration.Add(float64(d)) +} + +func (m engineMetrics) AddExistsDuration(d time.Duration) { + m.existsDuration.Add(float64(d)) +} + +func (m engineMetrics) AddGetDuration(d time.Duration) { + m.getDuration.Add(float64(d)) +} + +func (m engineMetrics) AddHeadDuration(d time.Duration) { + m.headDuration.Add(float64(d)) +} + +func (m engineMetrics) AddInhumeDuration(d time.Duration) { + m.inhumeDuration.Add(float64(d)) +} + +func (m engineMetrics) AddPutDuration(d time.Duration) { + m.putDuration.Add(float64(d)) +} + +func (m engineMetrics) AddRangeDuration(d time.Duration) { + m.rangeDuration.Add(float64(d)) +} + +func (m engineMetrics) AddSearchDuration(d time.Duration) { + m.searchDuration.Add(float64(d)) +} + +func (m engineMetrics) AddListObjectsDuration(d time.Duration) { + m.listObjectsDuration.Add(float64(d)) +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 000000000..01e246dc9 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,21 @@ +package metrics + +const namespace = "neofs_node" + +type StorageMetrics struct { + objectServiceMetrics + engineMetrics +} + +func NewStorageMetrics() *StorageMetrics { + objectService := newObjectServiceMetrics() + objectService.register() + + engine := newEngineMetrics() + engine.register() + + return &StorageMetrics{ + objectServiceMetrics: objectService, + engineMetrics: engine, + } +} diff --git a/pkg/metrics/object.go b/pkg/metrics/object.go new file mode 100644 index 000000000..c447f5d2e --- /dev/null +++ b/pkg/metrics/object.go @@ -0,0 +1,256 @@ +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const objectSubsystem = "object" + +type ( + objectServiceMetrics struct { + getCounter prometheus.Counter + putCounter prometheus.Counter + headCounter prometheus.Counter + searchCounter prometheus.Counter + deleteCounter prometheus.Counter + rangeCounter prometheus.Counter + rangeHashCounter prometheus.Counter + + getDuration prometheus.Counter + putDuration prometheus.Counter + headDuration prometheus.Counter + searchDuration prometheus.Counter + deleteDuration prometheus.Counter + rangeDuration prometheus.Counter + rangeHashDuration prometheus.Counter + + putPayload prometheus.Counter + getPayload prometheus.Counter + } +) + +func newObjectServiceMetrics() objectServiceMetrics { + var ( // Request counter metrics. + getCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "get_req_count", + Help: "Number of get request processed", + }) + + putCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "put_req_count", + Help: "Number of put request processed", + }) + + headCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "head_req_count", + Help: "Number of head request processed", + }) + + searchCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "search_req_count", + Help: "Number of search request processed", + }) + + deleteCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "delete_req_count", + Help: "Number of delete request processed", + }) + + rangeCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "range_req_count", + Help: "Number of range request processed", + }) + + rangeHashCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "range_hash_req_count", + Help: "Number of range hash request processed", + }) + ) + + var ( // Request duration metrics. + getDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "get_req_duration", + Help: "Accumulated get request process duration", + }) + + putDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "put_req_duration", + Help: "Accumulated put request process duration", + }) + + headDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "head_req_duration", + Help: "Accumulated head request process duration", + }) + + searchDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "search_req_duration", + Help: "Accumulated search request process duration", + }) + + deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "delete_req_duration", + Help: "Accumulated delete request process duration", + }) + + rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "range_req_duration", + Help: "Accumulated range request process duration", + }) + + rangeHashDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "range_hash_req_duration", + Help: "Accumulated range hash request process duration", + }) + ) + + var ( // Object payload metrics. + putPayload = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "put_payload", + Help: "Accumulated payload size at object put method", + }) + + getPayload = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: objectSubsystem, + Name: "get_payload", + Help: "Accumulated payload size at object get method", + }) + ) + + return objectServiceMetrics{ + getCounter: getCounter, + putCounter: putCounter, + headCounter: headCounter, + searchCounter: searchCounter, + deleteCounter: deleteCounter, + rangeCounter: rangeCounter, + rangeHashCounter: rangeHashCounter, + getDuration: getDuration, + putDuration: putDuration, + headDuration: headDuration, + searchDuration: searchDuration, + deleteDuration: deleteDuration, + rangeDuration: rangeDuration, + rangeHashDuration: rangeHashDuration, + putPayload: putPayload, + getPayload: getPayload, + } +} + +func (m objectServiceMetrics) register() { + prometheus.MustRegister(m.getCounter) + prometheus.MustRegister(m.putCounter) + prometheus.MustRegister(m.headCounter) + prometheus.MustRegister(m.searchCounter) + prometheus.MustRegister(m.deleteCounter) + prometheus.MustRegister(m.rangeCounter) + prometheus.MustRegister(m.rangeHashCounter) + + prometheus.MustRegister(m.getDuration) + prometheus.MustRegister(m.putDuration) + prometheus.MustRegister(m.headDuration) + prometheus.MustRegister(m.searchDuration) + prometheus.MustRegister(m.deleteDuration) + prometheus.MustRegister(m.rangeDuration) + prometheus.MustRegister(m.rangeHashDuration) + + prometheus.MustRegister(m.putPayload) + prometheus.MustRegister(m.getPayload) +} + +func (m objectServiceMetrics) IncGetReqCounter() { + m.getCounter.Inc() +} + +func (m objectServiceMetrics) IncPutReqCounter() { + m.putCounter.Inc() +} + +func (m objectServiceMetrics) IncHeadReqCounter() { + m.headCounter.Inc() +} + +func (m objectServiceMetrics) IncSearchReqCounter() { + m.searchCounter.Inc() +} + +func (m objectServiceMetrics) IncDeleteReqCounter() { + m.deleteCounter.Inc() +} + +func (m objectServiceMetrics) IncRangeReqCounter() { + m.rangeCounter.Inc() +} + +func (m objectServiceMetrics) IncRangeHashReqCounter() { + m.rangeHashCounter.Inc() +} + +func (m objectServiceMetrics) AddGetReqDuration(d time.Duration) { + m.getDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddPutReqDuration(d time.Duration) { + m.putDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddHeadReqDuration(d time.Duration) { + m.headDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddSearchReqDuration(d time.Duration) { + m.searchDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddDeleteReqDuration(d time.Duration) { + m.deleteDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddRangeReqDuration(d time.Duration) { + m.rangeDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddRangeHashReqDuration(d time.Duration) { + m.rangeHashDuration.Add(float64(d)) +} + +func (m objectServiceMetrics) AddPutPayload(ln int) { + m.putPayload.Add(float64(ln)) +} + +func (m objectServiceMetrics) AddGetPayload(ln int) { + m.getPayload.Add(float64(ln)) +} diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 1be5ebc23..8feb8d652 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -6,198 +6,73 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/services/util" - "github.com/prometheus/client_golang/prometheus" ) type ( MetricCollector struct { - next ServiceServer + next ServiceServer + metrics MetricRegister } getStreamMetric struct { util.ServerStream - stream GetObjectStream + stream GetObjectStream + metrics MetricRegister } putStreamMetric struct { - stream object.PutObjectStreamer + stream object.PutObjectStreamer + metrics MetricRegister + } + + MetricRegister interface { + IncGetReqCounter() + IncPutReqCounter() + IncHeadReqCounter() + IncSearchReqCounter() + IncDeleteReqCounter() + IncRangeReqCounter() + IncRangeHashReqCounter() + + AddGetReqDuration(time.Duration) + AddPutReqDuration(time.Duration) + AddHeadReqDuration(time.Duration) + AddSearchReqDuration(time.Duration) + AddDeleteReqDuration(time.Duration) + AddRangeReqDuration(time.Duration) + AddRangeHashReqDuration(time.Duration) + + AddPutPayload(int) + AddGetPayload(int) } ) -const ( - namespace = "neofs_node" - subsystem = "object" -) - -// Request counter metrics. -var ( - getCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "get_req_count", - Help: "Number of get request processed", - ConstLabels: nil, - }) - - putCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "put_req_count", - Help: "Number of put request processed", - }) - - headCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "head_req_count", - Help: "Number of head request processed", - }) - - searchCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "search_req_count", - Help: "Number of search request processed", - }) - - deleteCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "delete_req_count", - Help: "Number of delete request processed", - }) - - rangeCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "range_req_count", - Help: "Number of range request processed", - }) - - rangeHashCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "range_hash_req_count", - Help: "Number of range hash request processed", - }) -) - -// Request duration metrics. -var ( - getDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "get_req_duration", - Help: "Accumulated get request process duration", - }) - - putDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "put_req_duration", - Help: "Accumulated put request process duration", - }) - - headDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "head_req_duration", - Help: "Accumulated head request process duration", - }) - - searchDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "search_req_duration", - Help: "Accumulated search request process duration", - }) - - deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "delete_req_duration", - Help: "Accumulated delete request process duration", - }) - - rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "range_req_duration", - Help: "Accumulated range request process duration", - }) - - rangeHashDuration = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "range_hash_req_duration", - Help: "Accumulated range hash request process duration", - }) -) - -// Object payload metrics. -var ( - putPayload = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "put_payload", - Help: "Accumulated payload size at object put method", - }) - - getPayload = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "get_payload", - Help: "Accumulated payload size at object get method", - }) -) - -func registerMetrics() { - prometheus.MustRegister(getCounter) // todo: replace with for loop over map - prometheus.MustRegister(putCounter) - prometheus.MustRegister(headCounter) - prometheus.MustRegister(searchCounter) - prometheus.MustRegister(deleteCounter) - prometheus.MustRegister(rangeCounter) - prometheus.MustRegister(rangeHashCounter) - - prometheus.MustRegister(getDuration) - prometheus.MustRegister(putDuration) - prometheus.MustRegister(headDuration) - prometheus.MustRegister(searchDuration) - prometheus.MustRegister(deleteDuration) - prometheus.MustRegister(rangeDuration) - prometheus.MustRegister(rangeHashDuration) - - prometheus.MustRegister(putPayload) - prometheus.MustRegister(getPayload) -} - -func NewMetricCollector(next ServiceServer) *MetricCollector { - registerMetrics() - +func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector { return &MetricCollector{ - next: next, + next: next, + metrics: register, } } func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) error { t := time.Now() defer func() { - getCounter.Inc() - getDuration.Add(float64(time.Since(t))) + m.metrics.IncGetReqCounter() + m.metrics.AddGetReqDuration(time.Since(t)) }() return m.next.Get(req, &getStreamMetric{ ServerStream: stream, stream: stream, + metrics: m.metrics, }) } func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, error) { t := time.Now() defer func() { - putCounter.Inc() - putDuration.Add(float64(time.Since(t))) + m.metrics.IncPutReqCounter() + m.metrics.AddPutReqDuration(time.Since(t)) }() stream, err := m.next.Put(ctx) @@ -205,14 +80,17 @@ func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, err return nil, err } - return &putStreamMetric{stream: stream}, nil + return &putStreamMetric{ + stream: stream, + metrics: m.metrics, + }, nil } func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { t := time.Now() defer func() { - headCounter.Inc() - headDuration.Add(float64(time.Since(t))) + m.metrics.IncHeadReqCounter() + m.metrics.AddHeadReqDuration(time.Since(t)) }() return m.next.Head(ctx, request) @@ -221,8 +99,8 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error { t := time.Now() defer func() { - searchCounter.Inc() - searchDuration.Add(float64(time.Since(t))) + m.metrics.IncSearchReqCounter() + m.metrics.AddSearchReqDuration(time.Since(t)) }() return m.next.Search(req, stream) @@ -231,8 +109,8 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { t := time.Now() defer func() { - deleteCounter.Inc() - deleteDuration.Add(float64(time.Since(t))) + m.metrics.IncDeleteReqCounter() + m.metrics.AddDeleteReqDuration(time.Since(t)) }() return m.next.Delete(ctx, request) @@ -241,8 +119,8 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { t := time.Now() defer func() { - rangeCounter.Inc() - rangeDuration.Add(float64(time.Since(t))) + m.metrics.IncRangeReqCounter() + m.metrics.AddRangeReqDuration(time.Since(t)) }() return m.next.GetRange(req, stream) @@ -251,8 +129,8 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { t := time.Now() defer func() { - rangeHashCounter.Inc() - rangeHashDuration.Add(float64(time.Since(t))) + m.metrics.IncRangeHashReqCounter() + m.metrics.AddRangeHashReqDuration(time.Since(t)) }() return m.next.GetRangeHash(ctx, request) @@ -261,8 +139,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa func (s getStreamMetric) Send(resp *object.GetResponse) error { chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk) if ok { - ln := len(chunk.GetChunk()) - getPayload.Add(float64(ln)) + s.metrics.AddGetPayload(len(chunk.GetChunk())) } return s.stream.Send(resp) @@ -271,8 +148,7 @@ func (s getStreamMetric) Send(resp *object.GetResponse) error { func (s putStreamMetric) Send(req *object.PutRequest) error { chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk) if ok { - ln := len(chunk.GetChunk()) - putPayload.Add(float64(ln)) + s.metrics.AddPutPayload(len(chunk.GetChunk())) } return s.stream.Send(req)